|  | 4 bulan lalu | |
|---|---|---|
| .. | ||
| src | 4 bulan lalu | |
| README.md | 4 bulan lalu | |
| pom.xml | 4 bulan lalu | |
EMQX Spring Boot Starter 是一个基于 Spring Boot 的 EMQX 客户端集成工具,提供注解驱动的 MQTT 消息监听、发布以及自定义消息处理等功能,帮助开发者快速集成 EMQX 消息中间件。通过该工具,开发者可以轻松实现 MQTT 消息的订阅、发布和处理,提升开发效率。
@MqttListener 注解,快速实现消息监听。MqttPublisher 类,方便地发布消息。在项目的 pom.xml 中添加以下依赖:
<dependency>
    <groupId>cn.hfln.framework</groupId>
    <artifactId>emqx-spring-boot-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
在 application.yml 中配置 EMQX 连接信息:
emqx:
  server-uris: tcp://localhost:1883
  client-id: your-client-id
  username: your-username
  password: your-password
  connection-timeout: 30
  keep-alive-interval: 60
在需要监听 MQTT 消息的方法上添加 @MqttListener 注解,指定要订阅的主题。
示例代码:
import cn.hfln.emqx.annotation.MqttListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MyMqttListener {
    @MqttListener(topic = "my/topic")
    public void onMessage(String message) {
        log.info("Received message: {}", message);
    }
}
使用 @MqttListener 注解时,可以通过指定通配符主题来接收匹配的消息。例如,使用 + 表示单层通配符,使用 # 表示多层通配符。
示例代码:
import cn.hfln.emqx.annotation.MqttListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WildcardMqttListener {
    @MqttListener(topic = "my/+/topic")
    public void onSingleLevelWildcard(String message) {
        log.info("Received message from single level wildcard: {}", message);
    }
    @MqttListener(topic = "my/#")
    public void onMultiLevelWildcard(String message) {
        log.info("Received message from multi level wildcard: {}", message);
    }
}
通过注入 MqttPublisher 实例,调用其方法发布消息。
示例代码:
import cn.hfln.emqx.publisher.MqttPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MyMqttService {
    @Autowired
    private MqttPublisher publisher;
    public void sendMessage(String topic, String message) {
        publisher.send(topic, message);
    }
    public void sendRetainedMessage(String topic, String message) {
        publisher.sendRetained(topic, message);
    }
}
通过实现 AbstractMessageHandler 类,可以自定义消息处理逻辑。
示例代码:
import cn.hfln.emqx.handler.AbstractMessageHandler;
import cn.hfln.emqx.context.MessageContext;
import org.springframework.stereotype.Component;
@Component
public class MyCustomHandler extends AbstractMessageHandler {
    @Override
    public boolean handle(MessageContext context) {
        // 自定义消息处理逻辑
        return true;
    }
    @Override
    public String getName() {
        return "my-custom-handler";
    }
}
MessageContext 提供了消息处理的上下文信息,包括消息内容、主题、QoS 等。通过 MessageContext,可以更灵活地处理消息。
示例代码:
import cn.hfln.emqx.annotation.MqttListener;
import cn.hfln.emqx.context.MessageContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageContextListener {
    @MqttListener(topic = "my/topic")
    public void onMessage(MessageContext context) {
        String topic = context.getTopic();
        String message = new String(context.getMessage().getPayload());
        int qos = context.getMessage().getQos();
        log.info("Received message from topic: {}, message: {}, QoS: {}", topic, message, qos);
    }
}
MQTT 支持两种通配符:
+ (单层通配符):匹配任意一个层级# (多层通配符):匹配任意多个层级+ 必须占据一个完整的层级,不能出现在层级中间
my/+/topic - 匹配 my/1/topic, my/2/topic 等my/t+pic - 不合法# 必须是主题的最后一个字符
my/# - 匹配 my/1, my/1/2, my/1/2/3 等my/#/topic - 不合法通配符不能出现在主题的开头
+/my/topic - 不合法#/my/topic - 不合法@Slf4j
@Component
public class WildcardMqttListener {
    // 匹配 my/1/topic, my/2/topic 等
    @MqttListener(topic = "my/+/topic")
    public void onSingleLevelWildcard(String message) {
        log.info("Received message from single level wildcard: {}", message);
    }
    // 匹配 my/1, my/1/2, my/1/2/3 等
    @MqttListener(topic = "my/#")
    public void onMultiLevelWildcard(String message) {
        log.info("Received message from multi level wildcard: {}", message);
    }
    // 匹配 device/+/status, device/+/data 等
    @MqttListener(topic = "device/+/status")
    public void onDeviceStatus(String message) {
        log.info("Received device status: {}", message);
    }
}
# 会匹配所有子主题,可能导致接收到不需要的消息更多示例代码请参考 `