# EMQX Spring Boot Starter ## 简介 EMQX Spring Boot Starter 是一个基于 Spring Boot 的 EMQX 客户端集成工具,提供注解驱动的 MQTT 消息监听、发布以及自定义消息处理等功能,帮助开发者快速集成 EMQX 消息中间件。通过该工具,开发者可以轻松实现 MQTT 消息的订阅、发布和处理,提升开发效率。 ## 功能特性 - **注解驱动的 MQTT 消息监听**:通过 `@MqttListener` 注解,快速实现消息监听。 - **注解驱动的 MQTT 消息发布**:通过 `MqttPublisher` 类,方便地发布消息。 - **灵活的消息处理机制**:支持自定义消息处理器,满足复杂业务需求。 - **支持异步处理**:消息处理支持异步执行,提高系统响应能力。 - **支持消息重试**:内置消息重试机制,确保消息可靠传递。 - **支持消息处理链**:通过消息处理链,实现消息的链式处理。 ## 快速开始 ### 依赖配置 在项目的 pom.xml 中添加以下依赖: ```xml cn.hfln.framework emqx-spring-boot-starter 1.0.0-SNAPSHOT ``` ### EMQX 连接配置 在 application.yml 中配置 EMQX 连接信息: ```yaml emqx: server-uris: tcp://localhost:1883 client-id: your-client-id username: your-username password: your-password connection-timeout: 30 keep-alive-interval: 60 ``` ## 使用指南 ### 1. 使用 @MqttListener 注解监听消息 在需要监听 MQTT 消息的方法上添加 `@MqttListener` 注解,指定要订阅的主题。 示例代码: ```java import cn.hfln.emqx.annotation.MqttListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class MyMqttListener { @MqttListener(topic = "my/topic") public void onMessage(String message) { log.info("Received message: {}", message); } } ``` ### 接收通配符消息 使用 `@MqttListener` 注解时,可以通过指定通配符主题来接收匹配的消息。例如,使用 `+` 表示单层通配符,使用 `#` 表示多层通配符。 示例代码: ```java import cn.hfln.emqx.annotation.MqttListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class WildcardMqttListener { @MqttListener(topic = "my/+/topic") public void onSingleLevelWildcard(String message) { log.info("Received message from single level wildcard: {}", message); } @MqttListener(topic = "my/#") public void onMultiLevelWildcard(String message) { log.info("Received message from multi level wildcard: {}", message); } } ``` ### 2. 使用 MqttPublisher 发布消息 通过注入 `MqttPublisher` 实例,调用其方法发布消息。 示例代码: ```java import cn.hfln.emqx.publisher.MqttPublisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MyMqttService { @Autowired private MqttPublisher publisher; public void sendMessage(String topic, String message) { publisher.send(topic, message); } public void sendRetainedMessage(String topic, String message) { publisher.sendRetained(topic, message); } } ``` ### 3. 自定义消息处理 通过实现 `AbstractMessageHandler` 类,可以自定义消息处理逻辑。 示例代码: ```java import cn.hfln.emqx.handler.AbstractMessageHandler; import cn.hfln.emqx.context.MessageContext; import org.springframework.stereotype.Component; @Component public class MyCustomHandler extends AbstractMessageHandler { @Override public boolean handle(MessageContext context) { // 自定义消息处理逻辑 return true; } @Override public String getName() { return "my-custom-handler"; } } ``` ### 4. 使用 MessageContext 处理消息 `MessageContext` 提供了消息处理的上下文信息,包括消息内容、主题、QoS 等。通过 `MessageContext`,可以更灵活地处理消息。 示例代码: ```java import cn.hfln.emqx.annotation.MqttListener; import cn.hfln.emqx.context.MessageContext; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class MessageContextListener { @MqttListener(topic = "my/topic") public void onMessage(MessageContext context) { String topic = context.getTopic(); String message = new String(context.getMessage().getPayload()); int qos = context.getMessage().getQos(); log.info("Received message from topic: {}, message: {}, QoS: {}", topic, message, qos); } } ``` ### 通配符主题使用说明 MQTT 支持两种通配符: - `+` (单层通配符):匹配任意一个层级 - `#` (多层通配符):匹配任意多个层级 #### 通配符使用规则 1. `+` 必须占据一个完整的层级,不能出现在层级中间 - ✅ `my/+/topic` - 匹配 `my/1/topic`, `my/2/topic` 等 - ❌ `my/t+pic` - 不合法 2. `#` 必须是主题的最后一个字符 - ✅ `my/#` - 匹配 `my/1`, `my/1/2`, `my/1/2/3` 等 - ❌ `my/#/topic` - 不合法 3. 通配符不能出现在主题的开头 - ❌ `+/my/topic` - 不合法 - ❌ `#/my/topic` - 不合法 #### 示例代码 ```java @Slf4j @Component public class WildcardMqttListener { // 匹配 my/1/topic, my/2/topic 等 @MqttListener(topic = "my/+/topic") public void onSingleLevelWildcard(String message) { log.info("Received message from single level wildcard: {}", message); } // 匹配 my/1, my/1/2, my/1/2/3 等 @MqttListener(topic = "my/#") public void onMultiLevelWildcard(String message) { log.info("Received message from multi level wildcard: {}", message); } // 匹配 device/+/status, device/+/data 等 @MqttListener(topic = "device/+/status") public void onDeviceStatus(String message) { log.info("Received device status: {}", message); } } ``` #### 注意事项 1. 通配符主题的订阅会消耗更多资源,建议谨慎使用 2. 多层通配符 `#` 会匹配所有子主题,可能导致接收到不需要的消息 3. 建议在发布消息时使用具体的主题,避免使用通配符 4. 在使用通配符时,建议在日志中记录完整的主题信息,方便调试 ## 示例代码 更多示例代码请参考 `