/**
 * Copyright @鸿名物联 (Hongming IoT).
 * Unauthorized use, infringement, or commercial use is prohibited and will be prosecuted.
 * Contact QQ: 2224313811
 */
package com.lp.mqtt;

import com.lp.util.LogUtil;
import com.lp.util.LogUtil4j;
import com.lp.util.PropertiesUtil;
import org.apache.commons.lang.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Spring service that owns the application's shared MQTT client.
 *
 * <p>The client is created in the constructor, stored in the static {@link #client} field and
 * connected once the bean has been initialised (see {@link #init()}). Subscribing and publishing
 * are exposed as static helpers that operate on that shared client; they log failures instead of
 * propagating them.
 *
 * <p>All settings are read once, at class-initialisation time, through
 * {@code PropertiesUtil.getProperty("mqtt.config", key)}.
 * NOTE(review): whether {@code PropertiesUtil} returns {@code null} or an empty string for a
 * missing key is not visible here; the code below tolerates both for password, timeout and
 * keep-alive.
 */
@Service
@DependsOn(value = {"taskExecutor", "applicationContext"})
public class MqttService {

    /** Executor handed to {@code MessageCallback} when the client callback is installed. */
    @Autowired
    private TaskExecutor taskExecutor;

    /** Broker URI (address and port of the MQTT server), from {@code mqtt.serverURI1}. */
    public static final String HOST = PropertiesUtil.getProperty("mqtt.config", "mqtt.serverURI1");

    /** Comma-separated list of topic filters to subscribe to, from {@code mqtt.service.topic}. */
    public static final String TOPIC = PropertiesUtil.getProperty("mqtt.config", "mqtt.service.topic");

    /** MQTT client identifier, from {@code mqtt.clientId}. */
    private static final String clientid = PropertiesUtil.getProperty("mqtt.config", "mqtt.clientId");

    /** Broker user name, from {@code mqtt.username}. */
    public static final String userName = PropertiesUtil.getProperty("mqtt.config", "mqtt.username");

    /** Broker password, from {@code mqtt.password}. */
    public static final String passWord = PropertiesUtil.getProperty("mqtt.config", "mqtt.password");

    /** {@code true} only when {@code mqtt.cleanSession} equals "true" (case-insensitive). */
    public static final Boolean cleanSession =
            "true".equalsIgnoreCase(PropertiesUtil.getProperty("mqtt.config", "mqtt.cleanSession"));

    /** {@code true} only when {@code mqtt.autoReconnect} equals "true" (case-insensitive). */
    public static final Boolean autoReconn =
            "true".equalsIgnoreCase(PropertiesUtil.getProperty("mqtt.config", "mqtt.autoReconnect"));

    /** Raw keep-alive interval setting, from {@code mqtt.keepalive}; applied only if it is a valid integer. */
    public static final String keepalive = PropertiesUtil.getProperty("mqtt.config", "mqtt.keepalive");

    /** Raw connection timeout setting, from {@code mqtt.timeout}; applied only if it is a valid integer. */
    public static final String timeout = PropertiesUtil.getProperty("mqtt.config", "mqtt.timeout");

    public static final Logger LOGGER = LoggerFactory.getLogger(MqttService.class);

    /** QoS level used for every subscription and every published message. */
    private static final int QOS = 1;

    /** Shared client used by the static subscribe/publish helpers; assigned by the constructor. */
    public static MqttClient client;

    /**
     * Creates the MQTT client for {@link #HOST} with the configured client id, using in-memory
     * message persistence. The client is not connected here; see {@link #connect()}.
     *
     * @throws MqttException if the client cannot be created
     */
    public MqttService() throws MqttException {
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
    }

    /** Connects to the broker once Spring has finished constructing and injecting this bean. */
    @PostConstruct
    public void init() {
        connect();
    }

    /**
     * Builds the connect options from the configured settings, installs the message callback and
     * connects to the broker.
     *
     * <p>A failure while installing the callback or connecting is reported through
     * {@code LogUtil.errorLog} and is not rethrown. Timeout and keep-alive settings that are
     * empty, non-numeric or out of {@code int} range are ignored, so the client library's
     * defaults stay in effect. A missing password is simply not set.
     */
    public void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        if (passWord != null) {
            options.setPassword(passWord.toCharArray());
        }

        // Connection timeout.
        Integer connectionTimeout = parseDigitsOrNull(timeout);
        if (connectionTimeout != null) {
            options.setConnectionTimeout(connectionTimeout);
        }

        // Keep-alive (session heartbeat) interval.
        Integer keepAliveInterval = parseDigitsOrNull(keepalive);
        if (keepAliveInterval != null) {
            options.setKeepAliveInterval(keepAliveInterval);
        }

        // Automatic reconnect and session clean-up flags.
        options.setAutomaticReconnect(autoReconn);
        options.setCleanSession(cleanSession);

        try {
            client.setCallback(new MessageCallback(taskExecutor));
            client.connect(options);
        } catch (Exception e) {
            LogUtil.errorLog(e);
        }
    }

    /**
     * Subscribes the shared client to every topic filter listed in {@link #TOPIC}
     * (split on commas), each at QoS 1. Failures are logged and not rethrown.
     */
    public static void subscribe() {
        try {
            String[] topics = TOPIC.split(",");
            int[] qosLevels = new int[topics.length];
            Arrays.fill(qosLevels, QOS);
            client.subscribe(topics, qosLevels);
            // Render the array explicitly: passing a String[] straight to a varargs logging
            // method would spread it over the placeholders and log only the first topic.
            LogUtil4j.debugLogger.debug("subscribe with {}", Arrays.toString(topics));
        } catch (Exception e) {
            LOGGER.error("subscribe error", e);
        }
    }

    /**
     * Publishes a binary payload at QoS 1, not retained. Failures (including a {@code null}
     * payload) are logged and not rethrown.
     *
     * @param message payload bytes
     * @param topic   topic to publish to
     */
    public static void pubMessage(byte[] message, String topic) {
        try {
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(QOS);
            mqttMessage.setRetained(false);
            mqttMessage.setPayload(message);
            client.publish(topic, mqttMessage);
        } catch (Exception e) {
            LOGGER.error("pubMessage", e);
        }
    }

    /**
     * Publishes a text payload at QoS 1, not retained. The text is encoded as UTF-8 so the bytes
     * on the wire do not depend on the JVM's default charset. Failures are logged and not
     * rethrown.
     *
     * @param message payload text
     * @param topic   topic to publish to
     */
    public static void pubMessage(String message, String topic) {
        byte[] payload = message == null ? null : message.getBytes(StandardCharsets.UTF_8);
        pubMessage(payload, topic);
    }

    /**
     * Parses a digits-only setting into an {@code Integer}.
     *
     * @param value raw setting, may be {@code null}
     * @return the parsed value, or {@code null} when the input is {@code null}, empty, contains
     *         any non-digit character, or does not fit in an {@code int}
     */
    private static Integer parseDigitsOrNull(String value) {
        // commons-lang 2.x isNumeric("") is true, so the emptiness check is required.
        if (StringUtils.isEmpty(value) || !StringUtils.isNumeric(value)) {
            return null;
        }
        try {
            return Integer.valueOf(value);
        } catch (NumberFormatException ignored) {
            // All digits but too large for an int: treat as "not configured".
            return null;
        }
    }
}