diff --git a/src/main/java/com/lp/mqtt/protocol/ProtocalMing.java b/src/main/java/com/lp/mqtt/protocol/ProtocalMing.java index 45c6afc..1abee95 100644 --- a/src/main/java/com/lp/mqtt/protocol/ProtocalMing.java +++ b/src/main/java/com/lp/mqtt/protocol/ProtocalMing.java @@ -30,10 +30,10 @@ public class ProtocalMing implements Iprotocal { // 这边可以保证主题都是 /dev/coo/device_id msg =msg.replaceAll("\r|\n", ""); - + // 交互的 List list = new ArrayList<>(); - + int report_flag = 0; // 判断mqtt json 标识,兼容多种格式 if(msg.contains("sid")){ List tmpList = JSON.parseArray(msg, SimpleProtocolMqtt.class); @@ -49,7 +49,7 @@ public class ProtocalMing implements Iprotocal { }else{ list = JSON.parseArray(msg, IotSensorInfoBO.class); } - + String[] tmp = topic.split("/"); String deviceCode = tmp[tmp.length -1]; // LogUtil4j.debugLogger.debug("analysisData get msg from topic:{}---device:{}", topic, deviceCode); @@ -82,21 +82,22 @@ public class ProtocalMing implements Iprotocal { IotSensorInfoServiceImpl app = (IotSensorInfoServiceImpl) SpringApplicationContext.getBean("iotSensorInfoServiceImpl") ; app.updateRealTimeData(sensorInfo) ; } - + // 下发通知消息 IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode); - + // websocket 发送消息 if(ObjectUtil.isNotEmpty(iotNodeInfo)){ + String message = "{\"report_flag\":\"" + report_flag + "\"" + (report_flag == 1 ? ", \"sensor_id\":\"" + list.get(0).getId() + "\"" : "") + "}"; String scene_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "scene_" + iotNodeInfo.getScene_id() ); if(ObjectUtil.isNotEmpty(scene_id)){ // LogUtil4j.debugLogger.debug("will send msg to /scene/update/{}", iotNodeInfo.getScene_id()); - MqttService.pubMessage( "1", "/scene/update/"+ iotNodeInfo.getScene_id() ); + MqttService.pubMessage( message, "/scene/update/"+ iotNodeInfo.getScene_id() ); } String node_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "node_" + iotNodeInfo.getId() ); if(ObjectUtil.isNotEmpty(node_id)){ // LogUtil4j.debugLogger.debug("will send msg to /node/update/{}", iotNodeInfo.getId()); - MqttService.pubMessage( "1", "/node/update/"+ iotNodeInfo.getId() ); + MqttService.pubMessage( message, "/node/update/"+ iotNodeInfo.getId() ); } } } @@ -161,5 +162,5 @@ public class ProtocalMing implements Iprotocal { return -1 ; } } - + } diff --git a/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java b/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java index c65631f..9071293 100644 --- a/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java +++ b/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java @@ -37,7 +37,7 @@ public class ProtocalXRxasn implements Iprotocal { // LogUtil4j.debugLogger.debug("analysisData get {}---{}---{}", topic, data, msg); // 这边可以保证主题都是 /dev/coo/device_id msg =msg.replaceAll("\r|\n", ""); - + // 交互的 List list = new ArrayList<>(); @@ -49,7 +49,7 @@ public class ProtocalXRxasn implements Iprotocal { LogUtil4j.debugLogger.warn("get no nodeInfo with {}", deviceCode); return ; } - + int report_flag = 0; if(ObjectUtil.isNotEmpty(xRxasnProtocolMqtt.getType()) && !topic.endsWith("/rtg")){ if(topic.endsWith("cmd/set/cack")){ IotLoggingServiceImpl iotLoggingService = (IotLoggingServiceImpl) SpringApplicationContext.getBean("iotLoggingServiceImpl") ; @@ -75,9 +75,11 @@ public class ProtocalXRxasn implements Iprotocal { tmp.setScene_id(nodeInfo.getScene_id()); tmp.setSeq(Integer.valueOf(xRxasnProtocolMqtt.getSeq())); if(xRsasnTag.getValid() == 1 && xRsasnTag.getRemark().equals("success")) { + report_flag = 1; IotSensorInfoBO tp = new IotSensorInfoBO(); tp.setSensor_device_id(sensor_device_id); tp.setSdata(xRsasnTag.getV()); + tp.setId(sensorInfo.getId()); list.add(tp); tmp.setSensor_device_id(sensor_device_id); tmp.setInfos("0"); @@ -92,6 +94,7 @@ public class ProtocalXRxasn implements Iprotocal { } else return; } else if (topic.endsWith("/rtg") && ObjectUtil.isEmpty(xRxasnProtocolMqtt.getType())) { + report_flag = 0; List xRsasnDevs = JSON.parseArray(xRxasnProtocolMqtt.getDevs().toString(), XRsasnDev.class); xRsasnDevs.forEach(dev -> { dev.getDev(); @@ -131,19 +134,21 @@ public class ProtocalXRxasn implements Iprotocal { IotSensorInfoServiceImpl app = (IotSensorInfoServiceImpl) SpringApplicationContext.getBean("iotSensorInfoServiceImpl") ; app.updateRealTimeData(sensorInfo) ; } - + // 下发通知消息 IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode); // LogUtil4j.debugLogger.debug("sending update to web for {}", ObjectUtil.isNotEmpty(iotNodeInfo) ? iotNodeInfo.getScene_id() : null); // websocket 发送消息 + if(ObjectUtil.isNotEmpty(iotNodeInfo)){ + String message = "{\"report_flag\":\"" + report_flag + "\"" + (report_flag == 1 ? ", \"sensor_id\":\"" + list.get(0).getId() + "\"" : "") + "}"; String scene_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "scene_" + iotNodeInfo.getScene_id() ); if(ObjectUtil.isNotEmpty(scene_id)){ - MqttService.pubMessage( "1", "/scene/update/"+ iotNodeInfo.getScene_id() ); + MqttService.pubMessage( message, "/scene/update/"+ iotNodeInfo.getScene_id() ); } String node_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "node_" + iotNodeInfo.getId() ); if(ObjectUtil.isNotEmpty(node_id)){ - MqttService.pubMessage( "1", "/node/update/"+ iotNodeInfo.getId() ); + MqttService.pubMessage( message, "/node/update/"+ iotNodeInfo.getId() ); } } } @@ -151,7 +156,7 @@ public class ProtocalXRxasn implements Iprotocal { @Override public void handbert(String topic) { // TODO Auto-generated method stub - + } @Override @@ -275,5 +280,5 @@ public class ProtocalXRxasn implements Iprotocal { return -1 ; } } - + }