Commit 39398b1d authored by 003's avatar 003

20201013 影子服务跟设备之间做状态同步

parent d8b03c68
package com.shadow.mq.nats;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.component.base.util.LocalAddressUtil;
import com.shadow.mq.nats.consumer.NatsConsumer;
import com.shadow.mq.nats.consumer.NatsMQProcess;
import com.shadow.spring.SpringUtil;
import com.shadow.util.PropsUtil;
import java.util.HashMap;
import java.util.Map;
public class NatsMQ {
public static final String T_SMS = "nats.sms";
public static final String T_CMD = "nats.dev.cmd";
public static final String T_PARSE = "nats.dev.parse";
public static final String T_STATUS = "nats.dev.status";
public static final String MQTT_SERVER_ADDRESS = "mqtt.server.address";
public static void init(){
String[] address = PropsUtil.get("nats_address").split(",");
NatsConsumer consumer = new NatsConsumer();
//订阅状态
consumer.subscribe(address,
"status_shadow", //多台使用集群消费
(NatsMQProcess)SpringUtil.getBean("deviceStatusMQ"));
//订阅命令
List<String> topics = new ArrayList<>();
topics.add(T_CMD + "." +"172.19.48.65");
topics.add(T_CMD + "." +"172.19.48.56");
consumer.subscribe(address,
"cmd_shadow", //多台使用集群消费
(NatsMQProcess)SpringUtil.getBean("deviceCmdMQ"),topics);
}
public static String getTopicCMDs(){
return T_CMD + "." + getHostName(LocalAddressUtil.getIp());
}
public static String getHostName(String ip){
StringBuffer sb = new StringBuffer();
for (int i = 0; i < ip.length(); i++) {
sb.append(map.get(String.valueOf(ip.charAt(i))));
}
return sb.toString();
}
static Map<String, String> map = new HashMap<String, String>();
static{
map.put("1", "a");
map.put("2", "b");
map.put("3", "c");
map.put("4", "d");
map.put("5", "e");
map.put("6", "f");
map.put("7", "g");
map.put("8", "h");
map.put("9", "i");
map.put("0", "o");
map.put(".", ".");
}
public static final String T_SMS = "nats.sms";
public static final String T_CMD = "nats.dev.cmd";
public static final String T_PARSE = "nats.dev.parse";
public static final String T_STATUS = "nats.dev.status";
public static final String MQTT_SERVER_ADDRESS = "mqtt.server.address";
public static void init() {
String[] address = PropsUtil.get("nats_address").split(",");
NatsConsumer consumer = new NatsConsumer();
//订阅状态
consumer.subscribe(address,
"status_shadow", //多台使用集群消费
(NatsMQProcess) SpringUtil.getBean("deviceStatusMQ"));
//订阅命令
// List<String> topics = new ArrayList<>();
// topics.add(T_CMD + "." +"172.19.48.65");
// topics.add(T_CMD + "." +"172.19.48.56");
consumer.subscribe(address,
"cmd_shadow", //多台使用集群消费
(NatsMQProcess) SpringUtil.getBean("deviceCmdMQ"));
}
public static String getTopicCMDs() {
return T_CMD + "." + getHostName(LocalAddressUtil.getIp());
}
public static String getHostName(String ip) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < ip.length(); i++) {
sb.append(map.get(String.valueOf(ip.charAt(i))));
}
return sb.toString();
}
static Map<String, String> map = new HashMap<String, String>();
static {
map.put("1", "a");
map.put("2", "b");
map.put("3", "c");
map.put("4", "d");
map.put("5", "e");
map.put("6", "f");
map.put("7", "g");
map.put("8", "h");
map.put("9", "i");
map.put("0", "o");
map.put(".", ".");
}
}
package com.shadow.mq.nats.consumer;
import com.component.base.util.StringUtil;
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.component.base.util.StringUtil;
import java.io.UnsupportedEncodingException;
public class NatsConsumer {
......@@ -55,45 +52,6 @@ public class NatsConsumer {
}
}
/**
* 订阅的方式
*
* @param address 地址url
* @param topics 主题
* @param group 如果使用集群消费,group名字一致;否则为广播消费
* @param process
*/
public void subscribe(String[] address, String group, final NatsMQProcess process, List<String> topics) {
try {
ConnectionFactory cf = new ConnectionFactory(address);
final Connection nc = cf.createConnection();
nc.flush(10 * 1000);
topics.forEach(topic -> {
logger.info(topic + " consumer start...");
if (StringUtil.isEmpty(group)) {
nc.subscribe(topic, new MessageHandler() {
@Override
public void onMessage(Message msg) {
printMessage(msg);
process.doProcess(msg);
}
});
} else {
nc.subscribe(topic, group, new MessageHandler() {
@Override
public void onMessage(Message msg) {
printMessage(msg);
process.doProcess(msg);
}
});
}
});
} catch (Exception e) {
logger.error("", e);
}
}
public void reponse(String[] address, final NatsMQProcess process) {
try {
String topic = process.getTopic();
......
......@@ -22,13 +22,15 @@ public class ServiceDevicePower implements CmdShadowService {
@Override
public void process(PublishTopicMessage bean) {
logger.info("ServiceDevicePower :" + bean.getData());
//影子服务的命令忽略
String id = bean.getId();
if ("shadowCmd".equals(id)) return;
//获取数据
JSONObject json = JSON.parseObject(bean.getData());
String deviceId = bean.getDeviceId();
String key = "desiredData" + deviceId;
//缓存数据
RedisUtil.set(key, json);
return;
}
......
......@@ -41,6 +41,7 @@ public class ServiceStatusAll implements CmdShadowService {
//比较影子的状态跟设备上报状态,不一致则下发控制命令
if (!desiredPower.equals(realPower)) {
DeviceCMDBean cmd = new DeviceCMDBean();
cmd.setId("shadowCmd");
cmd.setDevice_id(deviceId);
cmd.setData(jsonString);
cmd.setTopic(deviceId);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment