Pārlūkot izejas kodu

mqtt 接收 dev 消息

chejianzheng 1 mēnesi atpakaļ
vecāks
revīzija
c0b2f77577

+ 15 - 0
portal-service-common/src/main/java/com/hfln/portal/common/constant/mqtt/topic/TopicConstants.java

@@ -57,4 +57,19 @@ public interface TopicConstants {
      * 与 las 交互
      */
     String TOPIC_LAS_ALARM_PLAN_UPDATE = "/las/alarm_plan_update";
+
+
+    /**
+     * dev设备 接收主题交互
+     */
+    // 设备登录
+    String TOPIC_DEV_LOGIN = "/dev/+/login";
+    // 设备离线遗嘱消息
+    String TOPIC_DEV_OFFLINE = "/dev/+/offline";
+    // 设备检测存在改变事件
+    String TOPIC_DEV_PRESENCE_CHANGE = "/dev/+/presence_change";
+    // 设备检测到跌倒改变事件
+    String TOPIC_DEV_FALLING_CHANGE = "/dev/+/falling_event_change";
+    // 收到设备上报配置信息
+    String TOPIC_DEV_DEVICE_PARAM = "/dev/+/report_device_param";
 }

+ 9 - 1
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/config/MqttConfig.java

@@ -123,6 +123,13 @@ public class MqttConfig {
 //                TopicConstants.TOPIC_DAS_REALTIME_POS_2
 
                 , TopicConstants.TOPIC_MQTT_CLIENT_CONNECT
+
+                // 设备主题相关 /dev/*
+                , TopicConstants.TOPIC_DEV_LOGIN
+                , TopicConstants.TOPIC_DEV_OFFLINE
+                , TopicConstants.TOPIC_DEV_PRESENCE_CHANGE
+                , TopicConstants.TOPIC_DEV_FALLING_CHANGE
+                , TopicConstants.TOPIC_DEV_DEVICE_PARAM
         };
         
         MqttPahoMessageDrivenChannelAdapter adapter =
@@ -130,7 +137,8 @@ public class MqttConfig {
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
         adapter.setQos(2, 2, 0, 0
-        , 2);
+        , 2
+        , 2, 2, 1, 2, 1);
         adapter.setOutputChannel(inputChannel());
         adapter.setTaskScheduler(taskScheduler());
         return adapter;

+ 300 - 12
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttSubHandle.java

@@ -20,6 +20,9 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
 import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -109,6 +112,24 @@ public class MqttSubHandle {
                 case "connect":
                     subMqttClientConnect(topic, payload);
                     break;
+
+                // 接收dev 主题消息
+                case "login":
+                    subDevLogin(topic, payload);
+                    break;
+                case "offline":
+                    subDevOffline(topic, payload);
+                    break;
+                case "presence_change":
+                    subDevPresenceChange(topic, payload);
+                    break;
+                case "falling_event_change":
+                    subDevFallingEventChange(topic, payload);
+                    break;
+                case "report_device_param":
+                    subDevReportDeviceParam(topic, payload);
+                    break;
+
                 default:
                     log.debug("Unhandled device topic: {}", topic);
                     break;
@@ -118,6 +139,268 @@ public class MqttSubHandle {
         }
     }
 
+    private void subDevReportDeviceParam(String topic, String payload) {
+        log.info("Received device message: topic={}, payload={}", topic, payload);
+        String clientId = getDevId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        JSONObject obj = JSONObject.parseObject(payload);
+        String deviceInfoStr = obj.getString("device_info");
+        JSONObject deviceInfo = JSONObject.parseObject(deviceInfoStr);
+        String software = deviceInfo.getString("firmware");
+        dev.setSoftware(software);
+        int indicatorLed = obj.getIntValue("indicator_led");
+        dev.setStatusLight(indicatorLed == 0 ? 1 : 0);
+
+        devInfoService.updateById(dev);
+    }
+
+    private void subDevFallingEventChange(String topic, String payload) {
+        log.info("Received device message: topic={}, payload={}", topic, payload);
+        String clientId = getDevId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        JSONObject obj = JSONObject.parseObject(payload);
+        int falling = obj.getIntValue("falling");
+        //0:no fall 1:detected 2: confirmed 3:calling
+
+//        String event = obj.getString("event");
+//        byte messageType = obj.getByteValue("message_type");
+//        byte pose = obj.getByteValue("pose");
+//        JSONArray targetPointArray = obj.getJSONArray("target_point");
+//        if (targetPointArray == null || targetPointArray.isEmpty()) {
+//            log.info("mqttsub target_point is Empty, do nothing");
+//            return;
+//        }
+
+
+        // 2 有跌倒事件发生
+        if (falling == 2) {
+
+//            BigDecimal[][] targetPoints = new BigDecimal[targetPointArray.size()][3];
+//            for (int i = 0; i < targetPointArray.size(); i++) {
+//                JSONArray targetPoint = targetPointArray.getJSONArray(i);
+//                targetPoints[i] = new BigDecimal[]{targetPoint.getBigDecimal(0), targetPoint.getBigDecimal(1), targetPoint.getBigDecimal(2)};
+//            }
+//            String targetPointsStr = JSON.toJSONString(targetPoints);
+
+            //存储跌倒时间到数据库
+            EventList eventListVO = new EventList();
+            eventListVO.setDevId(dev.getDevId());
+//            eventListVO.setPose((int) pose);
+            eventListVO.setIsHandle(0);
+//            eventListVO.setTargetPoints(targetPointsStr);
+            eventListVO.setEventType(falling);
+            eventListVO.setTenantId(dev.getTenantId());
+            eventListService.save(eventListVO);
+
+            // 整理需要接收消息的人员列表
+            log.info("mqttutil--有跌倒事件");
+            List<SendMsgUserDto> sendList = new ArrayList<>();
+            String devName = dev.getDevName();
+            Long userId = dev.getUserId();
+
+            // 需要发送提示的 有 当前设备拥有者, 被分享者, 以及 对当前设备 具有管理权限的 web管理用户
+            //  小程序拥有者
+            if (userId != null) {
+                UserInfo userInfo = userService.queryById(userId);
+                SendMsgUserDto msgUserDto = new SendMsgUserDto();
+                msgUserDto.setUserId(userId);
+                msgUserDto.setPhone(userInfo.getPhone());
+                msgUserDto.setUnionId(userInfo.getUnionId());
+                msgUserDto.setMessageFlag(true);
+                msgUserDto.setServiceNumberFlag(true);
+                WxRelation wxRelation = wxRelationService.queryOneByUnionId(userInfo.getUnionId());
+                if (wxRelation != null) {
+                    msgUserDto.setFwhOpenId(wxRelation.getFwhOpenId());
+                }
+                sendList.add(msgUserDto);
+
+                // 对于 小程序拥有者,给出全局跌倒提醒,-- 被分享者是否需要
+
+                JSONObject wxMsg = new JSONObject();
+//                wxMsg.put("targetPoints", targetPointArray);
+//                wxMsg.put("x", x);
+//                wxMsg.put("y", y);
+//                wxMsg.put("zt", pose);
+                wxMsg.put("clientId", clientId);
+                wxMsg.put("event", "fall_confirmed");
+                wxMsg.put("msgType", "fall");
+                wxMsg.put("devName", devName.toString());
+                wxMsg.put("eventListId", String.valueOf(eventListVO.getEventListId()));
+
+                log.info("发送微信跌倒主题消息:topic:{}, msg:{}", String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + userInfo.getUserId()), wxMsg.toString());
+                mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + userInfo.getUserId()), wxMsg.toString(), 2, false);
+
+                //  设备的被分享者 - 根据标志位筛选并添加到发送列表
+                List<DevShare> shares = devShareService.queryConfirmedByDevId(dev.getDevId());
+                if (!CollectionUtils.isEmpty(shares)) {
+                    for (DevShare share : shares) {
+                        // 发送微信跌倒主题消息
+                        log.info("发送微信跌倒主题消息:topic:{}, msg:{}", String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + share.getSharedUserId()), wxMsg.toString());
+                        mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + share.getSharedUserId()), wxMsg.toString(), 2, false);
+
+                        // 创建被分享者的消息用户DTO
+                        SendMsgUserDto msgUserShared = new SendMsgUserDto();
+                        msgUserShared.setUserId(share.getSharedUserId());
+                        msgUserShared.setPhone(share.getSharedPhone());
+
+                        // 获取完整的分享信息(包含标志位)
+                        DevShare devShare = devShareService.getById(share.getShareId());
+                        if (devShare != null) {
+                            // 将Integer类型的标志位转换为boolean类型:0-授权(true),1-拒绝(false)
+                            boolean messageFlag = devShare.getMessageFlag() != null && devShare.getMessageFlag() == 0;
+                            boolean serviceNumberFlag = devShare.getServiceNumberFlag() != null && devShare.getServiceNumberFlag() == 0;
+
+                            msgUserShared.setMessageFlag(messageFlag);
+                            msgUserShared.setServiceNumberFlag(serviceNumberFlag);
+
+                            // 根据标志位决定是否添加到发送列表
+                            // 只有当短信通知或服务号通知任一被授权时,才添加到发送列表
+                            if (messageFlag || serviceNumberFlag) {
+                                // 获取被分享者的微信关系信息
+                                UserInfo sharedUserInfo = userService.queryById(share.getSharedUserId());
+                                if (sharedUserInfo != null) {
+                                    msgUserShared.setUnionId(sharedUserInfo.getUnionId());
+                                    WxRelation sharedWxRelation = wxRelationService.queryOneByUnionId(sharedUserInfo.getUnionId());
+                                    if (sharedWxRelation != null) {
+                                        msgUserShared.setFwhOpenId(sharedWxRelation.getFwhOpenId());
+                                    }
+                                    sendList.add(msgUserShared);
+                                    log.info("被分享者已添加到发送列表: userId={}, messageFlag={}, serviceNumberFlag={}",
+                                            share.getSharedUserId(), messageFlag, serviceNumberFlag);
+                                }
+                            } else {
+                                log.info("被分享者拒绝所有通知: userId={}, messageFlag={}, serviceNumberFlag={}",
+                                        share.getSharedUserId(), messageFlag, serviceNumberFlag);
+                            }
+                        }
+                    }
+                }
+            }
+
+            // 针对跌倒事件 发送 网页 主题消息提示
+            String tenantName = "";
+            // 网页告警提示 需要当前设备绑定到租户,会发送到当前租户已经登录的管理员页面
+            if (dev.getTenantId() != null) {
+                TblTenant tblTenant = tblTenantService.getById(dev.getTenantId());
+                tenantName = tblTenant.getTenantName();
+
+                JSONObject webMsg = new JSONObject();
+//                webMsg.put("targetPoints", targetPointArray);
+//                webMsg.put("x", x);
+//                webMsg.put("y", y);
+//                webMsg.put("zt", pose);
+                webMsg.put("clientId", clientId);
+                webMsg.put("event", "fall_confirmed");
+                webMsg.put("msgType", "fall");
+                webMsg.put("devName", devName.toString());
+                webMsg.put("tenantName", tenantName);
+                webMsg.put("eventListId", String.valueOf(eventListVO.getEventListId()));
+
+                // 查询当前需要发送的userId
+                List<AdminUserInfo> adminUserInfos = adminUserService.queryByTenantIdAndUserType(dev.getTenantId(), null);
+                if (!CollectionUtils.isEmpty(adminUserInfos)) {
+                    for (AdminUserInfo adminUserInfo : adminUserInfos) {
+
+                        // 3 对当前设备 具有管理权限的 web管理用户
+                        // todo 确定网页 管理用户 是否有 短信 服务号发送 功能
+//                        SendMsgUserDto sendDto = new SendMsgUserDto();
+//                        sendDto.setUserId(adminUserInfo.getUserId());
+//                        sendDto.setPhone(adminUserInfo.getPhone());
+////                        sendDto.setUnionId(adminUserInfo.getUnionId());
+////                        WxRelation wxRelation = wxRelationService.queryOneByUnionId(userInfo.getUnionId());
+////                        if (wxRelation != null) {
+////                            msgUserDto.setFwhOpenId(wxRelation.getFwhOpenId());
+////                        }
+//                        sendList.add(sendDto);
+
+                        // 判断当前用户是否登录网页
+                        if (redisUtil.sIsMember(RedisCacheConstant.MQTT_CLIENT_USERID, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId())) {
+                            log.info("发送网页跌倒主题消息:topic:{}, msg:{}", String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId()), webMsg.toString());
+                            mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId()), webMsg.toString(), 2, false);
+                        }
+                    }
+                }
+            }
+            //3.调用 发送短信和微信公众号通知 功能
+            sendMesAndWxService(sendList, devName, clientId);
+        }
+    }
+
+    private void subDevPresenceChange(String topic, String payload) {
+
+        log.info("Received device message: topic={}, payload={}", topic, payload);
+        String clientId = getDevId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+        JSONObject obj = JSONObject.parseObject(payload);
+        if (obj.get("presence") == null) {
+            log.warn("presence is null");
+            return;
+        }
+        //0:no presense 1:presence
+        int presence = obj.getIntValue("presence");
+
+        devInfoService.update(
+                new LambdaUpdateWrapper<DevInfo>()
+                        .eq(DevInfo::getClientId, clientId)
+                        .set(DevInfo::getExistFlag, presence == 1 ? DevInfo.Constants.ExistFlag.EXIST : DevInfo.Constants.ExistFlag.NOT_EXIST)
+                        .set(DevInfo::getPresenceChangeTime,
+                                obj.get("timestamp") != null
+                                        ? LocalDateTime.ofInstant(Instant.ofEpochMilli(obj.getLongValue("timestamp")), ZoneId.of("UTC"))
+                                        : LocalDateTime.now())
+        );
+    }
+
+    private void subDevOffline(String topic, String payload) {
+
+        log.info("Received device message: topic={}, payload={}", topic, payload);
+        String clientId = getDevId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        devInfoService.update(
+                new LambdaUpdateWrapper<DevInfo>()
+                        .eq(DevInfo::getClientId, clientId)
+                        .set(DevInfo::getOnline, DevInfo.Constants.OnlineStatus.OFFLINE)
+                        .set(DevInfo::getOfflineTime, LocalDateTime.now())
+        );
+    }
+
+    private void subDevLogin(String topic, String payload) {
+
+        log.info("Received device message: topic={}, payload={}", topic, payload);
+        String clientId = getDevId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        devInfoService.update(
+                new LambdaUpdateWrapper<DevInfo>()
+                        .eq(DevInfo::getClientId, clientId)
+                        .set(DevInfo::getOnline, DevInfo.Constants.OnlineStatus.ONLINE)
+                        .set(DevInfo::getExistFlag, DevInfo.Constants.ExistFlag.EXIST)
+                        .set(DevInfo::getPresenceChangeTime, LocalDateTime.now())
+        );
+    }
+
     /**
      * 处理跌倒事件消息
      *
@@ -167,21 +450,21 @@ public class MqttSubHandle {
             eventListService.save(eventListVO);
 
 
-            //  向前端发送数据
+//            //  向前端发送数据
             JSONArray targetPointOne = targetPointArray.getJSONArray(0);
             BigDecimal x = targetPointOne.getBigDecimal(0);
             BigDecimal y = targetPointOne.getBigDecimal(1);
-            JSONObject msg = new JSONObject();
-            msg.put("target_points", targetPointArray);
-            msg.put("x", x);
-            msg.put("y", y);
-            msg.put("zt", pose);
-            msg.put("clientId", clientId);
-            msg.put("event", event);
-            msg.put("msgType", "event");
-            msg.put("eventListId", String.valueOf(eventListVO.getEventListId()));
-            // 发送socket
-            mqttClient.sendMessage(String.format("/mps/%s/event", clientId), msg.toString());
+//            JSONObject msg = new JSONObject();
+//            msg.put("target_points", targetPointArray);
+//            msg.put("x", x);
+//            msg.put("y", y);
+//            msg.put("zt", pose);
+//            msg.put("clientId", clientId);
+//            msg.put("event", event);
+//            msg.put("msgType", "event");
+//            msg.put("eventListId", String.valueOf(eventListVO.getEventListId()));
+//            // 发送socket
+//            mqttClient.sendMessage(String.format("/mps/%s/event", clientId), msg.toString());
 
             // 整理需要接收消息的人员列表
             log.info("mqttutil--有跌倒事件");
@@ -598,6 +881,11 @@ public class MqttSubHandle {
             redisUtil.sRemove(RedisCacheConstant.MQTT_CLIENT_USERID, deviceType + "_" + userId);
         }
     }
+
+    private String getDevId(String topic) {
+        String[] split = topic.split("/");
+        return split[split.length - 2];
+    }
 }
 
 

+ 21 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/po/DevInfo.java

@@ -201,6 +201,19 @@ public class DevInfo extends BasePO {
      */
     @TableField("falling_confirm")
     private BigDecimal fallingConfirm;
+
+    /**
+     * 当前检测是否有人:0-无人,1-有人
+     */
+    @TableField("exist_flag")
+    private Integer existFlag;
+
+    /**
+     *  存在改变时间
+     */
+    @TableField("存在改变时间")
+    private LocalDateTime presenceChangeTime;
+
     /**
      * 设备信息常量类
      */
@@ -213,6 +226,14 @@ public class DevInfo extends BasePO {
             public static final int OFFLINE = 0;  // 离线
             public static final int ONLINE = 1;   // 在线
         }
+
+        /**
+         * 存在状态
+         */
+        public static class ExistFlag {
+            public static final int NOT_EXIST = 0;  // 不存在
+            public static final int EXIST = 1;   // 存在
+        }
         
         /**
          * 设备报警状态

+ 5 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/po/EventList.java

@@ -27,6 +27,11 @@ public class EventList extends BasePO {
     private Long devId;
 
     /**
+     * 租户表主键id
+     */
+    private Long tenantId;
+
+    /**
      * 姿态
      */
     private Integer pose;