yangliu b9a9c77f17 feat:项目git仓库迁移 4 luni în urmă
..
src b9a9c77f17 feat:项目git仓库迁移 4 luni în urmă
README.md b9a9c77f17 feat:项目git仓库迁移 4 luni în urmă
pom.xml b9a9c77f17 feat:项目git仓库迁移 4 luni în urmă

README.md

EMQX Spring Boot Starter

简介

EMQX Spring Boot Starter 是一个基于 Spring Boot 的 EMQX 客户端集成工具,提供注解驱动的 MQTT 消息监听、发布以及自定义消息处理等功能,帮助开发者快速集成 EMQX 消息中间件。通过该工具,开发者可以轻松实现 MQTT 消息的订阅、发布和处理,提升开发效率。

功能特性

  • 注解驱动的 MQTT 消息监听:通过 @MqttListener 注解,快速实现消息监听。
  • 注解驱动的 MQTT 消息发布:通过 MqttPublisher 类,方便地发布消息。
  • 灵活的消息处理机制:支持自定义消息处理器,满足复杂业务需求。
  • 支持异步处理:消息处理支持异步执行,提高系统响应能力。
  • 支持消息重试:内置消息重试机制,确保消息可靠传递。
  • 支持消息处理链:通过消息处理链,实现消息的链式处理。

快速开始

依赖配置

在项目的 pom.xml 中添加以下依赖:

<dependency>
    <groupId>cn.hfln.framework</groupId>
    <artifactId>emqx-spring-boot-starter</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

EMQX 连接配置

在 application.yml 中配置 EMQX 连接信息:

emqx:
  server-uris: tcp://localhost:1883
  client-id: your-client-id
  username: your-username
  password: your-password
  connection-timeout: 30
  keep-alive-interval: 60

使用指南

1. 使用 @MqttListener 注解监听消息

在需要监听 MQTT 消息的方法上添加 @MqttListener 注解,指定要订阅的主题。

示例代码:

import cn.hfln.emqx.annotation.MqttListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyMqttListener {

    @MqttListener(topic = "my/topic")
    public void onMessage(String message) {
        log.info("Received message: {}", message);
    }
}

接收通配符消息

使用 @MqttListener 注解时,可以通过指定通配符主题来接收匹配的消息。例如,使用 + 表示单层通配符,使用 # 表示多层通配符。

示例代码:

import cn.hfln.emqx.annotation.MqttListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WildcardMqttListener {

    @MqttListener(topic = "my/+/topic")
    public void onSingleLevelWildcard(String message) {
        log.info("Received message from single level wildcard: {}", message);
    }

    @MqttListener(topic = "my/#")
    public void onMultiLevelWildcard(String message) {
        log.info("Received message from multi level wildcard: {}", message);
    }
}

2. 使用 MqttPublisher 发布消息

通过注入 MqttPublisher 实例,调用其方法发布消息。

示例代码:

import cn.hfln.emqx.publisher.MqttPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MyMqttService {

    @Autowired
    private MqttPublisher publisher;

    public void sendMessage(String topic, String message) {
        publisher.send(topic, message);
    }

    public void sendRetainedMessage(String topic, String message) {
        publisher.sendRetained(topic, message);
    }
}

3. 自定义消息处理

通过实现 AbstractMessageHandler 类,可以自定义消息处理逻辑。

示例代码:

import cn.hfln.emqx.handler.AbstractMessageHandler;
import cn.hfln.emqx.context.MessageContext;
import org.springframework.stereotype.Component;

@Component
public class MyCustomHandler extends AbstractMessageHandler {

    @Override
    public boolean handle(MessageContext context) {
        // 自定义消息处理逻辑
        return true;
    }

    @Override
    public String getName() {
        return "my-custom-handler";
    }
}

4. 使用 MessageContext 处理消息

MessageContext 提供了消息处理的上下文信息,包括消息内容、主题、QoS 等。通过 MessageContext,可以更灵活地处理消息。

示例代码:

import cn.hfln.emqx.annotation.MqttListener;
import cn.hfln.emqx.context.MessageContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageContextListener {

    @MqttListener(topic = "my/topic")
    public void onMessage(MessageContext context) {
        String topic = context.getTopic();
        String message = new String(context.getMessage().getPayload());
        int qos = context.getMessage().getQos();
        log.info("Received message from topic: {}, message: {}, QoS: {}", topic, message, qos);
    }
}

通配符主题使用说明

MQTT 支持两种通配符:

  • + (单层通配符):匹配任意一个层级
  • # (多层通配符):匹配任意多个层级

通配符使用规则

  1. + 必须占据一个完整的层级,不能出现在层级中间

    • my/+/topic - 匹配 my/1/topic, my/2/topic
    • my/t+pic - 不合法
  2. # 必须是主题的最后一个字符

    • my/# - 匹配 my/1, my/1/2, my/1/2/3
    • my/#/topic - 不合法
  3. 通配符不能出现在主题的开头

    • +/my/topic - 不合法
    • #/my/topic - 不合法

示例代码

@Slf4j
@Component
public class WildcardMqttListener {

    // 匹配 my/1/topic, my/2/topic 等
    @MqttListener(topic = "my/+/topic")
    public void onSingleLevelWildcard(String message) {
        log.info("Received message from single level wildcard: {}", message);
    }

    // 匹配 my/1, my/1/2, my/1/2/3 等
    @MqttListener(topic = "my/#")
    public void onMultiLevelWildcard(String message) {
        log.info("Received message from multi level wildcard: {}", message);
    }

    // 匹配 device/+/status, device/+/data 等
    @MqttListener(topic = "device/+/status")
    public void onDeviceStatus(String message) {
        log.info("Received device status: {}", message);
    }
}

注意事项

  1. 通配符主题的订阅会消耗更多资源,建议谨慎使用
  2. 多层通配符 # 会匹配所有子主题,可能导致接收到不需要的消息
  3. 建议在发布消息时使用具体的主题,避免使用通配符
  4. 在使用通配符时,建议在日志中记录完整的主题信息,方便调试

示例代码

更多示例代码请参考 `