# 和风联能设备接入服务 (Device Access Service) ## 项目简介 和风联能设备接入服务(DAS)是一个基于Java Spring Boot开发的物联网设备接入平台,采用领域驱动设计(DDD)架构,主要用于管理和处理雷达设备的接入、数据处理和命令下发等功能。该服务通过MQTT协议与设备进行通信,支持设备登录、保活、状态上报、跌倒事件处理等功能。 ## 项目架构 本项目基于领域驱动设计(DDD)架构,分为以下几层: - **应用层** (device-service-application): 协调和编排领域对象,以完成用户的特定用例 - **领域层** (device-service-domain): 包含业务逻辑和规则的核心层 - **基础设施层** (device-service-infrastructure): 提供技术实现,如数据库访问、消息队列等 - **公共层** (device-service-common): 包含共享的通用代码 ### 架构依赖关系图 ```mermaid graph TD; subgraph "应用层 Application" A["device-service-application"] A1["DeviceCommandServiceImpl"] A2["DeviceEventService"] A --包含--> A1 A --包含--> A2 end subgraph "领域层 Domain" B["device-service-domain"] B1["实体 Entity"] B11["Device"] B2["端口 Port"] B21["DeviceCommandService"] B22["DeviceEventPort"] B3["网关接口 Gateway"] B31["DeviceGateway"] B32["MqttGateway"] B4["事件 Event"] B41["MqttHandler"] B --包含--> B1 B --包含--> B2 B --包含--> B3 B --包含--> B4 B1 --包含--> B11 B2 --包含--> B21 B2 --包含--> B22 B3 --包含--> B31 B3 --包含--> B32 B4 --包含--> B41 end subgraph "基础设施层 Infrastructure" C["device-service-infrastructure"] C1["网关实现 Gateway Impl"] C11["DeviceGatewayImpl"] C12["MqttGatewayImpl"] C2["MQTT订阅处理 Subscriber"] C21["DeviceMessageSubscriber"] C22["DasMessageSubscriber"] C23["OpcMessageSubscriber"] C24["AppMessageSubscriber"] C3["数据访问 Repository"] C31["DevInfoService"] C --包含--> C1 C --包含--> C2 C --包含--> C3 C1 --包含--> C11 C1 --包含--> C12 C2 --包含--> C21 C2 --包含--> C22 C2 --包含--> C23 C2 --包含--> C24 C3 --包含--> C31 end subgraph "公共层 Common" D["device-service-common"] D1["常量 Constants"] D11["MqttTopics"] D2["数据传输对象 DTO"] D3["异常处理 Exception"] D31["BizException"] D4["请求对象 Request"] D --包含--> D1 D --包含--> D2 D --包含--> D3 D --包含--> D4 D1 --包含--> D11 D3 --包含--> D31 end subgraph "Python版本" P["das"] P1["mqtt_recv.py"] P2["mqtt_send.py"] P3["dev_mng.py"] P --包含--> P1 P --包含--> P2 P --包含--> P3 end %% 依赖关系 A --> B C --> B C --> D A --> D B --> D %% 实现关系 A1 -.实现.-> B21 A2 -.实现.-> B22 C11 -.实现.-> B31 C12 -.实现.-> B32 %% 调用关系 C21 -.调用.-> A2 C22 -.调用.-> A2 C23 -.调用.-> A1 C24 -.调用.-> A1 A1 -.调用.-> B41 A1 -.调用.-> B31 C11 -.调用.-> C31 %% 参考关系 P1 -.参考.-> C21 P1 -.参考.-> C22 P1 -.参考.-> C23 P1 -.参考.-> C24 P2 -.参考.-> B41 P3 -.参考.-> C11 ``` ### 核心组件说明 - **设备实体 (Device)**: 领域层的核心实体,表示设备的状态和行为 - **端口接口**: DeviceCommandService和DeviceEventPort定义了系统与外部交互的契约 - **网关接口**: DeviceGateway和MqttGateway定义了系统与数据库和消息队列交互的方式 - **MQTT处理器**: 负责处理不同类型的MQTT消息(设备消息、DAS消息、OPC消息和App消息) - **网关实现**: 实现了领域层定义的网关接口,负责具体的数据访问和消息发送 ### 依赖关系 按照DDD的设计原则,各层之间的依赖关系为: - 应用层依赖领域层和公共层 - 基础设施层依赖应用层、领域层和公共层 - 领域层依赖公共层 - 基础设施层的MQTT订阅处理器调用应用层服务 - 应用层服务调用领域层网关和事件处理 - 领域层网关实现由基础设施层提供 这种依赖关系确保了领域层作为核心业务逻辑不依赖外部实现,遵循了依赖倒置原则。 ### MQTT消息处理流程 1. 基础设施层的MQTT订阅处理器接收MQTT消息 2. 订阅处理器调用应用层服务方法处理业务逻辑 3. 应用层服务协调领域对象和网关 4. 领域层网关通过基础设施层实现类访问数据库或其他资源 这种设计避免了循环依赖,使架构更加清晰和可维护。 ### 与Python版本的参考关系 - Python版本的mqtt_recv.py对应Java版本的MQTT订阅处理器 - Python版本的mqtt_send.py对应Java版本的MqttHandler - Python版本的dev_mng.py对应Java版本的DeviceGatewayImpl ## 项目目录结构 ``` hfln-device-service/ ├── device-service-application/ # 应用服务模块 ├── device-service-client-starter/ # 客户端启动器模块 ├── device-service-common/ # 公共模块 ├── device-service-domain/ # 领域模块 ├── device-service-infrastructure/ # 基础设施模块 ├── device-service-server/ # 服务启动模块 ├── das/ # 原Python版DAS代码参考 ├── .gitignore # Git忽略文件 ├── Dockerfile # Docker构建文件 ├── Jenkinsfile # Jenkins CI/CD配置 ├── pom.xml # Maven项目配置 └── README.md # 项目说明文档 ``` ## MQTT通信 ### MQTT配置 MQTT服务器配置信息位于`application.yml`文件中: ```yaml mqtt: server: uri: tcp://8.130.28.21:1883 client: id: device-service- username: admin password: public connect: timeout: 30 keep: alive: interval: 60 clean: session: true ``` ### MQTT主题 系统使用以下主题格式: - 设备上行消息: `/dev/{deviceId}/{messageType}` - 应用下行消息: `/app/{messageType}` 主要消息类型包括: - 设备登录: `/dev/{deviceId}/report_device_info` - 设备保活: `/dev/{deviceId}/keepalive` - 设备状态上报: `/dev/{deviceId}/report_status` - 设备跌倒事件上报: `/dev/{deviceId}/report_fall` - 设备参数设置响应: `/dev/{deviceId}/set_param_response` - 应用绑定设备: `/app/bind_device` - 应用解绑设备: `/app/unbind_device` - 应用设置设备参数: `/app/set_device_param` - 应用确认跌倒事件: `/app/fall_event/ack` ## 消息订阅处理 系统使用`@MqttSubscriber`注解标记消息处理方法,通过`MqttSubscriberProcessor`自动扫描并注册消息处理器。 主要的消息订阅处理器包括: - `DeviceMessageSubscriber`: 处理设备上行消息 - `AppMessageSubscriber`: 处理应用下行消息 ## 测试 ### 单元测试 系统包含以下单元测试: - `DeviceMessageSubscriberTest`: 测试设备消息订阅处理器 - `AppMessageSubscriberTest`: 测试应用消息订阅处理器 - `MqttSubscriberProcessorTest`: 测试MQTT订阅处理器核心类 ### 集成测试 系统包含以下集成测试: - `MqttConnectionTest`: 测试与MQTT服务器的基本连接 - `MqttConnectionWithAuthTest`: 测试带认证的MQTT连接 - `MqttSubscriberIntegrationTest`: 测试MQTT消息订阅处理流程 ### 运行测试 使用Maven运行测试: ```bash # 运行所有测试 mvn test # 运行特定测试 mvn test -Dtest="com.hfln.device.infrastructure.mqtt.MqttConnectionTest" mvn test -Dtest="com.hfln.device.infrastructure.mqtt.MqttSubscriberIntegrationTest" ``` ## 部署 系统打包为可执行JAR文件,可通过以下命令构建: ```bash mvn clean package ``` 运行服务: ```bash java -jar device-service-server/target/device-service-server-1.0.0-SNAPSHOT.jar ```