/**
 * 版权所有 @鸿名物联
 * 未经授权,禁止侵权和商业,违法必究
 * 联系QQ:2224313811
 *
 * (English: Copyright @鸿名物联. Unauthorized infringement or commercial use is
 * prohibited; violations will be prosecuted. Contact QQ: 2224313811.)
 */
package com.lp.mqtt;

import com.lp.util.LogUtil4j;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;

/**
 * MQTT client callback that hands every incoming message to a {@link TaskExecutor}
 * and re-subscribes through {@code MqttService.subscribe()} whenever a connection
 * is (re)established.
 */
public class MessageCallback implements MqttCallbackExtended {

    protected static final Logger LOGGER = LoggerFactory.getLogger(MessageCallback.class);

    /**
     * Pause, in milliseconds, applied before each message is handed to the executor.
     * NOTE(review): the reason for this delay is not visible in this file (presumably
     * crude throttling); it blocks the calling thread for every message, so confirm it
     * is still required.
     */
    private static final long DISPATCH_DELAY_MILLIS = 50L;

    /** Executor on which {@code MessageHandler} tasks are run. */
    private final TaskExecutor taskExecutor;

    /**
     * Creates a callback that dispatches incoming messages to the given executor.
     *
     * @param taskExecutor2 executor used to run a {@code MessageHandler} per message
     */
    public MessageCallback(TaskExecutor taskExecutor2) {
        this.taskExecutor = taskExecutor2;
    }

    /**
     * Logs the loss of the connection at debug level.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Guard against a missing cause so that logging can never throw from the callback.
        String reason = (cause == null) ? null : cause.getLocalizedMessage();
        LogUtil4j.debugLogger.debug("connectionLost {}", reason);
        // TODO connection dropped: a reconnect could be attempted here; handling of a
        //  failed reconnect has not been set up yet.
    }

    /**
     * Invoked when delivery of an outgoing message has completed; currently a no-op.
     *
     * @param token delivery token of the completed message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO delivery succeeded; nothing to do yet.
    }

    /**
     * Submits the received message to the task executor for processing.
     *
     * <p>The payload is passed to {@code MessageHandler} both as raw bytes and as text
     * decoded with UTF-8. Failures are logged and never propagated, because the Paho
     * client shuts down when this callback throws.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            Thread.sleep(DISPATCH_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the owner of this thread, but still
            // dispatch the message below so that it is not silently lost.
            Thread.currentThread().interrupt();
            LOGGER.warn("messageArrived interrupted during dispatch delay, topic={}", topic);
        }

        try {
            byte[] payload = message.getPayload();
            // Decode with an explicit charset instead of the platform default so the
            // text does not depend on the host's locale settings.
            String text = new String(payload, StandardCharsets.UTF_8);
            // Hand the message over to the thread pool for processing.
            taskExecutor.execute(new MessageHandler(payload, text, topic));
        } catch (Exception e) {
            LOGGER.error("messageArrived error: ", e);
        }
    }

    /**
     * Re-subscribes to this client's topics once a connection has been established.
     *
     * @param reconnect whether the connection is the result of an automatic reconnect
     * @param serverURI URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // After a successful connect, subscribe to our own topics again.
        MqttService.subscribe();
    }
}