diff --git a/src/main/java/com/lp/bean/XRsasnDev.java b/src/main/java/com/lp/bean/XRsasnDev.java new file mode 100644 index 0000000..cf6efb6 --- /dev/null +++ b/src/main/java/com/lp/bean/XRsasnDev.java @@ -0,0 +1,24 @@ +package com.lp.bean; + +import com.lp.bo.IotTriggerInfoBO; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.util.ArrayList; +import java.util.List; + +/** + * mqtt json 字符串 转化实体对象 + * @author chenrj + * + */ +@Data +@NoArgsConstructor +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class XRsasnDev { + private List tagList = new ArrayList<>(); + private Object d; + private String dev ; + +} diff --git a/src/main/java/com/lp/bean/XRsasnTag.java b/src/main/java/com/lp/bean/XRsasnTag.java new file mode 100644 index 0000000..e11ad17 --- /dev/null +++ b/src/main/java/com/lp/bean/XRsasnTag.java @@ -0,0 +1,23 @@ +package com.lp.bean; + +import lombok.Data; +import lombok.NoArgsConstructor; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +/** + * mqtt json 字符串 转化实体对象 + * @author chenrj + * + */ +@Data +@NoArgsConstructor +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class XRsasnTag { + + private String m ; + + private Long ts ; + + private Float v ; + +} diff --git a/src/main/java/com/lp/bean/XRxasnProtocolMqtt.java b/src/main/java/com/lp/bean/XRxasnProtocolMqtt.java new file mode 100644 index 0000000..a64a4c2 --- /dev/null +++ b/src/main/java/com/lp/bean/XRxasnProtocolMqtt.java @@ -0,0 +1,28 @@ +package com.lp.bean; + +import lombok.Data; +import lombok.NoArgsConstructor; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.util.ArrayList; +import java.util.List; + +/** + * mqtt json 字符串 转化实体对象 + * @author chenrj + * + */ +@Data +@NoArgsConstructor +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class XRxasnProtocolMqtt { + + private List devList = new ArrayList<>(); + private Object devs; + + private String pKey ; + private String sn ; + private Long ts ; + private String ver ; + +} diff --git a/src/main/java/com/lp/bean/XRxasnTagWriteMqtt.java b/src/main/java/com/lp/bean/XRxasnTagWriteMqtt.java new file mode 100644 index 0000000..7c293ae --- /dev/null +++ b/src/main/java/com/lp/bean/XRxasnTagWriteMqtt.java @@ -0,0 +1,29 @@ +package com.lp.bean; + +import lombok.Data; +import lombok.NoArgsConstructor; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +/** + * mqtt json 字符串 转化实体对象 + * @author chenrj + * + */ +@Data +@NoArgsConstructor +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class XRxasnTagWriteMqtt { + +// 系统 ID,可自定义, +// 如果无需记录控制命令可视 +// 为无效参数 + private String sysid; + +// 设备名称 + private String dev ; + +// 标签名称 + private String m ; + private Float v ; + +} diff --git a/src/main/java/com/lp/bean/XRxasnWriteMqtt.java b/src/main/java/com/lp/bean/XRxasnWriteMqtt.java new file mode 100644 index 0000000..0f1d1ce --- /dev/null +++ b/src/main/java/com/lp/bean/XRxasnWriteMqtt.java @@ -0,0 +1,38 @@ +package com.lp.bean; + +import lombok.Data; +import lombok.NoArgsConstructor; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.util.ArrayList; +import java.util.List; + +/** + * mqtt json 字符串 转化实体对象 + * @author chenrj + * + */ +@Data +@NoArgsConstructor +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class XRxasnWriteMqtt { + +// 产品系列编号 + private String pKey ; + +// 数据帧序号 + private Integer seq ; + +// 包类型 功能码,指令类型 + private String type ; + private XRxasnTagWriteMqtt data ; + +// 网关编号 + private String sn ; + +// 绝对时间 + private Long ts ; + +// ver:数据格式版本协议版本 + private String ver ; +} diff --git a/src/main/java/com/lp/cache/ProCache.java b/src/main/java/com/lp/cache/ProCache.java index cbc4a3e..2ea293f 100644 --- a/src/main/java/com/lp/cache/ProCache.java +++ b/src/main/java/com/lp/cache/ProCache.java @@ -36,12 +36,12 @@ public class ProCache extends ResultMapUtils { taskExecutor.execute(new DictionaryThread(baseDao)); // 用户缓存 taskExecutor.execute(new UserInfoThread(baseDao)); + // 网关缓存 + taskExecutor.execute(new IotNodeInfoThread(baseDao)); // 传感器缓存 taskExecutor.execute(new IotSensorInfoThread(baseDao)); // LPM 中间件缓存 // taskExecutor.execute(new LpmInfoThread(baseDao)); - // 网关缓存 - taskExecutor.execute(new IotNodeInfoThread(baseDao)); // 传感器触发列表缓存 taskExecutor.execute(new IotSensorTriggerThread(baseDao)); // 场景缓存 @@ -245,13 +245,20 @@ public class ProCache extends ResultMapUtils { // 所有传感器的包含配置和数据 objt.setData_type(-1); List iotSensorInfoList = baseDao.selectList("IotSensorInfo.select", objt); + if( ObjectUtil.isNotEmpty(iotSensorInfoList) ){ for(IotSensorInfoBO obj: iotSensorInfoList){ + IotNodeInfoBO nobj = new IotNodeInfoBO(obj.getNode_id()); + nobj = ProCacheUtil.getCache(CacheName.NODEINFO, obj.getNode_id().toString(), nobj); ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId()+"", obj); // LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP // , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); - ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, - obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); + String port_id = (ObjectUtil.isNotEmpty(nobj) && nobj.getIot_protocal_category().equalsIgnoreCase("ProtocalXRxasn")) ? "" : "-"+obj.getPort_id(); + ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, + obj.getNode_id()+"-"+obj.getSensor_device_id()+port_id, obj); + if(obj.getNode_id() == 22) { + System.out.println("aaa"); + } } } } diff --git a/src/main/java/com/lp/controller/iot/IotSensorInfoController.java b/src/main/java/com/lp/controller/iot/IotSensorInfoController.java index 63becab..e1c7ff3 100644 --- a/src/main/java/com/lp/controller/iot/IotSensorInfoController.java +++ b/src/main/java/com/lp/controller/iot/IotSensorInfoController.java @@ -92,14 +92,15 @@ public class IotSensorInfoController extends BaseController { obj.setMtime(new Date()); resultMap = service.insert("IotSensorInfo.insert", obj) ; if(isOk(resultMap)){ + IotNodeInfoBO nodeDt = ProCacheUtil.getCache(CacheName.NODEINFO, obj.getNode_id().toString()); ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId().toString(), obj); // LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP // , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); - ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); - + String port_id = (ObjectUtil.isNotEmpty(nodeDt) && nodeDt.getIot_protocal_category().equalsIgnoreCase("ProtocalXRxasn")) ? "" : "-"+obj.getPort_id(); + ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+port_id, obj); + // 这边获取到网关的缓存信息,如果是modbus设备,则更新下发的缓存 - IotNodeInfoBO nodeDt = ProCacheUtil.getCache(CacheName.NODEINFO, obj.getNode_id().toString()); if(ObjectUtil.isNotEmpty(nodeDt)){ if( nodeDt.getIot_node_type()+0 == 83 && nodeDt.getIot_protocal_category().contains("ProtocalModbus") ){ // 修改网关信息后,则设置设备重新连接 TCP协议 @@ -247,6 +248,7 @@ public class IotSensorInfoController extends BaseController { tmp.setSdata(obj.getSdata()); tmp.setRequest_sdata(obj.getRequest_sdata()); tmp.setUser_id(user.getId()); + tmp.setData_type(-1); // 发送消息给LPM if( ProtocolUtil.sendControlSensorCommand(obj) >-1){ resultMap = service.update("IotSensorInfo.update", tmp) ; @@ -314,6 +316,7 @@ public class IotSensorInfoController extends BaseController { tmp.setData_type(-1); tmp.setMtime(new Date()); // 发送消息给LPM + obj.setData_type(-1); if( ProtocolUtil.sendSensorParamDown(obj) >-1){ ProCacheUtil.addCache(CacheName.SENSOR_PARAM_SETTING, obj.getId()+"", tmp ); }else{ @@ -373,8 +376,6 @@ public class IotSensorInfoController extends BaseController { resultMap = service.update("IotSensorInfo.update", obj) ; if(isOk(resultMap)){ ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId().toString(), obj); -// LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP -// , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); // 这边获取到网关的缓存信息,如果是modbus设备,则更新下发的缓存 diff --git a/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java b/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java new file mode 100644 index 0000000..bb1e977 --- /dev/null +++ b/src/main/java/com/lp/mqtt/protocol/ProtocalXRxasn.java @@ -0,0 +1,232 @@ +package com.lp.mqtt.protocol; + +import com.alibaba.fastjson.JSON; +import com.lp.bean.*; +import com.lp.bo.IotNodeInfoBO; +import com.lp.bo.IotSensorInfoBO; +import com.lp.cache.CacheName; +import com.lp.cache.ProCacheUtil; +import com.lp.common.CodeIot; +import com.lp.mqtt.MqttService; +import com.lp.service.impl.IotNodeInfoServerImpl; +import com.lp.service.impl.IotSensorInfoServiceImpl; +import com.lp.util.LogUtil4j; +import com.lp.util.ObjectUtil; +import com.lp.util.SpringApplicationContext; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import static com.lp.util.CommonUtil.UUIDString.getUUIDString; + +//接收mqtt消息 +public class ProtocalXRxasn implements Iprotocal { + + protected final static Logger LOGGER = LoggerFactory.getLogger(ProtocalXRxasn.class); + + @Override + public void analysisData(String topic, byte[] data, String msg) { +// LogUtil4j.debugLogger.debug("analysisData get {}---{}---{}", topic, data, msg); + // 这边可以保证主题都是 /dev/coo/device_id + msg =msg.replaceAll("\r|\n", ""); + + // 交互的 + List list = new ArrayList<>(); + + XRxasnProtocolMqtt xRxasnProtocolMqtt = JSON.parseObject(msg, XRxasnProtocolMqtt.class); + List xRsasnDevs = JSON.parseArray(xRxasnProtocolMqtt.getDevs().toString(), XRsasnDev.class); + xRsasnDevs.forEach(dev -> { + dev.getDev(); + List xRsasnTags = JSON.parseArray(dev.getD().toString(), XRsasnTag.class); + xRsasnTags.forEach(tag -> { + IotSensorInfoBO tp = new IotSensorInfoBO(); + tp.setSensor_device_id(dev.getDev() + "." + tag.getM()); +// tp.setPort_id(0); + tp.setSdata(tag.getV()); + list.add(tp); +// LogUtil4j.debugLogger.debug("get sensor {} from topic {}", dev.getDev() + "." + tag.getM(), topic); + }); + }); + String deviceCode = xRxasnProtocolMqtt.getSn(); + + IotNodeInfoBO nodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode ); + if(ObjectUtil.isEmpty(nodeInfo)){ + LogUtil4j.debugLogger.warn("get no nodeInfo with {}", deviceCode); + return ; + } + + if(nodeInfo.getIot_node_status() != CodeIot.DEVICE_STATUS.ONLINE){ + IotNodeInfoBO nodB = new IotNodeInfoBO() ; + nodB.setDevice_code(deviceCode); + nodB.setIot_node_status(CodeIot.DEVICE_STATUS.ONLINE); + IotNodeInfoServerImpl nodeTmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ; + nodeTmp.updateNodeStatus(nodB); + } +// LogUtil4j.debugLogger.debug("get IotSensorInfoBO list {}", list.toString()); + for(IotSensorInfoBO sensorInfo : list){ + sensorInfo.setDevice_code(deviceCode); + sensorInfo.setRequest_sdata(sensorInfo.getSdata()); +// if(sensorInfo.getPort_id() == null){ +// sensorInfo.setPort_id( Integer.parseInt(sensorInfo.getSensor_device_id()) ); +// } + if(ObjectUtil.isEmpty(sensorInfo.getMtime())){ + sensorInfo.setMtime(new Date()); + } +// LogUtil4j.debugLogger.debug("get IotSensorInfoBO from list {}", sensorInfo.toString()); + // 传感器处理 + IotSensorInfoServiceImpl app = (IotSensorInfoServiceImpl) SpringApplicationContext.getBean("iotSensorInfoServiceImpl") ; + app.updateRealTimeData(sensorInfo) ; + } + + // 下发通知消息 + IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode); + LogUtil4j.debugLogger.debug("sending update to web for {}", ObjectUtil.isNotEmpty(iotNodeInfo) ? iotNodeInfo.getScene_id() : null); + // websocket 发送消息 + if(ObjectUtil.isNotEmpty(iotNodeInfo)){ + String scene_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "scene_" + iotNodeInfo.getScene_id() ); + if(ObjectUtil.isNotEmpty(scene_id)){ + MqttService.pubMessage( "1", "/scene/update/"+ iotNodeInfo.getScene_id() ); + } + String node_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "node_" + iotNodeInfo.getId() ); + if(ObjectUtil.isNotEmpty(node_id)){ + MqttService.pubMessage( "1", "/node/update/"+ iotNodeInfo.getId() ); + } + } + } + + @Override + public void handbert(String topic) { + // TODO Auto-generated method stub + + } + + @Override + public void loginProtocal(Object obj) { + // TODO Auto-generated method stub + IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ; + tmp.updateNodeStatus(((IotNodeInfoBO) obj)); + } + + @Override + public void logout(Object obj) { + // TODO Auto-generated method stub + IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ; + tmp.updateNodeStatus(((IotNodeInfoBO) obj)); + } + + @Override + public Integer execServerControll(IotSensorInfoBO sensor, IotNodeInfoBO node) { + if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){ + String[] split = sensor.getSensor_device_id().split("\\.", 2); + String value = null; + if(sensor.getSdata_degree() == null || sensor.getSdata_degree() == 0 ){ + value = String.valueOf(sensor.getRequest_sdata().intValue()); + } else value = String.valueOf(sensor.getRequest_sdata()); + String msg = "{\n" + + "\"ver\":\"v2.0.0\",\n" + + "\"pKey\":\"iotadmin\",\n" + + "\"sn\":\""+node.getDevice_code()+"\",\n" + + "\"seq\":"+(new Random()).nextInt(65535)+",\n" + + "\"type\":\"cmd/set\",\n" + + "\"ts\":"+(new Date()).getTime()+",\n" + + "\"data\":{\n" + + "\"sysid\":\""+getUUIDString()+"\",\n" + + "\"dev\":\""+split[0]+"\",\n" + + "\"m\":\""+split[1]+"\",\n" + + "\"v\":"+value+"\n" + + "}\n" + + "}"; +// LogUtil4j.debugLogger.debug("sending {} to {}", msg, node.getDevice_code()); + MqttService.pubMessage(msg, + "/cloud" + "/iotadmin" + "/" + node.getDevice_code() + "/cmd/set" ); + return 0; + }else{ + return -1 ; + } + } + + @Override + public Integer execServerParamWrite(IotSensorInfoBO sensor, IotNodeInfoBO node) { +// LogUtil4j.debugLogger.debug("{} to {}---{}---{}", sensor.getSensor_device_id(), sensor.getSdata(), node.getIot_node_status(), sensor.getSdata_degree()); + if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){ + String[] split = sensor.getSensor_device_id().split("\\.", 2); + String value = null; + if(sensor.getSdata_degree() == null || sensor.getSdata_degree() == 0 ){ + value = String.valueOf(sensor.getRequest_sdata().intValue()); + } else value = String.valueOf(sensor.getRequest_sdata()); + String msg = "{\n" + + "\"ver\":\"v2.0.0\",\n" + + "\"pKey\":\"iotadmin\",\n" + + "\"sn\":\""+node.getDevice_code()+"\",\n" + + "\"seq\":"+(new Random()).nextInt(65535)+",\n" + + "\"type\":\"cmd/set\",\n" + + "\"ts\":"+(new Date()).getTime()+",\n" + + "\"data\":{\n" + + "\"sysid\":\""+getUUIDString()+"\",\n" + + "\"dev\":\""+split[0]+"\",\n" + + "\"m\":\""+split[1]+"\",\n" + + "\"v\":"+value+"\n" + + "}\n" + + "}"; +// XRxasnWriteMqtt xRxasnWriteMqtt = new XRxasnWriteMqtt(); +// xRxasnWriteMqtt.setVer("v2.0.0"); +// xRxasnWriteMqtt.setPKey("/iotadmin"); +// xRxasnWriteMqtt.setSeq((new Random()).nextInt(65535)); +// xRxasnWriteMqtt.setSn(node.getDevice_code()); +// xRxasnWriteMqtt.setTs((new Date()).getTime()); +// xRxasnWriteMqtt.setType("cmd/set"); +// +// XRxasnTagWriteMqtt xRxasnTagWriteMqtt = new XRxasnTagWriteMqtt(); +// xRxasnTagWriteMqtt.setSysid(getUUIDString()); +// xRxasnTagWriteMqtt.setDev(split[0]); +// xRxasnTagWriteMqtt.setM(split[1]); +// xRxasnTagWriteMqtt.setV(sensor.getRequest_sdata()); +// xRxasnWriteMqtt.setData(xRxasnTagWriteMqtt); +// LogUtil4j.debugLogger.debug("sending {} to {}", msg, node.getDevice_code()); + MqttService.pubMessage(msg, + "/cloud" + "/iotadmin" + "/" + node.getDevice_code() + "/cmd/set" ); + return 0 ; + }else{ + return -1 ; + } + } + + @Override + public Integer execServerParamRead(IotSensorInfoBO sensor, IotNodeInfoBO node) { + if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){ + String[] split = sensor.getSensor_device_id().split("\\.", 2); + String value = null; + if(sensor.getSdata_degree() == null || sensor.getSdata_degree() == 0 ){ + value = String.valueOf(sensor.getRequest_sdata().intValue()); + } else value = String.valueOf(sensor.getRequest_sdata()); + String msg = "{\n" + + "\"ver\":\"v2.0.0\",\n" + + "\"pKey\":\"iotadmin\",\n" + + "\"sn\":\""+node.getDevice_code()+"\",\n" + + "\"seq\":"+(new Random()).nextInt(65535)+",\n" + + "\"type\":\"cmd/set\",\n" + + "\"ts\":"+(new Date()).getTime()+",\n" + + "\"data\":{\n" + + "\"sysid\":\""+getUUIDString()+"\",\n" + + "\"dev\":\""+split[0]+"\",\n" + + "\"m\":\""+split[1]+"\",\n" + + "\"v\":"+value+"\n" + + "}\n" + + "}"; +// LogUtil4j.debugLogger.debug("sending {} to {}", msg, node.getDevice_code()); + MqttService.pubMessage(msg, + "/cloud" + "/iotadmin" + "/" + node.getDevice_code() + "/cmd/set" ); + return 0 ; + }else{ + return -1 ; + } + } + +} diff --git a/src/main/java/com/lp/mqtt/protocol/ProtocalXinaoV1.java b/src/main/java/com/lp/mqtt/protocol/ProtocalXinaoV1.java deleted file mode 100644 index 53a8ea8..0000000 --- a/src/main/java/com/lp/mqtt/protocol/ProtocalXinaoV1.java +++ /dev/null @@ -1,157 +0,0 @@ -package com.lp.mqtt.protocol; - -import com.alibaba.fastjson.JSON; -import com.lp.bean.SimpleProtocolMqtt; -import com.lp.bo.IotNodeInfoBO; -import com.lp.bo.IotSensorInfoBO; -import com.lp.cache.CacheName; -import com.lp.cache.ProCacheUtil; -import com.lp.common.CodeIot; -import com.lp.mqtt.MqttService; -import com.lp.service.impl.IotNodeInfoServerImpl; -import com.lp.service.impl.IotSensorInfoServiceImpl; -import com.lp.util.LogUtil4j; -import com.lp.util.ObjectUtil; -import com.lp.util.SpringApplicationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -//接收mqtt消息 -public class ProtocalXinaoV1 implements Iprotocal { - - protected final static Logger LOGGER = LoggerFactory.getLogger(ProtocalXinaoV1.class); - - @Override - public void analysisData(String topic, byte[] data, String msg) { - LogUtil4j.debugLogger.debug("analysisData get {}---{}---{}", topic, data, msg); - // 这边可以保证主题都是 /dev/coo/device_id - msg =msg.replaceAll("\r|\n", ""); - - // 交互的 - List list = new ArrayList<>(); - - // 判断mqtt json 标识,兼容多种格式 - if(msg.contains("sid")){ - List tmpList = JSON.parseArray(msg, SimpleProtocolMqtt.class); - // 对象转换 - for(SimpleProtocolMqtt dt : tmpList ){ - LogUtil4j.debugLogger.debug("analysisData msg from json {}", dt.toString()); - IotSensorInfoBO tp = new IotSensorInfoBO(); - tp.setSensor_device_id(dt.getSid()); - tp.setPort_id(dt.getPid()); - tp.setSdata(dt.getDat()); - list.add(tp); - } - }else{ - list = JSON.parseArray(msg, IotSensorInfoBO.class); - } - - String[] tmp = topic.split("/"); - String deviceCode = tmp[tmp.length -1]; - - IotNodeInfoBO nodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode ); - LogUtil4j.debugLogger.debug("get nodeInfo with {}---{}", deviceCode, nodeInfo.toString()); - if(ObjectUtil.isEmpty(nodeInfo)){ - return ; - } - - if(nodeInfo.getIot_node_status() != CodeIot.DEVICE_STATUS.ONLINE){ - IotNodeInfoBO nodB = new IotNodeInfoBO() ; - nodB.setDevice_code(deviceCode); - nodB.setIot_node_status(CodeIot.DEVICE_STATUS.ONLINE); - IotNodeInfoServerImpl nodeTmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ; - nodeTmp.updateNodeStatus(nodB); - } - LogUtil4j.debugLogger.debug("get IotSensorInfoBO list {}", list.toString()); - for(IotSensorInfoBO sensorInfo : list){ - sensorInfo.setDevice_code(deviceCode); - sensorInfo.setRequest_sdata(sensorInfo.getSdata()); - if(sensorInfo.getPort_id() == null){ - sensorInfo.setPort_id( Integer.parseInt(sensorInfo.getSensor_device_id()) ); - } - if(ObjectUtil.isEmpty(sensorInfo.getMtime())){ - sensorInfo.setMtime(new Date()); - } - LogUtil4j.debugLogger.debug("get IotSensorInfoBO from list {}", sensorInfo.toString()); - // 传感器处理 - IotSensorInfoServiceImpl app = (IotSensorInfoServiceImpl) SpringApplicationContext.getBean("iotSensorInfoServiceImpl") ; - app.updateRealTimeData(sensorInfo) ; - } - - // 下发通知消息 - IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode); - - // websocket 发送消息 - if(ObjectUtil.isNotEmpty(iotNodeInfo)){ - String scene_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "scene_" + iotNodeInfo.getScene_id() ); - if(ObjectUtil.isNotEmpty(scene_id)){ - MqttService.pubMessage( "1", "/scene/update/"+ iotNodeInfo.getScene_id() ); - } - String node_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "node_" + iotNodeInfo.getId() ); - if(ObjectUtil.isNotEmpty(node_id)){ - MqttService.pubMessage( "1", "/node/update/"+ iotNodeInfo.getId() ); - } - } - } - - @Override - public void handbert(String topic) { - // TODO Auto-generated method stub - - } - - @Override - public void loginProtocal(Object obj) { - // TODO Auto-generated method stub - IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ; - tmp.updateNodeStatus(((IotNodeInfoBO) obj)); - } - - @Override - public void logout(Object obj) { - // TODO Auto-generated method stub - IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ; - tmp.updateNodeStatus(((IotNodeInfoBO) obj)); - } - - @Override - public Integer execServerControll(IotSensorInfoBO sensor, IotNodeInfoBO node) { - if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){ - MqttService.pubMessage("{\"sensor_device_id\":"+ sensor.getSensor_device_id() - +",\"port_id\":"+sensor.getPort_id()+",\"sdata\":"+sensor.getRequest_sdata() +"}", - "/server/coo/" + node.getDevice_code() ); - return 0 ; - }else{ - return -1 ; - } - } - - @Override - public Integer execServerParamWrite(IotSensorInfoBO sensor, IotNodeInfoBO node) { - if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){ - MqttService.pubMessage("{\"sensor_device_id\":"+ sensor.getSensor_device_id() - +",\"port_id\":"+sensor.getPort_id()+",\"sdata\":"+sensor.getRequest_sdata() +"}", - "/server/coo/" + node.getDevice_code() ); - return 0 ; - }else{ - return -1 ; - } - } - - @Override - public Integer execServerParamRead(IotSensorInfoBO sensorInfo, IotNodeInfoBO node) { - if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){ - MqttService.pubMessage("{\"sensor_device_id\":"+ sensorInfo.getSensor_device_id() - +",\"port_id\":"+sensorInfo.getPort_id() +"}", - "/server/coo/" + node.getDevice_code() ); - return 0; - }else{ - return -1 ; - } - } - -} diff --git a/src/main/java/com/lp/service/impl/IotSensorInfoServiceImpl.java b/src/main/java/com/lp/service/impl/IotSensorInfoServiceImpl.java index d27857b..7ac962c 100644 --- a/src/main/java/com/lp/service/impl/IotSensorInfoServiceImpl.java +++ b/src/main/java/com/lp/service/impl/IotSensorInfoServiceImpl.java @@ -32,8 +32,8 @@ public class IotSensorInfoServiceImpl extends BaseServiceImpl implements IotSens @Override public Map updateRealTimeData(IotSensorInfoBO obj) { - LogUtil4j.debugLogger.debug("updateRealTimeData is called with ({}---{}---{})" - , obj.getDevice_code() + "-" + obj.getSensor_device_id(), obj.getPort_id(), obj.getRequest_sdata()); +// LogUtil4j.debugLogger.debug("updateRealTimeData is called with ({}---{}---{})" +// , obj.getDevice_code() + "-" + obj.getSensor_device_id(), obj.getPort_id(), obj.getRequest_sdata()); Map resultMap = getResultMap(); try{ // 通过网关缓存 deviceCode -> nodeInfo -> id (or nodeId) @@ -70,7 +70,8 @@ public class IotSensorInfoServiceImpl extends BaseServiceImpl implements IotSens obj.setNode_id( iotNodeInfo.getId() ); // LogUtil4j.debugLogger.debug("get sensorinfo with key: {}", obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); // 获取传感器缓存信息 - IotSensorInfoBO sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); + String port_id = ObjectUtil.isEmpty(obj.getPort_id()) ? "" : "-" + obj.getPort_id(); + IotSensorInfoBO sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+port_id, obj); if( ObjectUtil.isEmpty(sensorInfo) ){ LOGGER.warn("updateRealTimeData is called with {} no IotSensorInfoBO", obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); @@ -101,10 +102,8 @@ public class IotSensorInfoServiceImpl extends BaseServiceImpl implements IotSens obj.setSdata(ft); obj.setRequest_sdata(obj.getSdata()); } -// LogUtil4j.debugLogger.debug("updateRealTimeData is called with {} with {}", obj.getDevice_code(), sensorInfo.getSensor_device_id()); //*** 20190405 如果是配置数据,则直接更新数据库,并更新缓存,数据不进入历史表里面 if( sensorInfo.getData_type() == 1 ){ - // 配置数据 Integer num = dao.update("IotSensorInfo.updateRealTimeData", obj); if( num >0){ diff --git a/src/main/java/com/lp/util/iot/ProtocolUtil.java b/src/main/java/com/lp/util/iot/ProtocolUtil.java index 24d9601..6d9299e 100644 --- a/src/main/java/com/lp/util/iot/ProtocolUtil.java +++ b/src/main/java/com/lp/util/iot/ProtocolUtil.java @@ -9,6 +9,7 @@ import com.lp.common.CodeIot; import com.lp.mqtt.MqttService; import com.lp.mqtt.protocol.ProtocalFactory; import com.lp.util.Calculator; +import com.lp.util.LogUtil4j; import com.lp.util.ObjectUtil; @@ -25,6 +26,7 @@ public class ProtocolUtil { * @return */ public static Integer sendControlSensorCommand(IotSensorInfoBO obj ){ + LogUtil4j.debugLogger.debug("called with {}", obj); //IOT_SERVER_LPM:TYPE,deviceCode,SENSOR_DEVICE_ID,PORT_ID,DATA,FORMULATE StringBuffer strBuffer = new StringBuffer(); @@ -41,7 +43,7 @@ public class ProtocolUtil { obj.setSensor_device_id(sensorInfo.getSensor_device_id()); obj.setPort_id(sensorInfo.getPort_id()); - + obj.setSdata_degree(sensorInfo.getSdata_degree()); return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerControll(obj, iotNodeInfo); }else if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.TCP){ @@ -83,6 +85,7 @@ public class ProtocolUtil { // 设备重启命令 public static Integer sendGatewayRestart(IotNodeInfoBO obj){ + LogUtil4j.debugLogger.debug("called with {}", obj); StringBuffer strBuffer = new StringBuffer(); if(obj.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){ return 0 ; @@ -115,6 +118,7 @@ public class ProtocolUtil { * @return */ public static Integer sendSensorParamRead(IotSensorInfoBO obj ){ +// LogUtil4j.debugLogger.debug("called with {}", obj); //IOT_SERVER_LPM:TYPE,deviceCode,param_type // TYPE = param_read StringBuffer strBuffer = new StringBuffer(); @@ -125,7 +129,7 @@ public class ProtocolUtil { if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){ obj.setSensor_device_id(sensorInfo.getSensor_device_id()); obj.setPort_id(sensorInfo.getPort_id()); - + obj.setSdata_degree(sensorInfo.getSdata_degree()); return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerParamRead(obj, iotNodeInfo); }else if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.TCP ){ // tcp @@ -157,6 +161,7 @@ public class ProtocolUtil { * @return */ public static Integer sendSensorParamDown(IotSensorInfoBO obj ){ +// LogUtil4j.debugLogger.debug("called with {}---{}", obj.getId(), obj.getSdata()); //IOT_SERVER_LPM:TYPE,deviceCode,param_type:values // TYPE = param_write // @@ -165,6 +170,7 @@ public class ProtocolUtil { IotSensorInfo sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO, obj.getId().toString() ); IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO, sensorInfo.getNode_id().toString() ); +// LogUtil4j.debugLogger.debug("sensor is {} and node is {}---{}---{}", sensorInfo.getNode_id(), iotNodeInfo.getDevice_code(), iotNodeInfo.getIot_node_type(), iotNodeInfo.getIot_protocal_category()); if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){ float sdata_info = obj.getSdata() ; @@ -174,6 +180,7 @@ public class ProtocolUtil { obj.setSensor_device_id(sensorInfo.getSensor_device_id()); obj.setPort_id(sensorInfo.getPort_id()); obj.setRequest_sdata(sdata_info); + obj.setSdata_degree(sensorInfo.getSdata_degree()); return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerParamWrite(obj, iotNodeInfo);