feat:增加讯铙新奥报文协议支撑

dev
kale 2023-05-30 14:55:18 +08:00
parent c53127782a
commit ea455938a4
11 changed files with 404 additions and 173 deletions

View File

@ -0,0 +1,24 @@
package com.lp.bean;
import com.lp.bo.IotTriggerInfoBO;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.ArrayList;
import java.util.List;
/**
* mqtt json
* @author chenrj
*
*/
@Data
@NoArgsConstructor
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class XRsasnDev {
private List<XRsasnTag> tagList = new ArrayList<>();
private Object d;
private String dev ;
}

View File

@ -0,0 +1,23 @@
package com.lp.bean;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.codehaus.jackson.map.annotate.JsonSerialize;
/**
* mqtt json
* @author chenrj
*
*/
@Data
@NoArgsConstructor
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class XRsasnTag {
private String m ;
private Long ts ;
private Float v ;
}

View File

@ -0,0 +1,28 @@
package com.lp.bean;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.ArrayList;
import java.util.List;
/**
* mqtt json
* @author chenrj
*
*/
@Data
@NoArgsConstructor
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class XRxasnProtocolMqtt {
private List<XRsasnDev> devList = new ArrayList<>();
private Object devs;
private String pKey ;
private String sn ;
private Long ts ;
private String ver ;
}

View File

@ -0,0 +1,29 @@
package com.lp.bean;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.codehaus.jackson.map.annotate.JsonSerialize;
/**
* mqtt json
* @author chenrj
*
*/
@Data
@NoArgsConstructor
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class XRxasnTagWriteMqtt {
// 系统 ID可自定义
// 如果无需记录控制命令可视
// 为无效参数
private String sysid;
// 设备名称
private String dev ;
// 标签名称
private String m ;
private Float v ;
}

View File

@ -0,0 +1,38 @@
package com.lp.bean;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.util.ArrayList;
import java.util.List;
/**
* mqtt json
* @author chenrj
*
*/
@Data
@NoArgsConstructor
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class XRxasnWriteMqtt {
// 产品系列编号
private String pKey ;
// 数据帧序号
private Integer seq ;
// 包类型 功能码,指令类型
private String type ;
private XRxasnTagWriteMqtt data ;
// 网关编号
private String sn ;
// 绝对时间
private Long ts ;
// ver:数据格式版本协议版本
private String ver ;
}

View File

@ -36,12 +36,12 @@ public class ProCache extends ResultMapUtils {
taskExecutor.execute(new DictionaryThread(baseDao)); taskExecutor.execute(new DictionaryThread(baseDao));
// 用户缓存 // 用户缓存
taskExecutor.execute(new UserInfoThread(baseDao)); taskExecutor.execute(new UserInfoThread(baseDao));
// 网关缓存
taskExecutor.execute(new IotNodeInfoThread(baseDao));
// 传感器缓存 // 传感器缓存
taskExecutor.execute(new IotSensorInfoThread(baseDao)); taskExecutor.execute(new IotSensorInfoThread(baseDao));
// LPM 中间件缓存 // LPM 中间件缓存
// taskExecutor.execute(new LpmInfoThread(baseDao)); // taskExecutor.execute(new LpmInfoThread(baseDao));
// 网关缓存
taskExecutor.execute(new IotNodeInfoThread(baseDao));
// 传感器触发列表缓存 // 传感器触发列表缓存
taskExecutor.execute(new IotSensorTriggerThread(baseDao)); taskExecutor.execute(new IotSensorTriggerThread(baseDao));
// 场景缓存 // 场景缓存
@ -245,13 +245,20 @@ public class ProCache extends ResultMapUtils {
// 所有传感器的包含配置和数据 // 所有传感器的包含配置和数据
objt.setData_type(-1); objt.setData_type(-1);
List<IotSensorInfoBO> iotSensorInfoList = baseDao.selectList("IotSensorInfo.select", objt); List<IotSensorInfoBO> iotSensorInfoList = baseDao.selectList("IotSensorInfo.select", objt);
if( ObjectUtil.isNotEmpty(iotSensorInfoList) ){ if( ObjectUtil.isNotEmpty(iotSensorInfoList) ){
for(IotSensorInfoBO obj: iotSensorInfoList){ for(IotSensorInfoBO obj: iotSensorInfoList){
IotNodeInfoBO nobj = new IotNodeInfoBO(obj.getNode_id());
nobj = ProCacheUtil.getCache(CacheName.NODEINFO, obj.getNode_id().toString(), nobj);
ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId()+"", obj); ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId()+"", obj);
// LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP // LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP
// , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); // , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id());
ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, String port_id = (ObjectUtil.isNotEmpty(nobj) && nobj.getIot_protocal_category().equalsIgnoreCase("ProtocalXRxasn")) ? "" : "-"+obj.getPort_id();
obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); ProCacheUtil.addCache(CacheName.SENSORINFO_NSP,
obj.getNode_id()+"-"+obj.getSensor_device_id()+port_id, obj);
if(obj.getNode_id() == 22) {
System.out.println("aaa");
}
} }
} }
} }

View File

@ -92,14 +92,15 @@ public class IotSensorInfoController extends BaseController {
obj.setMtime(new Date()); obj.setMtime(new Date());
resultMap = service.insert("IotSensorInfo.insert", obj) ; resultMap = service.insert("IotSensorInfo.insert", obj) ;
if(isOk(resultMap)){ if(isOk(resultMap)){
IotNodeInfoBO nodeDt = ProCacheUtil.getCache(CacheName.NODEINFO, obj.getNode_id().toString());
ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId().toString(), obj); ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId().toString(), obj);
// LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP // LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP
// , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); // , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id());
ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); String port_id = (ObjectUtil.isNotEmpty(nodeDt) && nodeDt.getIot_protocal_category().equalsIgnoreCase("ProtocalXRxasn")) ? "" : "-"+obj.getPort_id();
ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+port_id, obj);
// 这边获取到网关的缓存信息,如果是modbus设备则更新下发的缓存 // 这边获取到网关的缓存信息,如果是modbus设备则更新下发的缓存
IotNodeInfoBO nodeDt = ProCacheUtil.getCache(CacheName.NODEINFO, obj.getNode_id().toString());
if(ObjectUtil.isNotEmpty(nodeDt)){ if(ObjectUtil.isNotEmpty(nodeDt)){
if( nodeDt.getIot_node_type()+0 == 83 && nodeDt.getIot_protocal_category().contains("ProtocalModbus") ){ if( nodeDt.getIot_node_type()+0 == 83 && nodeDt.getIot_protocal_category().contains("ProtocalModbus") ){
// 修改网关信息后,则设置设备重新连接 TCP协议 // 修改网关信息后,则设置设备重新连接 TCP协议
@ -247,6 +248,7 @@ public class IotSensorInfoController extends BaseController {
tmp.setSdata(obj.getSdata()); tmp.setSdata(obj.getSdata());
tmp.setRequest_sdata(obj.getRequest_sdata()); tmp.setRequest_sdata(obj.getRequest_sdata());
tmp.setUser_id(user.getId()); tmp.setUser_id(user.getId());
tmp.setData_type(-1);
// 发送消息给LPM // 发送消息给LPM
if( ProtocolUtil.sendControlSensorCommand(obj) >-1){ if( ProtocolUtil.sendControlSensorCommand(obj) >-1){
resultMap = service.update("IotSensorInfo.update", tmp) ; resultMap = service.update("IotSensorInfo.update", tmp) ;
@ -314,6 +316,7 @@ public class IotSensorInfoController extends BaseController {
tmp.setData_type(-1); tmp.setData_type(-1);
tmp.setMtime(new Date()); tmp.setMtime(new Date());
// 发送消息给LPM // 发送消息给LPM
obj.setData_type(-1);
if( ProtocolUtil.sendSensorParamDown(obj) >-1){ if( ProtocolUtil.sendSensorParamDown(obj) >-1){
ProCacheUtil.addCache(CacheName.SENSOR_PARAM_SETTING, obj.getId()+"", tmp ); ProCacheUtil.addCache(CacheName.SENSOR_PARAM_SETTING, obj.getId()+"", tmp );
}else{ }else{
@ -373,8 +376,6 @@ public class IotSensorInfoController extends BaseController {
resultMap = service.update("IotSensorInfo.update", obj) ; resultMap = service.update("IotSensorInfo.update", obj) ;
if(isOk(resultMap)){ if(isOk(resultMap)){
ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId().toString(), obj); ProCacheUtil.addCache(CacheName.SENSORINFO, obj.getId().toString(), obj);
// LogUtil4j.debugLogger.debug("add cache {}---{}", CacheName.SENSORINFO_NSP
// , obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id());
ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); ProCacheUtil.addCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj);
// 这边获取到网关的缓存信息,如果是modbus设备则更新下发的缓存 // 这边获取到网关的缓存信息,如果是modbus设备则更新下发的缓存

View File

@ -0,0 +1,232 @@
package com.lp.mqtt.protocol;
import com.alibaba.fastjson.JSON;
import com.lp.bean.*;
import com.lp.bo.IotNodeInfoBO;
import com.lp.bo.IotSensorInfoBO;
import com.lp.cache.CacheName;
import com.lp.cache.ProCacheUtil;
import com.lp.common.CodeIot;
import com.lp.mqtt.MqttService;
import com.lp.service.impl.IotNodeInfoServerImpl;
import com.lp.service.impl.IotSensorInfoServiceImpl;
import com.lp.util.LogUtil4j;
import com.lp.util.ObjectUtil;
import com.lp.util.SpringApplicationContext;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import static com.lp.util.CommonUtil.UUIDString.getUUIDString;
//接收mqtt消息
public class ProtocalXRxasn implements Iprotocal {
protected final static Logger LOGGER = LoggerFactory.getLogger(ProtocalXRxasn.class);
@Override
public void analysisData(String topic, byte[] data, String msg) {
// LogUtil4j.debugLogger.debug("analysisData get {}---{}---{}", topic, data, msg);
// 这边可以保证主题都是 /dev/coo/device_id
msg =msg.replaceAll("\r|\n", "");
// 交互的
List<IotSensorInfoBO> list = new ArrayList<>();
XRxasnProtocolMqtt xRxasnProtocolMqtt = JSON.parseObject(msg, XRxasnProtocolMqtt.class);
List<XRsasnDev> xRsasnDevs = JSON.parseArray(xRxasnProtocolMqtt.getDevs().toString(), XRsasnDev.class);
xRsasnDevs.forEach(dev -> {
dev.getDev();
List<XRsasnTag> xRsasnTags = JSON.parseArray(dev.getD().toString(), XRsasnTag.class);
xRsasnTags.forEach(tag -> {
IotSensorInfoBO tp = new IotSensorInfoBO();
tp.setSensor_device_id(dev.getDev() + "." + tag.getM());
// tp.setPort_id(0);
tp.setSdata(tag.getV());
list.add(tp);
// LogUtil4j.debugLogger.debug("get sensor {} from topic {}", dev.getDev() + "." + tag.getM(), topic);
});
});
String deviceCode = xRxasnProtocolMqtt.getSn();
IotNodeInfoBO nodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode );
if(ObjectUtil.isEmpty(nodeInfo)){
LogUtil4j.debugLogger.warn("get no nodeInfo with {}", deviceCode);
return ;
}
if(nodeInfo.getIot_node_status() != CodeIot.DEVICE_STATUS.ONLINE){
IotNodeInfoBO nodB = new IotNodeInfoBO() ;
nodB.setDevice_code(deviceCode);
nodB.setIot_node_status(CodeIot.DEVICE_STATUS.ONLINE);
IotNodeInfoServerImpl nodeTmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ;
nodeTmp.updateNodeStatus(nodB);
}
// LogUtil4j.debugLogger.debug("get IotSensorInfoBO list {}", list.toString());
for(IotSensorInfoBO sensorInfo : list){
sensorInfo.setDevice_code(deviceCode);
sensorInfo.setRequest_sdata(sensorInfo.getSdata());
// if(sensorInfo.getPort_id() == null){
// sensorInfo.setPort_id( Integer.parseInt(sensorInfo.getSensor_device_id()) );
// }
if(ObjectUtil.isEmpty(sensorInfo.getMtime())){
sensorInfo.setMtime(new Date());
}
// LogUtil4j.debugLogger.debug("get IotSensorInfoBO from list {}", sensorInfo.toString());
// 传感器处理
IotSensorInfoServiceImpl app = (IotSensorInfoServiceImpl) SpringApplicationContext.getBean("iotSensorInfoServiceImpl") ;
app.updateRealTimeData(sensorInfo) ;
}
// 下发通知消息
IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode);
LogUtil4j.debugLogger.debug("sending update to web for {}", ObjectUtil.isNotEmpty(iotNodeInfo) ? iotNodeInfo.getScene_id() : null);
// websocket 发送消息
if(ObjectUtil.isNotEmpty(iotNodeInfo)){
String scene_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "scene_" + iotNodeInfo.getScene_id() );
if(ObjectUtil.isNotEmpty(scene_id)){
MqttService.pubMessage( "1", "/scene/update/"+ iotNodeInfo.getScene_id() );
}
String node_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "node_" + iotNodeInfo.getId() );
if(ObjectUtil.isNotEmpty(node_id)){
MqttService.pubMessage( "1", "/node/update/"+ iotNodeInfo.getId() );
}
}
}
@Override
public void handbert(String topic) {
// TODO Auto-generated method stub
}
@Override
public void loginProtocal(Object obj) {
// TODO Auto-generated method stub
IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ;
tmp.updateNodeStatus(((IotNodeInfoBO) obj));
}
@Override
public void logout(Object obj) {
// TODO Auto-generated method stub
IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ;
tmp.updateNodeStatus(((IotNodeInfoBO) obj));
}
@Override
public Integer execServerControll(IotSensorInfoBO sensor, IotNodeInfoBO node) {
if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){
String[] split = sensor.getSensor_device_id().split("\\.", 2);
String value = null;
if(sensor.getSdata_degree() == null || sensor.getSdata_degree() == 0 ){
value = String.valueOf(sensor.getRequest_sdata().intValue());
} else value = String.valueOf(sensor.getRequest_sdata());
String msg = "{\n" +
"\"ver\":\"v2.0.0\",\n" +
"\"pKey\":\"iotadmin\",\n" +
"\"sn\":\""+node.getDevice_code()+"\",\n" +
"\"seq\":"+(new Random()).nextInt(65535)+",\n" +
"\"type\":\"cmd/set\",\n" +
"\"ts\":"+(new Date()).getTime()+",\n" +
"\"data\":{\n" +
"\"sysid\":\""+getUUIDString()+"\",\n" +
"\"dev\":\""+split[0]+"\",\n" +
"\"m\":\""+split[1]+"\",\n" +
"\"v\":"+value+"\n" +
"}\n" +
"}";
// LogUtil4j.debugLogger.debug("sending {} to {}", msg, node.getDevice_code());
MqttService.pubMessage(msg,
"/cloud" + "/iotadmin" + "/" + node.getDevice_code() + "/cmd/set" );
return 0;
}else{
return -1 ;
}
}
@Override
public Integer execServerParamWrite(IotSensorInfoBO sensor, IotNodeInfoBO node) {
// LogUtil4j.debugLogger.debug("{} to {}---{}---{}", sensor.getSensor_device_id(), sensor.getSdata(), node.getIot_node_status(), sensor.getSdata_degree());
if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){
String[] split = sensor.getSensor_device_id().split("\\.", 2);
String value = null;
if(sensor.getSdata_degree() == null || sensor.getSdata_degree() == 0 ){
value = String.valueOf(sensor.getRequest_sdata().intValue());
} else value = String.valueOf(sensor.getRequest_sdata());
String msg = "{\n" +
"\"ver\":\"v2.0.0\",\n" +
"\"pKey\":\"iotadmin\",\n" +
"\"sn\":\""+node.getDevice_code()+"\",\n" +
"\"seq\":"+(new Random()).nextInt(65535)+",\n" +
"\"type\":\"cmd/set\",\n" +
"\"ts\":"+(new Date()).getTime()+",\n" +
"\"data\":{\n" +
"\"sysid\":\""+getUUIDString()+"\",\n" +
"\"dev\":\""+split[0]+"\",\n" +
"\"m\":\""+split[1]+"\",\n" +
"\"v\":"+value+"\n" +
"}\n" +
"}";
// XRxasnWriteMqtt xRxasnWriteMqtt = new XRxasnWriteMqtt();
// xRxasnWriteMqtt.setVer("v2.0.0");
// xRxasnWriteMqtt.setPKey("/iotadmin");
// xRxasnWriteMqtt.setSeq((new Random()).nextInt(65535));
// xRxasnWriteMqtt.setSn(node.getDevice_code());
// xRxasnWriteMqtt.setTs((new Date()).getTime());
// xRxasnWriteMqtt.setType("cmd/set");
//
// XRxasnTagWriteMqtt xRxasnTagWriteMqtt = new XRxasnTagWriteMqtt();
// xRxasnTagWriteMqtt.setSysid(getUUIDString());
// xRxasnTagWriteMqtt.setDev(split[0]);
// xRxasnTagWriteMqtt.setM(split[1]);
// xRxasnTagWriteMqtt.setV(sensor.getRequest_sdata());
// xRxasnWriteMqtt.setData(xRxasnTagWriteMqtt);
// LogUtil4j.debugLogger.debug("sending {} to {}", msg, node.getDevice_code());
MqttService.pubMessage(msg,
"/cloud" + "/iotadmin" + "/" + node.getDevice_code() + "/cmd/set" );
return 0 ;
}else{
return -1 ;
}
}
@Override
public Integer execServerParamRead(IotSensorInfoBO sensor, IotNodeInfoBO node) {
if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){
String[] split = sensor.getSensor_device_id().split("\\.", 2);
String value = null;
if(sensor.getSdata_degree() == null || sensor.getSdata_degree() == 0 ){
value = String.valueOf(sensor.getRequest_sdata().intValue());
} else value = String.valueOf(sensor.getRequest_sdata());
String msg = "{\n" +
"\"ver\":\"v2.0.0\",\n" +
"\"pKey\":\"iotadmin\",\n" +
"\"sn\":\""+node.getDevice_code()+"\",\n" +
"\"seq\":"+(new Random()).nextInt(65535)+",\n" +
"\"type\":\"cmd/set\",\n" +
"\"ts\":"+(new Date()).getTime()+",\n" +
"\"data\":{\n" +
"\"sysid\":\""+getUUIDString()+"\",\n" +
"\"dev\":\""+split[0]+"\",\n" +
"\"m\":\""+split[1]+"\",\n" +
"\"v\":"+value+"\n" +
"}\n" +
"}";
// LogUtil4j.debugLogger.debug("sending {} to {}", msg, node.getDevice_code());
MqttService.pubMessage(msg,
"/cloud" + "/iotadmin" + "/" + node.getDevice_code() + "/cmd/set" );
return 0 ;
}else{
return -1 ;
}
}
}

View File

@ -1,157 +0,0 @@
package com.lp.mqtt.protocol;
import com.alibaba.fastjson.JSON;
import com.lp.bean.SimpleProtocolMqtt;
import com.lp.bo.IotNodeInfoBO;
import com.lp.bo.IotSensorInfoBO;
import com.lp.cache.CacheName;
import com.lp.cache.ProCacheUtil;
import com.lp.common.CodeIot;
import com.lp.mqtt.MqttService;
import com.lp.service.impl.IotNodeInfoServerImpl;
import com.lp.service.impl.IotSensorInfoServiceImpl;
import com.lp.util.LogUtil4j;
import com.lp.util.ObjectUtil;
import com.lp.util.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
//接收mqtt消息
public class ProtocalXinaoV1 implements Iprotocal {
protected final static Logger LOGGER = LoggerFactory.getLogger(ProtocalXinaoV1.class);
@Override
public void analysisData(String topic, byte[] data, String msg) {
LogUtil4j.debugLogger.debug("analysisData get {}---{}---{}", topic, data, msg);
// 这边可以保证主题都是 /dev/coo/device_id
msg =msg.replaceAll("\r|\n", "");
// 交互的
List<IotSensorInfoBO> list = new ArrayList<>();
// 判断mqtt json 标识,兼容多种格式
if(msg.contains("sid")){
List<SimpleProtocolMqtt> tmpList = JSON.parseArray(msg, SimpleProtocolMqtt.class);
// 对象转换
for(SimpleProtocolMqtt dt : tmpList ){
LogUtil4j.debugLogger.debug("analysisData msg from json {}", dt.toString());
IotSensorInfoBO tp = new IotSensorInfoBO();
tp.setSensor_device_id(dt.getSid());
tp.setPort_id(dt.getPid());
tp.setSdata(dt.getDat());
list.add(tp);
}
}else{
list = JSON.parseArray(msg, IotSensorInfoBO.class);
}
String[] tmp = topic.split("/");
String deviceCode = tmp[tmp.length -1];
IotNodeInfoBO nodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode );
LogUtil4j.debugLogger.debug("get nodeInfo with {}---{}", deviceCode, nodeInfo.toString());
if(ObjectUtil.isEmpty(nodeInfo)){
return ;
}
if(nodeInfo.getIot_node_status() != CodeIot.DEVICE_STATUS.ONLINE){
IotNodeInfoBO nodB = new IotNodeInfoBO() ;
nodB.setDevice_code(deviceCode);
nodB.setIot_node_status(CodeIot.DEVICE_STATUS.ONLINE);
IotNodeInfoServerImpl nodeTmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ;
nodeTmp.updateNodeStatus(nodB);
}
LogUtil4j.debugLogger.debug("get IotSensorInfoBO list {}", list.toString());
for(IotSensorInfoBO sensorInfo : list){
sensorInfo.setDevice_code(deviceCode);
sensorInfo.setRequest_sdata(sensorInfo.getSdata());
if(sensorInfo.getPort_id() == null){
sensorInfo.setPort_id( Integer.parseInt(sensorInfo.getSensor_device_id()) );
}
if(ObjectUtil.isEmpty(sensorInfo.getMtime())){
sensorInfo.setMtime(new Date());
}
LogUtil4j.debugLogger.debug("get IotSensorInfoBO from list {}", sensorInfo.toString());
// 传感器处理
IotSensorInfoServiceImpl app = (IotSensorInfoServiceImpl) SpringApplicationContext.getBean("iotSensorInfoServiceImpl") ;
app.updateRealTimeData(sensorInfo) ;
}
// 下发通知消息
IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO_DEVICECODE, deviceCode);
// websocket 发送消息
if(ObjectUtil.isNotEmpty(iotNodeInfo)){
String scene_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "scene_" + iotNodeInfo.getScene_id() );
if(ObjectUtil.isNotEmpty(scene_id)){
MqttService.pubMessage( "1", "/scene/update/"+ iotNodeInfo.getScene_id() );
}
String node_id = ProCacheUtil.getCache(CacheName.SCENE_IPDATE_FLAG , "node_" + iotNodeInfo.getId() );
if(ObjectUtil.isNotEmpty(node_id)){
MqttService.pubMessage( "1", "/node/update/"+ iotNodeInfo.getId() );
}
}
}
@Override
public void handbert(String topic) {
// TODO Auto-generated method stub
}
@Override
public void loginProtocal(Object obj) {
// TODO Auto-generated method stub
IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ;
tmp.updateNodeStatus(((IotNodeInfoBO) obj));
}
@Override
public void logout(Object obj) {
// TODO Auto-generated method stub
IotNodeInfoServerImpl tmp = SpringApplicationContext.getBeanType("iotNodeInfoServerImpl") ;
tmp.updateNodeStatus(((IotNodeInfoBO) obj));
}
@Override
public Integer execServerControll(IotSensorInfoBO sensor, IotNodeInfoBO node) {
if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){
MqttService.pubMessage("{\"sensor_device_id\":"+ sensor.getSensor_device_id()
+",\"port_id\":"+sensor.getPort_id()+",\"sdata\":"+sensor.getRequest_sdata() +"}",
"/server/coo/" + node.getDevice_code() );
return 0 ;
}else{
return -1 ;
}
}
@Override
public Integer execServerParamWrite(IotSensorInfoBO sensor, IotNodeInfoBO node) {
if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){
MqttService.pubMessage("{\"sensor_device_id\":"+ sensor.getSensor_device_id()
+",\"port_id\":"+sensor.getPort_id()+",\"sdata\":"+sensor.getRequest_sdata() +"}",
"/server/coo/" + node.getDevice_code() );
return 0 ;
}else{
return -1 ;
}
}
@Override
public Integer execServerParamRead(IotSensorInfoBO sensorInfo, IotNodeInfoBO node) {
if(node.getIot_node_status() == CodeIot.DEVICE_STATUS.ONLINE+0){
MqttService.pubMessage("{\"sensor_device_id\":"+ sensorInfo.getSensor_device_id()
+",\"port_id\":"+sensorInfo.getPort_id() +"}",
"/server/coo/" + node.getDevice_code() );
return 0;
}else{
return -1 ;
}
}
}

View File

@ -32,8 +32,8 @@ public class IotSensorInfoServiceImpl extends BaseServiceImpl implements IotSens
@Override @Override
public Map<String, Object> updateRealTimeData(IotSensorInfoBO obj) { public Map<String, Object> updateRealTimeData(IotSensorInfoBO obj) {
LogUtil4j.debugLogger.debug("updateRealTimeData is called with ({}---{}---{})" // LogUtil4j.debugLogger.debug("updateRealTimeData is called with ({}---{}---{})"
, obj.getDevice_code() + "-" + obj.getSensor_device_id(), obj.getPort_id(), obj.getRequest_sdata()); // , obj.getDevice_code() + "-" + obj.getSensor_device_id(), obj.getPort_id(), obj.getRequest_sdata());
Map<String, Object> resultMap = getResultMap(); Map<String, Object> resultMap = getResultMap();
try{ try{
// 通过网关缓存 deviceCode -> nodeInfo -> id (or nodeId) // 通过网关缓存 deviceCode -> nodeInfo -> id (or nodeId)
@ -70,7 +70,8 @@ public class IotSensorInfoServiceImpl extends BaseServiceImpl implements IotSens
obj.setNode_id( iotNodeInfo.getId() ); obj.setNode_id( iotNodeInfo.getId() );
// LogUtil4j.debugLogger.debug("get sensorinfo with key: {}", obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); // LogUtil4j.debugLogger.debug("get sensorinfo with key: {}", obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id());
// 获取传感器缓存信息 // 获取传感器缓存信息
IotSensorInfoBO sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id(), obj); String port_id = ObjectUtil.isEmpty(obj.getPort_id()) ? "" : "-" + obj.getPort_id();
IotSensorInfoBO sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO_NSP, obj.getNode_id()+"-"+obj.getSensor_device_id()+port_id, obj);
if( ObjectUtil.isEmpty(sensorInfo) ){ if( ObjectUtil.isEmpty(sensorInfo) ){
LOGGER.warn("updateRealTimeData is called with {} no IotSensorInfoBO", obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id()); LOGGER.warn("updateRealTimeData is called with {} no IotSensorInfoBO", obj.getNode_id()+"-"+obj.getSensor_device_id()+"-"+obj.getPort_id());
@ -101,10 +102,8 @@ public class IotSensorInfoServiceImpl extends BaseServiceImpl implements IotSens
obj.setSdata(ft); obj.setSdata(ft);
obj.setRequest_sdata(obj.getSdata()); obj.setRequest_sdata(obj.getSdata());
} }
// LogUtil4j.debugLogger.debug("updateRealTimeData is called with {} with {}", obj.getDevice_code(), sensorInfo.getSensor_device_id());
//*** 20190405 如果是配置数据,则直接更新数据库,并更新缓存,数据不进入历史表里面 //*** 20190405 如果是配置数据,则直接更新数据库,并更新缓存,数据不进入历史表里面
if( sensorInfo.getData_type() == 1 ){ if( sensorInfo.getData_type() == 1 ){
// 配置数据 // 配置数据
Integer num = dao.update("IotSensorInfo.updateRealTimeData", obj); Integer num = dao.update("IotSensorInfo.updateRealTimeData", obj);
if( num >0){ if( num >0){

View File

@ -9,6 +9,7 @@ import com.lp.common.CodeIot;
import com.lp.mqtt.MqttService; import com.lp.mqtt.MqttService;
import com.lp.mqtt.protocol.ProtocalFactory; import com.lp.mqtt.protocol.ProtocalFactory;
import com.lp.util.Calculator; import com.lp.util.Calculator;
import com.lp.util.LogUtil4j;
import com.lp.util.ObjectUtil; import com.lp.util.ObjectUtil;
@ -25,6 +26,7 @@ public class ProtocolUtil {
* @return * @return
*/ */
public static Integer sendControlSensorCommand(IotSensorInfoBO obj ){ public static Integer sendControlSensorCommand(IotSensorInfoBO obj ){
LogUtil4j.debugLogger.debug("called with {}", obj);
//IOT_SERVER_LPM:TYPE,deviceCode,SENSOR_DEVICE_ID,PORT_ID,DATA,FORMULATE //IOT_SERVER_LPM:TYPE,deviceCode,SENSOR_DEVICE_ID,PORT_ID,DATA,FORMULATE
StringBuffer strBuffer = new StringBuffer(); StringBuffer strBuffer = new StringBuffer();
@ -41,7 +43,7 @@ public class ProtocolUtil {
obj.setSensor_device_id(sensorInfo.getSensor_device_id()); obj.setSensor_device_id(sensorInfo.getSensor_device_id());
obj.setPort_id(sensorInfo.getPort_id()); obj.setPort_id(sensorInfo.getPort_id());
obj.setSdata_degree(sensorInfo.getSdata_degree());
return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerControll(obj, iotNodeInfo); return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerControll(obj, iotNodeInfo);
}else if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.TCP){ }else if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.TCP){
@ -83,6 +85,7 @@ public class ProtocolUtil {
// 设备重启命令 // 设备重启命令
public static Integer sendGatewayRestart(IotNodeInfoBO obj){ public static Integer sendGatewayRestart(IotNodeInfoBO obj){
LogUtil4j.debugLogger.debug("called with {}", obj);
StringBuffer strBuffer = new StringBuffer(); StringBuffer strBuffer = new StringBuffer();
if(obj.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){ if(obj.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){
return 0 ; return 0 ;
@ -115,6 +118,7 @@ public class ProtocolUtil {
* @return * @return
*/ */
public static Integer sendSensorParamRead(IotSensorInfoBO obj ){ public static Integer sendSensorParamRead(IotSensorInfoBO obj ){
// LogUtil4j.debugLogger.debug("called with {}", obj);
//IOT_SERVER_LPM:TYPE,deviceCode,param_type //IOT_SERVER_LPM:TYPE,deviceCode,param_type
// TYPE = param_read // TYPE = param_read
StringBuffer strBuffer = new StringBuffer(); StringBuffer strBuffer = new StringBuffer();
@ -125,7 +129,7 @@ public class ProtocolUtil {
if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){ if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){
obj.setSensor_device_id(sensorInfo.getSensor_device_id()); obj.setSensor_device_id(sensorInfo.getSensor_device_id());
obj.setPort_id(sensorInfo.getPort_id()); obj.setPort_id(sensorInfo.getPort_id());
obj.setSdata_degree(sensorInfo.getSdata_degree());
return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerParamRead(obj, iotNodeInfo); return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerParamRead(obj, iotNodeInfo);
}else if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.TCP ){ }else if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.TCP ){
// tcp // tcp
@ -157,6 +161,7 @@ public class ProtocolUtil {
* @return * @return
*/ */
public static Integer sendSensorParamDown(IotSensorInfoBO obj ){ public static Integer sendSensorParamDown(IotSensorInfoBO obj ){
// LogUtil4j.debugLogger.debug("called with {}---{}", obj.getId(), obj.getSdata());
//IOT_SERVER_LPM:TYPE,deviceCode,param_type:values //IOT_SERVER_LPM:TYPE,deviceCode,param_type:values
// TYPE = param_write // TYPE = param_write
// //
@ -165,6 +170,7 @@ public class ProtocolUtil {
IotSensorInfo sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO, obj.getId().toString() ); IotSensorInfo sensorInfo = ProCacheUtil.getCache(CacheName.SENSORINFO, obj.getId().toString() );
IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO, sensorInfo.getNode_id().toString() ); IotNodeInfoBO iotNodeInfo = ProCacheUtil.getCache(CacheName.NODEINFO, sensorInfo.getNode_id().toString() );
// LogUtil4j.debugLogger.debug("sensor is {} and node is {}---{}---{}", sensorInfo.getNode_id(), iotNodeInfo.getDevice_code(), iotNodeInfo.getIot_node_type(), iotNodeInfo.getIot_protocal_category());
if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){ if(iotNodeInfo.getIot_node_type() == CodeIot.IOT_NODE_STATUS.MQTT){
float sdata_info = obj.getSdata() ; float sdata_info = obj.getSdata() ;
@ -174,6 +180,7 @@ public class ProtocolUtil {
obj.setSensor_device_id(sensorInfo.getSensor_device_id()); obj.setSensor_device_id(sensorInfo.getSensor_device_id());
obj.setPort_id(sensorInfo.getPort_id()); obj.setPort_id(sensorInfo.getPort_id());
obj.setRequest_sdata(sdata_info); obj.setRequest_sdata(sdata_info);
obj.setSdata_degree(sensorInfo.getSdata_degree());
return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerParamWrite(obj, iotNodeInfo); return ProtocalFactory.getInstance(iotNodeInfo.getIot_protocal_category()).execServerParamWrite(obj, iotNodeInfo);