Commit be4f679a authored by 003's avatar 003

20201013 影子服务跟设备之间做状态同步

parent 39398b1d
package com.shadow.mq.nats; package com.shadow.mq.nats;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.component.base.util.LocalAddressUtil; import com.component.base.util.LocalAddressUtil;
import com.shadow.mq.nats.consumer.NatsConsumer; import com.shadow.mq.nats.consumer.NatsConsumer;
import com.shadow.mq.nats.consumer.NatsMQProcess; import com.shadow.mq.nats.consumer.NatsMQProcess;
...@@ -31,6 +32,20 @@ public class NatsMQ { ...@@ -31,6 +32,20 @@ public class NatsMQ {
consumer.subscribe(address, consumer.subscribe(address,
"cmd_shadow", //多台使用集群消费 "cmd_shadow", //多台使用集群消费
(NatsMQProcess) SpringUtil.getBean("deviceCmdMQ")); (NatsMQProcess) SpringUtil.getBean("deviceCmdMQ"));
//提高性能,开多个消费者。
int size = 0;
String parseConsumerMqSize = PropsUtil.get("parseConsumerMqSize");
if(StringUtils.isEmpty(parseConsumerMqSize)){
size = 8;
}else{
size=Integer.parseInt(parseConsumerMqSize);
}
for (int i = 0; i < size; i++) {
consumer.subscribe(address,
"parse_parse", //多台使用集群消费
(NatsMQProcess)SpringUtil.getBean("deviceParseMQ"));
}
} }
public static String getTopicCMDs() { public static String getTopicCMDs() {
......
package com.shadow.mq.nats.consumer.process;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.component.base.bean.PublishTopicMessage;
import com.shadow.mq.nats.NatsMQ;
import com.shadow.mq.nats.consumer.NatsMQProcess;
import com.shadow.service.ParseService;
import com.shadow.spring.SpringUtil;
import io.nats.client.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* 设备数据解析核心
*
* @author cavin
*/
@Service
public class DeviceParseMQ implements NatsMQProcess {
private final static Logger logger = LoggerFactory
.getLogger(DeviceParseMQ.class);
@Override
public String doProcess(Message msg) {
try {
PublishTopicMessage bean = JSON.parseObject(new String(msg.getData(), "UTF-8"),
PublishTopicMessage.class);
JSONObject json = JSON.parseObject(bean.getData());
String method = json.getString("method");
String springbean = "service" + getMethod(method, bean.getDeviceId());
ParseService service = (ParseService) SpringUtil.getBean(springbean);
if (service == null) {//如果没有定义,默认使用1002的协议
logger.error("not found springbean", springbean);
return null;
}
// 做核心业务逻辑
service.process(bean);
} catch (Exception e) {
logger.error("", e);
}
return null;
}
private String getMethod(String method, String deviceId) {
String[] names = method.split("\\.");
if (names.length == 1) {
logger.error(deviceId + " not support method " + method);
return method;
}
StringBuffer sb = new StringBuffer();
for (int i = 1; i < names.length; i++) {
sb.append(names[i].substring(0, 1).toUpperCase());
sb.append(names[i].substring(1));
}
return sb.toString();
}
@Override
public String getTopic() {
return NatsMQ.T_PARSE;
}
}
package com.shadow.service;
import com.component.base.bean.PublishTopicMessage;
public interface ParseService {
public void process(PublishTopicMessage bean);
}
...@@ -6,6 +6,7 @@ import com.component.base.bean.PublishTopicMessage; ...@@ -6,6 +6,7 @@ import com.component.base.bean.PublishTopicMessage;
import com.dubbo.client.device.DeviceCMDBean; import com.dubbo.client.device.DeviceCMDBean;
import com.dubbo.client.device.DeviceCmdJsonService; import com.dubbo.client.device.DeviceCmdJsonService;
import com.shadow.service.CmdShadowService; import com.shadow.service.CmdShadowService;
import com.shadow.service.ParseService;
import com.shadow.util.RedisUtil; import com.shadow.util.RedisUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -18,7 +19,7 @@ import org.springframework.stereotype.Service; ...@@ -18,7 +19,7 @@ import org.springframework.stereotype.Service;
* @author cavin * @author cavin
*/ */
@Service @Service
public class ServiceStatusAll implements CmdShadowService { public class ServiceStatusAll implements ParseService {
private static final Logger logger = LoggerFactory private static final Logger logger = LoggerFactory
.getLogger(ServiceStatusAll.class); .getLogger(ServiceStatusAll.class);
@Autowired @Autowired
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment