|
@@ -1,25 +1,24 @@
|
|
|
package com.hfln.portal.infrastructure.mqtt;
|
|
|
|
|
|
+import cn.hfln.framework.redis.util.RedisUtil;
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
import com.alibaba.fastjson2.JSONArray;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
|
|
import com.hfln.portal.common.constant.mqtt.topic.TopicConstants;
|
|
|
+import com.hfln.portal.common.constant.redis.RedisCacheConstant;
|
|
|
import com.hfln.portal.domain.customer.util.MsgClient;
|
|
|
import com.hfln.portal.domain.customer.util.WxOfficeAccountClient;
|
|
|
-import com.hfln.portal.infrastructure.po.DevInfo;
|
|
|
-import com.hfln.portal.infrastructure.po.EventList;
|
|
|
-import com.hfln.portal.infrastructure.po.UserInfo;
|
|
|
-import com.hfln.portal.infrastructure.po.WxRelation;
|
|
|
+import com.hfln.portal.infrastructure.po.*;
|
|
|
import com.hfln.portal.infrastructure.service.*;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.messaging.Message;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
-import java.time.LocalDateTime;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
@@ -43,9 +42,6 @@ public class MqttSubHandle {
|
|
|
private EventService eventService;
|
|
|
|
|
|
@Autowired
|
|
|
- private DevTargetService devTargetService;
|
|
|
-
|
|
|
- @Autowired
|
|
|
private UserService userService;
|
|
|
|
|
|
@Autowired
|
|
@@ -57,7 +53,14 @@ public class MqttSubHandle {
|
|
|
@Autowired
|
|
|
private WxOfficeAccountClient wxOfficeAccountClient;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private RedisUtil redisUtil;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TblTenantService tblTenantService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private AdminUserService adminUserService;
|
|
|
|
|
|
/**
|
|
|
* MQTT消息统一入口处理方法
|
|
@@ -80,36 +83,30 @@ public class MqttSubHandle {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- log.debug("Received device message: topic={}, payload={}", topic, payload);
|
|
|
-
|
|
|
// 根据主题路由到不同的处理方法
|
|
|
- // 提取主题的操作部分(最后一段)
|
|
|
- String action = extractActionFromTopic02(topic);
|
|
|
- if (action != null) {
|
|
|
- switch (action) {
|
|
|
- case "event":
|
|
|
- subDasEvent(topic, payload);
|
|
|
- break;
|
|
|
- case "alarm_event":
|
|
|
- subDasAlarmEvent(topic, payload);
|
|
|
- break;
|
|
|
- case "realtime_pos":
|
|
|
- subDasRealtimePos(topic, payload);
|
|
|
- break;
|
|
|
- case "dev_status":
|
|
|
- subDasDevStatus(topic, payload);
|
|
|
- break;
|
|
|
- case "exist":
|
|
|
- subDasExist(topic, payload);
|
|
|
- break;
|
|
|
- default:
|
|
|
- log.debug("Unhandled device topic action: {} for topic: {}", action, topic);
|
|
|
- break;
|
|
|
- }
|
|
|
- } else {
|
|
|
- log.debug("Could not extract action from device topic: {}", topic);
|
|
|
+ switch (topic) {
|
|
|
+ case TopicConstants.TOPIC_DAS_EVENT:
|
|
|
+ subDasEvent(topic, payload);
|
|
|
+ break;
|
|
|
+ case TopicConstants.TOPIC_DAS_ALARM_EVENT:
|
|
|
+ subDasAlarmEvent(topic, payload);
|
|
|
+ break;
|
|
|
+ case TopicConstants.TOPIC_DAS_REALTIME_POS:
|
|
|
+ subDasRealtimePos(topic, payload);
|
|
|
+ break;
|
|
|
+ case TopicConstants.TOPIC_DAS_DEV_STATUS:
|
|
|
+ subDasDevStatus(topic, payload);
|
|
|
+ break;
|
|
|
+ case TopicConstants.TOPIC_DAS_EXIST:
|
|
|
+ subDasExist(topic, payload);
|
|
|
+ break;
|
|
|
+ case TopicConstants.TOPIC_MQTT_CLIENT_CONNECT:
|
|
|
+ subMqttClientConnect(topic, payload);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.debug("Unhandled device topic: {}", topic);
|
|
|
+ break;
|
|
|
}
|
|
|
-
|
|
|
} catch (Exception e) {
|
|
|
log.error("Error handling device message: {}", e.getMessage(), e);
|
|
|
}
|
|
@@ -123,9 +120,9 @@ public class MqttSubHandle {
|
|
|
*/
|
|
|
public void subDasEvent(String topic, String payload) {
|
|
|
|
|
|
+ log.info("Received device message: topic={}, payload={}", topic, payload);
|
|
|
JSONObject obj = JSONObject.parseObject(payload);
|
|
|
String clientId = obj.getString("dev_id");
|
|
|
- log.info("mqttsub, topic:{}, clientId:{}", "/das/event", clientId);
|
|
|
String event = obj.getString("event");
|
|
|
// 跌倒确认返回
|
|
|
if (event.equals("fall_confirmed")) {
|
|
@@ -159,46 +156,28 @@ public class MqttSubHandle {
|
|
|
}
|
|
|
|
|
|
String targetPointsStr = JSON.toJSONString(targetPoints);
|
|
|
- List<DevInfo> devs = devInfoService.queryByClientId(clientId);
|
|
|
- if (devs != null && devs.size() > 0) {
|
|
|
- DevInfo dev = devs.get(0);
|
|
|
- if (!targetPointsStr.equals(dev.getTargetPoints())) {
|
|
|
- // 存储跌倒事件
|
|
|
- if (messageType == 3) {
|
|
|
-
|
|
|
- event = obj.getString("event");
|
|
|
- if (event.equals("fall_confirmed")) {
|
|
|
- EventList eventListVO = new EventList();
|
|
|
- eventListVO.setDevId(dev.getDevId());
|
|
|
- eventListVO.setPose(pose);
|
|
|
- eventListVO.setIsHandle(0);
|
|
|
- eventListVO.setTargetPoints(targetPointsStr);
|
|
|
- eventListVO.setEventType(messageType);
|
|
|
- eventService.save(eventListVO);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-/* // target_points数据入库
|
|
|
- DevTarget target = new DevTarget();
|
|
|
- target.setDevId(dev.getDevId());
|
|
|
- target.setPose(pose);
|
|
|
- target.setIsHandle(false);
|
|
|
- if (DevPosFixUtil.containsKey(clientId)) {
|
|
|
- target.setFixPose(DevPosFixUtil.get(clientId));
|
|
|
- } else {
|
|
|
- target.setFixPose((byte) -1);
|
|
|
+ DevInfo dev = devInfoService.queryByClientId(clientId);
|
|
|
+ if (dev == null) {
|
|
|
+ log.warn("Device not found for clientId: {}", clientId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (!targetPointsStr.equals(dev.getTargetPoints())) {
|
|
|
+ // 存储跌倒事件
|
|
|
+ if (messageType == 3) {
|
|
|
+
|
|
|
+ event = obj.getString("event");
|
|
|
+ if (event.equals("fall_confirmed")) {
|
|
|
+ EventList eventListVO = new EventList();
|
|
|
+ eventListVO.setDevId(dev.getDevId());
|
|
|
+ eventListVO.setPose(pose);
|
|
|
+ eventListVO.setIsHandle(0);
|
|
|
+ eventListVO.setTargetPoints(targetPointsStr);
|
|
|
+ eventListVO.setEventType(messageType);
|
|
|
+ eventService.save(eventListVO);
|
|
|
}
|
|
|
- target.setTargetPoints(targetPointsStr);
|
|
|
- target.setEvent(messageType);
|
|
|
- devTargetService.save(target);*/
|
|
|
-
|
|
|
- DevInfo devInfo = new DevInfo();
|
|
|
- devInfo.setTargetPoints(targetPointsStr);
|
|
|
- devInfo.setSignalTime(LocalDateTime.now());
|
|
|
- devInfoService.update(devInfo, new LambdaUpdateWrapper<DevInfo>().eq(DevInfo::getClientId, clientId));
|
|
|
- } else {
|
|
|
- log.info("此次targetPoints与上次相同!");
|
|
|
}
|
|
|
+ } else {
|
|
|
+ log.info("此次targetPoints与上次相同!");
|
|
|
}
|
|
|
|
|
|
//向前端发送数据
|
|
@@ -207,9 +186,9 @@ public class MqttSubHandle {
|
|
|
msg.put("x", x);
|
|
|
msg.put("y", y);
|
|
|
msg.put("zt", pose);
|
|
|
- msg.put("dev_id", clientId);
|
|
|
+ msg.put("clientId", clientId);
|
|
|
msg.put("event", event);
|
|
|
- msg.put("msg_type", "event");
|
|
|
+ msg.put("msgType", "event");
|
|
|
|
|
|
// todo 发送socket
|
|
|
mqttClient.sendMessage(String.format("/mps/%s/event", clientId), msg.toString());
|
|
@@ -222,20 +201,41 @@ public class MqttSubHandle {
|
|
|
StringBuffer devName = new StringBuffer("");
|
|
|
String devId = "";
|
|
|
// 设备拥有者openid
|
|
|
- if (devs != null && devs.size() > 0) {
|
|
|
- DevInfo dev = devs.get(0);
|
|
|
+ if (dev != null) {
|
|
|
userIds.add(dev.getUserId());
|
|
|
devName.append(dev.getDevName());
|
|
|
devId = dev.getClientId();
|
|
|
}
|
|
|
- // 被分享者openid
|
|
|
-// List<ShareVO> shares = baseDAO.queryAllByCondition(ShareVO.class, " and dev_id ='"+dev_id+"' ", null);
|
|
|
-// if(shares !=null && shares.size()>0) {
|
|
|
-// for(int i=0;i<shares.size();i++) {
|
|
|
-// ShareVO share =shares.get(i);
|
|
|
-// openids.add(share.getShared());
|
|
|
-// }
|
|
|
-// }
|
|
|
+
|
|
|
+ // 针对跌倒事件 发送 网页 主题消息提示
|
|
|
+ String tenantName = "";
|
|
|
+ if (dev.getTenantId() != null) {
|
|
|
+ TblTenant tblTenant = tblTenantService.getById(dev.getTenantId());
|
|
|
+ tenantName = tblTenant.getTenantName();
|
|
|
+
|
|
|
+ JSONObject webMsg = new JSONObject();
|
|
|
+ webMsg.put("targetPoints", targetPointArray);
|
|
|
+ webMsg.put("x", x);
|
|
|
+ webMsg.put("y", y);
|
|
|
+ webMsg.put("zt", pose);
|
|
|
+ webMsg.put("clientId", clientId);
|
|
|
+ webMsg.put("event", "fall_confirmed");
|
|
|
+ webMsg.put("msgType", "fall");
|
|
|
+ webMsg.put("devName", devName.toString());
|
|
|
+ webMsg.put("tenantName", tenantName);
|
|
|
+
|
|
|
+ // 查询当前需要发送的userId
|
|
|
+ List<AdminUserInfo> adminUserInfos = adminUserService.queryByTenantIdAndUserType(dev.getTenantId(), null);
|
|
|
+ if (!CollectionUtils.isEmpty(adminUserInfos)) {
|
|
|
+ for (AdminUserInfo adminUserInfo : adminUserInfos) {
|
|
|
+
|
|
|
+ // 判断当前用户是否登录网页
|
|
|
+ if (redisUtil.sIsMember(RedisCacheConstant.MQTT_CLIENT_USERID, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId())) {
|
|
|
+ mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_WEB_NOTIC, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId()), webMsg.toString(), 2, false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
if (!userIds.isEmpty()) {
|
|
|
// 拥有者和被分享者phone
|
|
@@ -264,32 +264,22 @@ public class MqttSubHandle {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void subDasAlarmEvent(String topic, String payload) {
|
|
|
|
|
|
- public void subDasAlarmEvent(String topic, String payload) {
|
|
|
-
|
|
|
+ log.info("Received device message: topic={}, payload={}", topic, payload);
|
|
|
JSONObject obj = JSONObject.parseObject(payload);
|
|
|
String clientId = obj.getString("dev_id");
|
|
|
- log.info("mqttsub, topic:{}, clientId:{}", "/das/event", clientId);
|
|
|
|
|
|
- List<DevInfo> devs = devInfoService.queryByClientId(clientId);
|
|
|
+ DevInfo dev = devInfoService.queryByClientId(clientId);
|
|
|
List<Long> userIds = new ArrayList<>();
|
|
|
StringBuilder devName = new StringBuilder();
|
|
|
String devId = "";
|
|
|
// 设备拥有者openid
|
|
|
- if (devs != null && !devs.isEmpty()) {
|
|
|
- DevInfo dev = devs.get(0);
|
|
|
+ if (dev != null) {
|
|
|
userIds.add(dev.getUserId());
|
|
|
devName.append(dev.getDevName());
|
|
|
devId = dev.getClientId();
|
|
|
}
|
|
|
- // 被分享者openid
|
|
|
-// List<ShareVO> shares = baseDAO.queryAllByCondition(ShareVO.class, " and dev_id ='"+dev_id+"' ", null);
|
|
|
-// if(shares !=null && shares.size()>0) {
|
|
|
-// for(int i=0;i<shares.size();i++) {
|
|
|
-// ShareVO share =shares.get(i);
|
|
|
-// openids.add(share.getShared());
|
|
|
-// }
|
|
|
-// }
|
|
|
|
|
|
if (!userIds.isEmpty()) {
|
|
|
// 拥有者和被分享者phone
|
|
@@ -317,9 +307,9 @@ public class MqttSubHandle {
|
|
|
|
|
|
public void subDasRealtimePos(String topic, String payload) {
|
|
|
|
|
|
+ log.debug("Received device message: topic={}, payload={}", topic, payload);
|
|
|
JSONObject obj = JSONObject.parseObject(payload);
|
|
|
String clientId = obj.getString("dev_id");
|
|
|
- log.info("mqttsub, topic:{}, clientId:{}", "/das/realtime_pos", clientId);
|
|
|
byte messageType = obj.getByteValue("message_type");
|
|
|
byte pose = obj.getByteValue("pose");
|
|
|
JSONArray targetPointArray = obj.getJSONArray("target_point");
|
|
@@ -340,74 +330,55 @@ public class MqttSubHandle {
|
|
|
targetPoints[i] = new BigDecimal[]{targetPoint.getBigDecimal(0), targetPoint.getBigDecimal(1), targetPoint.getBigDecimal(2)};
|
|
|
}
|
|
|
|
|
|
- String targetPointsStr = JSON.toJSONString(targetPoints);
|
|
|
- List<DevInfo> devs = devInfoService.queryByClientId(clientId);
|
|
|
- if (devs != null && !devs.isEmpty()) {
|
|
|
- DevInfo dev = devs.get(0);
|
|
|
- if (!targetPointsStr.equals(dev.getTargetPoints())) {
|
|
|
- // 存储跌倒事件
|
|
|
- if (messageType == 3) {
|
|
|
-
|
|
|
- String event = obj.getString("event");
|
|
|
- if (event.equals("fall_confirmed")) {
|
|
|
- EventList eventListVO = new EventList();
|
|
|
- eventListVO.setDevId(dev.getDevId());
|
|
|
- eventListVO.setPose(pose);
|
|
|
- eventListVO.setIsHandle(0);
|
|
|
- eventListVO.setTargetPoints(targetPointsStr);
|
|
|
- eventListVO.setEventType(messageType);
|
|
|
- eventService.save(eventListVO);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-/* // target_points数据入库
|
|
|
- DevTarget target = new DevTarget();
|
|
|
- target.setDevId(dev.getDevId());
|
|
|
- target.setPose(pose);
|
|
|
- target.setIsHandle(false);
|
|
|
- if (DevPosFixUtil.containsKey(clientId)) {
|
|
|
- target.setFixPose(DevPosFixUtil.get(clientId));
|
|
|
- } else {
|
|
|
- target.setFixPose((byte) -1);
|
|
|
- }
|
|
|
- target.setTargetPoints(targetPointsStr);
|
|
|
- target.setEvent(messageType);
|
|
|
- devTargetService.save(target);*/
|
|
|
-
|
|
|
- // todo 当前设备 监测 的最后一次 点位信息
|
|
|
-// redisservice.hget(RedisCacheConstant.KEY_DEVICE_pre + clientId, "lastTargetTime“)
|
|
|
-// redisservice.hget(RedisCacheConstant.KEY_DEVICE_pre + clientId, "lastTargetStr“)
|
|
|
-// DevInfo devInfo = new DevInfo();
|
|
|
-// devInfo.setTargetPoints(targetPointsStr);
|
|
|
-// devInfo.setSignalTime(LocalDateTime.now());
|
|
|
-// devInfoService.update(devInfo, new LambdaUpdateWrapper<DevInfo>().eq(DevInfo::getClientId, clientId));
|
|
|
- } else {
|
|
|
- log.info("此次targetPoints与上次相同!");
|
|
|
- }
|
|
|
- }
|
|
|
+// String targetPointsStr = JSON.toJSONString(targetPoints);
|
|
|
+// DevInfo dev = devInfoService.queryByClientId(clientId);
|
|
|
+// if (dev != null) {
|
|
|
+// if (!targetPointsStr.equals(dev.getTargetPoints())) {
|
|
|
+// // 存储跌倒事件
|
|
|
+// if (messageType == 3) {
|
|
|
+//
|
|
|
+// String event = obj.getString("event");
|
|
|
+// if (event.equals("fall_confirmed")) {
|
|
|
+// EventList eventListVO = new EventList();
|
|
|
+// eventListVO.setDevId(dev.getDevId());
|
|
|
+// eventListVO.setPose(pose);
|
|
|
+// eventListVO.setIsHandle(0);
|
|
|
+// eventListVO.setTargetPoints(targetPointsStr);
|
|
|
+// eventListVO.setEventType(messageType);
|
|
|
+// eventService.save(eventListVO);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// // todo 当前设备 监测 的最后一次 点位信息
|
|
|
+//// redisservice.hget(RedisCacheConstant.KEY_DEVICE_pre + clientId, "lastTargetTime“)
|
|
|
+//// redisservice.hget(RedisCacheConstant.KEY_DEVICE_pre + clientId, "lastTargetStr“)
|
|
|
+//// DevInfo devInfo = new DevInfo();
|
|
|
+//// devInfo.setTargetPoints(targetPointsStr);
|
|
|
+//// devInfo.setSignalTime(LocalDateTime.now());
|
|
|
+//// devInfoService.update(devInfo, new LambdaUpdateWrapper<DevInfo>().eq(DevInfo::getClientId, clientId));
|
|
|
+// } else {
|
|
|
+// log.info("此次targetPoints与上次相同!");
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
//向前端发送数据
|
|
|
JSONObject msg = new JSONObject();
|
|
|
- msg.put("target_points", targetPointArray);
|
|
|
+ msg.put("targetPoints", targetPointArray);
|
|
|
msg.put("x", x);
|
|
|
msg.put("y", y);
|
|
|
msg.put("zt", pose);
|
|
|
- msg.put("client_id", clientId);
|
|
|
- msg.put("msg_type", "target_point");
|
|
|
-// if (topic.equals("/das/realtime_pos")) {
|
|
|
-// msg.put("msg_type", "target_point");
|
|
|
-// } else {
|
|
|
-// msg.put("event", event);
|
|
|
-// msg.put("msg_type", "event");
|
|
|
-// }
|
|
|
+ msg.put("clientId", clientId);
|
|
|
+ msg.put("msgType", "target_point");
|
|
|
|
|
|
// todo websocket 发送msg
|
|
|
// PushMsgWebSocket.sendMessageTo(msg.toString(), dev_id);
|
|
|
mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_REALTIME_POS, clientId), msg.toString());
|
|
|
}
|
|
|
|
|
|
- public void subDasDevStatus(String topic, String payload) {
|
|
|
+ public void subDasDevStatus(String topic, String payload)
|
|
|
+ {
|
|
|
|
|
|
+ log.info("Received device message: topic={}, payload={}", topic, payload);
|
|
|
JSONObject obj = JSONObject.parseObject(payload);
|
|
|
String clientId = obj.getString("dev_id");
|
|
|
String devType = obj.getString("dev_type");
|
|
@@ -457,24 +428,24 @@ public class MqttSubHandle {
|
|
|
vo.setWidth(width);
|
|
|
vo.setLength(length);
|
|
|
|
|
|
- List<DevInfo> devs = devInfoService.queryByClientId(clientId);
|
|
|
- if (!devs.isEmpty()) {
|
|
|
+ DevInfo dev = devInfoService.queryByClientId(clientId);
|
|
|
+ if (dev != null) {
|
|
|
|
|
|
devInfoService.update(vo, new LambdaUpdateWrapper<DevInfo>().eq(DevInfo::getClientId, clientId));
|
|
|
|
|
|
// 向前端发送数据
|
|
|
JSONObject msg = new JSONObject();
|
|
|
- msg.put("mount_plain", mountPlain);
|
|
|
+ msg.put("mountPlain", mountPlain);
|
|
|
msg.put("height", height);
|
|
|
msg.put("software", software);
|
|
|
- msg.put("start_x", startX);
|
|
|
- msg.put("stop_x", stopX);
|
|
|
- msg.put("start_y", startY);
|
|
|
- msg.put("stop_y", stopY);
|
|
|
- msg.put("start_z", startZ);
|
|
|
- msg.put("stop_z", stopZ);
|
|
|
- msg.put("dev_type", devType);
|
|
|
- msg.put("msg_type", "dev_status");
|
|
|
+ msg.put("startX", startX);
|
|
|
+ msg.put("stopX", stopX);
|
|
|
+ msg.put("startY", startY);
|
|
|
+ msg.put("stopY", stopY);
|
|
|
+ msg.put("startZ", startZ);
|
|
|
+ msg.put("stopZ", stopZ);
|
|
|
+ msg.put("devType", devType);
|
|
|
+ msg.put("msgType", "dev_status");
|
|
|
|
|
|
// todo websocket 发送
|
|
|
// PushMsgWebSocket.sendMessageTo(msg.toString(), clientId);
|
|
@@ -487,44 +458,41 @@ public class MqttSubHandle {
|
|
|
|
|
|
public void subDasExist(String topic, String payload) {
|
|
|
|
|
|
+ log.info("Received device message: topic={}, payload={}", topic, payload);
|
|
|
JSONObject obj = JSONObject.parseObject(payload);
|
|
|
String clientId = obj.getString("dev_id");
|
|
|
String event = obj.getString("event");
|
|
|
//向前端发送数据
|
|
|
JSONObject msg = new JSONObject();
|
|
|
- msg.put("client_id", clientId);
|
|
|
+ msg.put("clientId", clientId);
|
|
|
msg.put("event", event);
|
|
|
- msg.put("msg_type", "exist");
|
|
|
+ msg.put("msgType", "exist");
|
|
|
|
|
|
// todo websocketservice
|
|
|
// PushMsgWebSocket.sendMessageTo(msg.toString(), dev_id);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 从MQTT主题中提取操作名称(最后一段路径)
|
|
|
- *
|
|
|
- * 主题格式:/dev/{device_id}/{action}
|
|
|
- * 例如:/dev/123456/login 返回 "login"
|
|
|
- *
|
|
|
- * @param topic MQTT主题字符串
|
|
|
- * @return 操作名称,如果解析失败返回null
|
|
|
- */
|
|
|
- private String extractActionFromTopic(String topic) {
|
|
|
- if (topic != null && topic.startsWith("/dev/")) {
|
|
|
- String[] parts = topic.split("/");
|
|
|
- if (parts.length >= 3) {
|
|
|
- return parts[parts.length - 1]; // 返回最后一段
|
|
|
- }
|
|
|
+ public void subMqttClientConnect(String topic, String payload) {
|
|
|
+
|
|
|
+ log.info("Received device message: topic={}, payload={}", topic, payload);
|
|
|
+ if (StringUtils.isEmpty(payload)) {
|
|
|
+ return;
|
|
|
}
|
|
|
- return null;
|
|
|
- }
|
|
|
|
|
|
- private String extractActionFromTopic02(String topic) {
|
|
|
- if (StringUtils.isNotBlank(topic)) {
|
|
|
- String[] parts = topic.split("/");
|
|
|
+ JSONObject obj = JSONObject.parseObject(payload);
|
|
|
+ String userId = obj.getString("userId");
|
|
|
+ String deviceType = obj.getString("deviceType");
|
|
|
+ String msgType = obj.getString("msgType");
|
|
|
+
|
|
|
+ if (StringUtils.isBlank(userId) || StringUtils.isBlank(deviceType) || StringUtils.isBlank(msgType)) {
|
|
|
+ log.error("userId or deviceType or msgType is null");
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- return parts[parts.length - 1]; // 返回最后一段
|
|
|
+ if (msgType.equals("connect")) {
|
|
|
+ redisUtil.sAdd(RedisCacheConstant.MQTT_CLIENT_USERID, deviceType + "_" + userId);
|
|
|
+ } else if (msgType.equals("disconnect")) {
|
|
|
+ redisUtil.sRemove(RedisCacheConstant.MQTT_CLIENT_USERID, deviceType + "_" + userId);
|
|
|
}
|
|
|
- return null;
|
|
|
}
|
|
|
}
|