浏览代码

feat(portal): 添加 MQTT 消息处理功能

- 新增多个 MQTT 消息处理类,实现不同类型的 MQTT 消息处理逻辑
- 添加基础处理类 BaseMqttHandler 提供通用方法
- 实现 MqttMessageHandler 接口,规范消息处理方法
- 新增 MqttSubscribe 类统一处理 MQTT 消息分发
- 调整 MqttConfig配置,优化 MQTT 消息订阅
- 更新 TopicConstants,移除未使用的主题常量
chejianzheng 1 月之前
父节点
当前提交
302b2a2248
共有 12 个文件被更改,包括 715 次插入24 次删除
  1. 0 11
      portal-service-common/src/main/java/com/hfln/portal/common/constant/mqtt/topic/TopicConstants.java
  2. 6 13
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/config/MqttConfig.java
  3. 51 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttSubscribe.java
  4. 101 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/AlarmEventHandler.java
  5. 32 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/BaseMqttHandler.java
  6. 50 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/ConnectHandler.java
  7. 259 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/FallingEventChangeHandler.java
  8. 48 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/LastWillHandler.java
  9. 43 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/LoginHandler.java
  10. 8 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/MqttMessageHandler.java
  11. 72 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/PresenceChangeHandler.java
  12. 45 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/ReportDeviceParamHandler.java

+ 0 - 11
portal-service-common/src/main/java/com/hfln/portal/common/constant/mqtt/topic/TopicConstants.java

@@ -33,21 +33,10 @@ public interface TopicConstants {
     String TOPIC_DAS_EVENT = "/das/event";
     // alarm事件
     String TOPIC_DAS_ALARM_EVENT = "/das/alarm_event";
-    // 实时点位
-    String TOPIC_DAS_REALTIME_POS = "/das/realtime_pos";
-    String TOPIC_DAS_REALTIME_POS_2 = "/das/+/realtime_pos";
-    // 信息更新
-    String TOPIC_DAS_DEV_STATUS = "/das/dev_status";
-    // 存在事件
-    String TOPIC_DAS_EXIST = "/das/exist";
-
 
     /**
      * 与前端交互 主题
      */
-    // 发送实时点位
-    String TOPIC_MPS_REALTIME_POS = "/mps/%s/realtime_pos";
-
     String TOPIC_MPS_NOTIC = "/mps/%s/notice";
     // 接收前端mqtt客户端连接 断开信息 主题
     String TOPIC_MQTT_CLIENT_CONNECT = "/mps/client/connect";

+ 6 - 13
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/config/MqttConfig.java

@@ -1,7 +1,7 @@
 package com.hfln.portal.infrastructure.config;
 
 import com.hfln.portal.common.constant.mqtt.topic.TopicConstants;
-import com.hfln.portal.infrastructure.mqtt.MqttSubHandle;
+import com.hfln.portal.infrastructure.mqtt.MqttSubscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -114,14 +114,7 @@ public class MqttConfig {
     @Bean
     public MessageProducer mpsInbound() {
         String[] topics = {
-                TopicConstants.TOPIC_DAS_EVENT,
-                TopicConstants.TOPIC_DAS_ALARM_EVENT,
-                TopicConstants.TOPIC_DAS_DEV_STATUS,
-                TopicConstants.TOPIC_DAS_EXIST
-//                ,
-//                TopicConstants.TOPIC_DAS_REALTIME_POS
-//                TopicConstants.TOPIC_DAS_REALTIME_POS_2
-
+                TopicConstants.TOPIC_DAS_ALARM_EVENT
                 , TopicConstants.TOPIC_MQTT_CLIENT_CONNECT
 
                 // 设备主题相关 /dev/*
@@ -139,9 +132,8 @@ public class MqttConfig {
                 new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), topics);
         adapter.setCompletionTimeout(5000);
         adapter.setConverter(new DefaultPahoMessageConverter());
-        adapter.setQos(2, 2, 0, 0
-        , 2
-        , 2, 2, 1, 2, 1
+        adapter.setQos(2, 2
+        , 2, 2, 2, 2, 2
         , 2);
         adapter.setOutputChannel(inputChannel());
         adapter.setTaskScheduler(taskScheduler());
@@ -150,10 +142,11 @@ public class MqttConfig {
 
     @Bean
     @ServiceActivator(inputChannel = "inputChannel")
-    public MessageHandler dasMqttMessageHandler(MqttSubHandle handler, @Qualifier("mqttSubExecutor") ThreadPoolTaskExecutor mqttSubExecutor) {
+    public MessageHandler dasMqttMessageHandler(MqttSubscribe handler, @Qualifier("mqttSubExecutor") ThreadPoolTaskExecutor mqttSubExecutor) {
         return new MessageHandler() {
             @Override
             public void handleMessage(Message<?> message) throws MessagingException {
+//                mqttSubExecutor.execute(() -> handler.handleMessage(message));
                 mqttSubExecutor.execute(() -> handler.handleMessage(message));
             }
         };

+ 51 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttSubscribe.java

@@ -0,0 +1,51 @@
+package com.hfln.portal.infrastructure.mqtt;
+
+import com.hfln.portal.infrastructure.mqtt.handlers.MqttMessageHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Optional;
+
+@Slf4j
+@Component
+public class MqttSubscribe {
+
+    @Autowired
+    private List<MqttMessageHandler> messageHandlers;
+
+    public void handleMessage(Message<?> message) {
+        try {
+            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+            String payload = message.getPayload().toString();
+
+            log.info("Received device message: topic={}, payload={}", topic, payload);
+            if (topic == null) {
+                log.warn("Received message without topic header");
+                return;
+            }
+
+            String action = extractActionFromTopic(topic);
+
+            Optional<MqttMessageHandler> handlerOpt = messageHandlers.stream()
+                    .filter(handler -> handler.supports(action))
+                    .findFirst();
+
+            if (handlerOpt.isPresent()) {
+                handlerOpt.get().handle(topic, payload);
+                log.info("Handled device message success: {}", topic);
+            } else {
+                log.debug("Unhandled device topic: {}", topic);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device message", e);
+        }
+    }
+
+    private String extractActionFromTopic(String topic) {
+        String[] parts = topic.split("/");
+        return parts[parts.length - 1];
+    }
+}

+ 101 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/AlarmEventHandler.java

@@ -0,0 +1,101 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.hfln.portal.domain.customer.AlarmEventType;
+import com.hfln.portal.domain.customer.util.WxOfficeAccountClient;
+import com.hfln.portal.infrastructure.po.AlarmPlan;
+import com.hfln.portal.infrastructure.po.DevInfo;
+import com.hfln.portal.infrastructure.po.UserInfo;
+import com.hfln.portal.infrastructure.po.WxRelation;
+import com.hfln.portal.infrastructure.service.AlarmPlanService;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import com.hfln.portal.infrastructure.service.UserService;
+import com.hfln.portal.infrastructure.service.WxRelationService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class AlarmEventHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private AlarmPlanService alarmPlanService;
+    private WxOfficeAccountClient wxOfficeAccountClient;
+    private WxRelationService wxRelationService;
+    private UserService userService;
+
+    private AlarmEventHandler(AlarmPlanService alarmPlanService,
+                              WxOfficeAccountClient wxOfficeAccountClient,
+                              WxRelationService wxRelationService,
+                              UserService userService,
+                              DevInfoService devInfoService) {
+        super(devInfoService);
+        this.alarmPlanService = alarmPlanService;
+        this.wxOfficeAccountClient = wxOfficeAccountClient;
+        this.wxRelationService = wxRelationService;
+        this.userService = userService;
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "alarm_event".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        // 1.判断是否为新逻辑 从LAS获取
+        JSONObject obj = JSONObject.parseObject(payload);
+        String eventType = obj.getString("event_type");
+        // 1.1 判断 消息类型是否符合  起夜异常 or 异常滞留
+        if (AlarmEventType.isTargetEvent(eventType)) {
+            String clientId = obj.getString("dev_id");
+            String uuid = obj.getString("plan_uuid");
+
+            // 1.2 判断告警计划是否开启服务号推送
+            AlarmPlan alarmPlan = alarmPlanService.findByUuid(uuid);
+            if (alarmPlan.getLinkagePushWechatService() == 0) {
+                log.info("服务号通知发送失败,当前设备告警计划未开启服务号推送: clientId={}, uuid={}", clientId, uuid);
+                return;
+            }
+
+            // 1.3 查找设备信息,获取主绑人的userId
+            DevInfo dev = devInfoService.queryByClientId(clientId);
+
+            if (dev == null) {
+                log.info("服务号通知发送失败,当前设备不存在: clientId={}", clientId);
+                return;
+            }
+
+            Long userId = dev.getUserId();
+            String devName = dev.getDevName();
+            String devId = dev.getClientId();
+
+
+            // 1.4 如果userId为空,则没有绑定人
+            if (userId == null) {
+                log.info("服务号通知发送失败,设备未绑定用户: clientId={}", clientId);
+                return;
+            }
+
+            // 1.5 查询主绑人信息
+            UserInfo user = userService.queryById(userId);
+
+            // 1.6 获取用户微信服务号OpenId
+            WxRelation wxRelations = wxRelationService.queryOneByUnionId(user.getUnionId());
+
+            if (wxRelations == null) {
+                log.info("服务号通知发送失败,用户未绑定微信服务号: userId={}", userId);
+                return;
+            }
+            String fwhOpenId = wxRelations.getFwhOpenId();
+
+            log.info("mqttutil--当前useropenid=" + user.getOpenid() + ", fwhopenId=" + fwhOpenId);
+            log.info("发送微信公众号信息:devName=" + devName + ", phoneNo=" + user.getPhone() + "fwhOpenId=" + fwhOpenId);
+
+            // 1.7 发送微信公告号消息
+            wxOfficeAccountClient.sendMsg(devId, devName, user.getPhone(), fwhOpenId, "设备运行检测到告警异常,请前往小程序查看");
+            log.info("发送微信公众号消息发完了");
+            return;
+        }
+    }
+}

+ 32 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/BaseMqttHandler.java

@@ -0,0 +1,32 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+
+import com.hfln.portal.infrastructure.po.DevInfo;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import lombok.extern.slf4j.Slf4j;
+
+// 提取公共方法到基类或工具类
+@Slf4j
+public abstract class BaseMqttHandler {
+
+    protected DevInfoService devInfoService;
+
+    protected BaseMqttHandler(DevInfoService devInfoService) {
+        this.devInfoService = devInfoService;
+    }
+
+    protected String extractDeviceId(String topic) {
+        String[] parts = topic.split("/");
+        return parts[parts.length - 2];
+    }
+
+    protected DevInfo getDeviceByTopic(String topic) {
+        String clientId = extractDeviceId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return null;
+        }
+        return dev;
+    }
+}

+ 50 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/ConnectHandler.java

@@ -0,0 +1,50 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import cn.hfln.framework.redis.util.RedisUtil;
+import com.alibaba.fastjson2.JSONObject;
+import com.hfln.portal.common.constant.redis.RedisCacheConstant;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ConnectHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private RedisUtil redisUtil;
+
+    private ConnectHandler(RedisUtil redisUtil,
+                           DevInfoService devInfoService) {
+        super(devInfoService);
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "connect".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        if (StringUtils.isEmpty(payload)) {
+            return;
+        }
+
+        JSONObject obj = JSONObject.parseObject(payload);
+        String userId = obj.getString("userId");
+        String deviceType = obj.getString("deviceType");
+        String msgType = obj.getString("msgType");
+
+        if (StringUtils.isBlank(userId) || StringUtils.isBlank(deviceType) || StringUtils.isBlank(msgType)) {
+            log.error("userId or deviceType or msgType is null");
+            return;
+        }
+
+        if (msgType.equals("connect")) {
+            redisUtil.sAdd(RedisCacheConstant.MQTT_CLIENT_USERID, deviceType + "_" + userId);
+        } else if (msgType.equals("disconnect")) {
+            redisUtil.sRemove(RedisCacheConstant.MQTT_CLIENT_USERID, deviceType + "_" + userId);
+        }
+    }
+}

+ 259 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/FallingEventChangeHandler.java

@@ -0,0 +1,259 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import cn.hfln.framework.redis.util.RedisUtil;
+import com.alibaba.fastjson2.JSONObject;
+import com.hfln.portal.common.constant.mqtt.topic.TopicConstants;
+import com.hfln.portal.common.constant.redis.RedisCacheConstant;
+import com.hfln.portal.common.dto.data.user.SendMsgUserDto;
+import com.hfln.portal.domain.customer.util.MsgClient;
+import com.hfln.portal.domain.customer.util.WxOfficeAccountClient;
+import com.hfln.portal.infrastructure.mqtt.MqttClient;
+import com.hfln.portal.infrastructure.po.*;
+import com.hfln.portal.infrastructure.service.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Component
+public class FallingEventChangeHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private final EventListService eventListService;
+    private final UserService userService;
+    private final WxRelationService wxRelationService;
+    private final DevShareService devShareService;
+    private final MqttClient mqttClient;
+    private final TblTenantService tblTenantService;
+    private final AdminUserService adminUserService;
+    private final RedisUtil redisUtil;
+    private final MsgClient msgClient;
+    private final WxOfficeAccountClient wxOfficeAccountClient;
+
+    private FallingEventChangeHandler(DevInfoService devInfoService,
+                                      EventListService eventListService,
+                                      WxRelationService wxRelationService,
+                                      DevShareService devShareService,
+                                      MqttClient mqttClient,
+                                      UserService userService,
+                                      TblTenantService tblTenantService,
+                                      AdminUserService adminUserService,
+                                      RedisUtil redisUtil,
+                                      MsgClient msgClient,
+                                      WxOfficeAccountClient wxOfficeAccountClient) {
+        super(devInfoService);
+        this.eventListService = eventListService;
+        this.wxRelationService = wxRelationService;
+        this.devShareService = devShareService;
+        this.mqttClient = mqttClient;
+        this.userService = userService;
+        this.tblTenantService = tblTenantService;
+        this.adminUserService = adminUserService;
+        this.redisUtil = redisUtil;
+        this.msgClient = msgClient;
+        this.wxOfficeAccountClient = wxOfficeAccountClient;
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "falling_event_change".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        String clientId = extractDeviceId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        JSONObject obj = JSONObject.parseObject(payload);
+        int falling = obj.getIntValue("falling");
+        //0:no fall 1:detected 2: confirmed 3:calling
+
+        // 2 有跌倒事件发生
+        if (falling == 2) {
+
+            //存储跌倒时间到数据库
+            EventList eventListVO = new EventList();
+            eventListVO.setDevId(dev.getDevId());
+            eventListVO.setIsHandle(0);
+            eventListVO.setEventType(falling);
+            eventListVO.setTenantId(dev.getTenantId());
+            eventListService.save(eventListVO);
+
+            // 整理需要接收消息的人员列表
+            log.info("mqttutil--有跌倒事件");
+            List<SendMsgUserDto> sendList = new ArrayList<>();
+            String devName = dev.getDevName();
+            Long userId = dev.getUserId();
+
+            // 需要发送提示的 有 当前设备拥有者, 被分享者, 以及 对当前设备 具有管理权限的 web管理用户
+            //  小程序拥有者
+            if (userId != null) {
+                UserInfo userInfo = userService.queryById(userId);
+                SendMsgUserDto msgUserDto = new SendMsgUserDto();
+                msgUserDto.setUserId(userId);
+                msgUserDto.setPhone(userInfo.getPhone());
+                msgUserDto.setUnionId(userInfo.getUnionId());
+                msgUserDto.setMessageFlag(true);
+                msgUserDto.setServiceNumberFlag(true);
+                WxRelation wxRelation = wxRelationService.queryOneByUnionId(userInfo.getUnionId());
+                if (wxRelation != null) {
+                    msgUserDto.setFwhOpenId(wxRelation.getFwhOpenId());
+                }
+                sendList.add(msgUserDto);
+
+                // 对于 小程序拥有者,给出全局跌倒提醒,-- 被分享者是否需要
+                JSONObject wxMsg = new JSONObject();
+                wxMsg.put("clientId", clientId);
+                wxMsg.put("event", "fall_confirmed");
+                wxMsg.put("msgType", "fall");
+                wxMsg.put("devName", devName.toString());
+                wxMsg.put("eventListId", String.valueOf(eventListVO.getEventListId()));
+
+                log.info("发送微信跌倒主题消息:topic:{}, msg:{}", String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + userInfo.getUserId()), wxMsg.toString());
+                mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + userInfo.getUserId()), wxMsg.toString(), 2, false);
+
+                //  设备的被分享者 - 根据标志位筛选并添加到发送列表
+                List<DevShare> shares = devShareService.queryConfirmedByDevId(dev.getDevId());
+                if (!CollectionUtils.isEmpty(shares)) {
+                    for (DevShare share : shares) {
+                        // 发送微信跌倒主题消息
+                        log.info("发送微信跌倒主题消息:topic:{}, msg:{}", String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + share.getSharedUserId()), wxMsg.toString());
+                        mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WX_USER_PRE + share.getSharedUserId()), wxMsg.toString(), 2, false);
+
+                        // 创建被分享者的消息用户DTO
+                        SendMsgUserDto msgUserShared = new SendMsgUserDto();
+                        msgUserShared.setUserId(share.getSharedUserId());
+                        msgUserShared.setPhone(share.getSharedPhone());
+
+                        // 获取完整的分享信息(包含标志位)
+                        DevShare devShare = devShareService.getById(share.getShareId());
+                        if (devShare != null) {
+                            // 将Integer类型的标志位转换为boolean类型:0-授权(true),1-拒绝(false)
+                            boolean messageFlag = devShare.getMessageFlag() != null && devShare.getMessageFlag() == 0;
+                            boolean serviceNumberFlag = devShare.getServiceNumberFlag() != null && devShare.getServiceNumberFlag() == 0;
+
+                            msgUserShared.setMessageFlag(messageFlag);
+                            msgUserShared.setServiceNumberFlag(serviceNumberFlag);
+
+                            // 根据标志位决定是否添加到发送列表
+                            // 只有当短信通知或服务号通知任一被授权时,才添加到发送列表
+                            if (messageFlag || serviceNumberFlag) {
+                                // 获取被分享者的微信关系信息
+                                UserInfo sharedUserInfo = userService.queryById(share.getSharedUserId());
+                                if (sharedUserInfo != null) {
+                                    msgUserShared.setUnionId(sharedUserInfo.getUnionId());
+                                    WxRelation sharedWxRelation = wxRelationService.queryOneByUnionId(sharedUserInfo.getUnionId());
+                                    if (sharedWxRelation != null) {
+                                        msgUserShared.setFwhOpenId(sharedWxRelation.getFwhOpenId());
+                                    }
+                                    sendList.add(msgUserShared);
+                                    log.info("被分享者已添加到发送列表: userId={}, messageFlag={}, serviceNumberFlag={}",
+                                            share.getSharedUserId(), messageFlag, serviceNumberFlag);
+                                }
+                            } else {
+                                log.info("被分享者拒绝所有通知: userId={}, messageFlag={}, serviceNumberFlag={}",
+                                        share.getSharedUserId(), messageFlag, serviceNumberFlag);
+                            }
+                        }
+                    }
+                }
+            }
+
+            // 针对跌倒事件 发送 网页 主题消息提示
+            String tenantName = "";
+            // 网页告警提示 需要当前设备绑定到租户,会发送到当前租户已经登录的管理员页面
+            if (dev.getTenantId() != null) {
+                TblTenant tblTenant = tblTenantService.getById(dev.getTenantId());
+                tenantName = tblTenant.getTenantName();
+
+                JSONObject webMsg = new JSONObject();
+                webMsg.put("clientId", clientId);
+                webMsg.put("event", "fall_confirmed");
+                webMsg.put("msgType", "fall");
+                webMsg.put("devName", devName.toString());
+                webMsg.put("tenantName", tenantName);
+                webMsg.put("eventListId", String.valueOf(eventListVO.getEventListId()));
+
+                // 查询当前需要发送的userId
+                List<AdminUserInfo> adminUserInfos = adminUserService.queryByTenantIdAndUserType(dev.getTenantId(), null);
+                if (!CollectionUtils.isEmpty(adminUserInfos)) {
+                    for (AdminUserInfo adminUserInfo : adminUserInfos) {
+
+                        // 判断当前用户是否登录网页
+                        if (redisUtil.sIsMember(RedisCacheConstant.MQTT_CLIENT_USERID, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId())) {
+                            log.info("发送网页跌倒主题消息:topic:{}, msg:{}", String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId()), webMsg.toString());
+                            mqttClient.sendMessage(String.format(TopicConstants.TOPIC_MPS_NOTIC, RedisCacheConstant.WEB_USER_PRE + adminUserInfo.getUserId()), webMsg.toString(), 2, false);
+                        }
+                    }
+                }
+            }
+            //3.调用 发送短信和微信公众号通知 功能
+            sendMesAndWxService(sendList, devName, clientId);
+        }
+    }
+
+    /**
+     * 发送短信和微信公众号通知
+     *
+     * @param sendList 需要发送通知的用户列表
+     * @param devName  设备名称
+     * @param devId    设备ID
+     */
+    private void sendMesAndWxService(List<SendMsgUserDto> sendList, String devName, String devId) {
+        if (CollectionUtils.isEmpty(sendList)) {
+            log.info("没有需要发送通知的用户");
+            return;
+        }
+
+        for (SendMsgUserDto sendDto : sendList) {
+            try {
+                log.info("mqttutil--开始处理用户通知: userId={}, messageFlag={}, serviceNumberFlag={}",
+                        sendDto.getUserId(), sendDto.isMessageFlag(), sendDto.isServiceNumberFlag());
+
+                // 根据短信标志位决定是否发送短信
+                if (sendDto.isMessageFlag()) {
+                    try {
+                        log.info("mqttutil--开始发送跌倒短信: phone={}, dev_name={}", sendDto.getPhone(), devName);
+                        msgClient.sendNotifyMsg(sendDto.getPhone(), devName);
+                        log.info("mqttUtil--短信发送完成");
+                    } catch (Exception e) {
+                        log.error("发送短信失败: userId={}, phone={}, error={}",
+                                sendDto.getUserId(), sendDto.getPhone(), e.getMessage(), e);
+                    }
+                } else {
+                    log.info("用户拒绝短信通知: userId={}", sendDto.getUserId());
+                }
+
+                // 根据服务号标志位决定是否发送微信公众号消息
+                if (sendDto.isServiceNumberFlag() && StringUtils.isNotBlank(sendDto.getFwhOpenId())) {
+                    try {
+                        log.info("发送微信公众号信息:devName={}, phoneNo={}, fwhOpenId={}",
+                                devName, sendDto.getPhone(), sendDto.getFwhOpenId());
+                        // 发送微信公众号消息
+                        wxOfficeAccountClient.sendMsg(devId, devName, sendDto.getPhone(),
+                                sendDto.getFwhOpenId(), "设备检测到跌倒,请前往小程序查看详细信息");
+                        log.info("微信公众号消息发送完成");
+                    } catch (Exception e) {
+                        log.error("发送微信公众号消息失败: userId={}, fwhOpenId={}, error={}",
+                                sendDto.getUserId(), sendDto.getFwhOpenId(), e.getMessage(), e);
+                    }
+                } else {
+                    log.info("用户拒绝服务号通知或未绑定微信: userId={}, serviceNumberFlag={}, fwhOpenId={}",
+                            sendDto.getUserId(), sendDto.isServiceNumberFlag(), sendDto.getFwhOpenId());
+                }
+
+            } catch (Exception e) {
+                log.error("处理用户通知异常: userId={}, error={}",
+                        sendDto.getUserId(), e.getMessage(), e);
+            }
+        }
+    }
+}

+ 48 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/LastWillHandler.java

@@ -0,0 +1,48 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.hfln.portal.infrastructure.po.DevInfo;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+@Slf4j
+@Component
+public class LastWillHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private LastWillHandler(DevInfoService devInfoService) {
+        super(devInfoService);
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "last_will".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        String clientId = extractDeviceId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        JSONObject obj = JSONObject.parseObject(payload);
+        if (obj.get("cmd_offline") == null || obj.getIntValue("cmd_offline") != 1) {
+            log.warn("cmd_offline is null or not 1");
+            return;
+        }
+
+        devInfoService.update(
+                new LambdaUpdateWrapper<DevInfo>()
+                        .eq(DevInfo::getClientId, clientId)
+                        .set(DevInfo::getOnline, DevInfo.Constants.OnlineStatus.OFFLINE)
+                        .set(DevInfo::getOnoffTime, LocalDateTime.now())
+        );
+    }
+}

+ 43 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/LoginHandler.java

@@ -0,0 +1,43 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.hfln.portal.infrastructure.po.DevInfo;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+@Slf4j
+@Component
+public class LoginHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private LoginHandler(DevInfoService devInfoService) {
+        super(devInfoService);
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "login".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        String clientId = extractDeviceId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        devInfoService.update(
+                new LambdaUpdateWrapper<DevInfo>()
+                        .eq(DevInfo::getClientId, clientId)
+                        .set(DevInfo::getOnline, DevInfo.Constants.OnlineStatus.ONLINE)
+                        .set(DevInfo::getExistFlag, DevInfo.Constants.ExistFlag.EXIST)
+                        .set(DevInfo::getOnoffTime, LocalDateTime.now())
+                        .set(DevInfo::getPresenceChangeTime, LocalDateTime.now())
+        );
+    }
+}

+ 8 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/MqttMessageHandler.java

@@ -0,0 +1,8 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+public interface MqttMessageHandler {
+
+    boolean supports(String action);
+
+    void handle(String topic, String payload);
+}

+ 72 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/PresenceChangeHandler.java

@@ -0,0 +1,72 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.hfln.portal.infrastructure.po.DevInfo;
+import com.hfln.portal.infrastructure.po.PersonInOutInfo;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import com.hfln.portal.infrastructure.service.PersonInOutService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+@Slf4j
+@Component
+public class PresenceChangeHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private final PersonInOutService personInOutService;
+    private PresenceChangeHandler(DevInfoService devInfoService,
+                                  PersonInOutService personInOutService) {
+        super(devInfoService);
+        this.personInOutService = personInOutService;
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "presence_change".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        String clientId = extractDeviceId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+        JSONObject obj = JSONObject.parseObject(payload);
+        if (obj.get("presence") == null) {
+            log.warn("presence is null");
+            return;
+        }
+        //0:no presense 1:presence
+        long presence = obj.getIntValue("presence");
+
+        devInfoService.update(
+                new LambdaUpdateWrapper<DevInfo>()
+                        .eq(DevInfo::getClientId, clientId)
+                        .set(DevInfo::getExistFlag, presence == 1 ? DevInfo.Constants.ExistFlag.EXIST : DevInfo.Constants.ExistFlag.NOT_EXIST)
+                        .set(DevInfo::getPresenceChangeTime, LocalDateTime.now())
+        );
+
+
+        PersonInOutInfo personInOut = personInOutService.queryOneByDevId(dev.getDevId(), DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()));
+        if (presence == 1) {
+            if (personInOut == null) {
+                personInOut = new PersonInOutInfo();
+                personInOut.setTenantId(dev.getTenantId());
+                personInOut.setDevId(dev.getDevId());
+                personInOut.setCountDate(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()));
+                personInOut.setCount(1);
+                personInOutService.save(personInOut);
+            } else {
+                personInOut.setCount(personInOut.getCount() + 1);
+                personInOutService.updateById(personInOut);
+            }
+        }
+    }
+}

+ 45 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/handlers/ReportDeviceParamHandler.java

@@ -0,0 +1,45 @@
+package com.hfln.portal.infrastructure.mqtt.handlers;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.hfln.portal.infrastructure.po.DevInfo;
+import com.hfln.portal.infrastructure.service.DevInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ReportDeviceParamHandler extends BaseMqttHandler implements MqttMessageHandler {
+
+    private ReportDeviceParamHandler(DevInfoService devInfoService) {
+        super(devInfoService);
+    }
+
+    @Override
+    public boolean supports(String action) {
+        return "report_device_param".equals(action);
+    }
+
+    @Override
+    public void handle(String topic, String payload) {
+
+        String clientId = extractDeviceId(topic);
+        DevInfo dev = devInfoService.queryByClientId(clientId);
+        if (dev == null) {
+            log.warn("Device not found for clientId: {}", clientId);
+            return;
+        }
+
+        JSONObject obj = JSONObject.parseObject(payload);
+        String deviceInfoStr = obj.getString("device_info");
+        JSONObject deviceInfo = JSONObject.parseObject(deviceInfoStr);
+
+        String hardware = deviceInfo.getString("firmware");
+        dev.setHardware(hardware);
+        int indicatorLed = obj.getIntValue("indicator_led");
+        dev.setStatusLight(indicatorLed == 0 ? 1 : 0);
+        int isCeiling = obj.getIntValue("isCeiling");
+        dev.setMountPlain(isCeiling == 1 ? DevInfo.Constants.MountPlain.CEILING : DevInfo.Constants.MountPlain.WALL);
+
+        devInfoService.updateById(dev);
+    }
+}