123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- import mqtt from './mqtt.js';
/**
 * Singleton service that owns the MQTT connections used by the app.
 *
 * Two brokers are exposed through convenience methods: "CMD" (connectCmd) and
 * "DATA" (connectData). Every connection is tracked in `connections`, keyed by
 * broker name, together with the topic callbacks registered through
 * `subscribe`, so subscriptions can be restored whenever the client reconnects.
 *
 * Relies on names provided by the surrounding project/runtime: `mqtt`
 * (imported at the top of the file), `uni` and `wx`.
 *
 * NOTE(review): the broker username/password are hard-coded in connectCmd and
 * connectData; consider moving them to configuration.
 */
class MqttService {
  /**
   * Returns the shared instance; state is initialised only on first construction.
   */
  constructor() {
    if (!MqttService.instance) {
      this.cmdClient = null;
      this.dataClient = null;
      this.cmdConnected = false;
      this.dataConnected = false;
      // brokerName => { client, connected, subscriptions: Map<topic, callback[]> }
      this.connections = new Map();
      // brokerName => promise of a connectToBroker() call that has not settled yet
      this.pendingConnects = new Map();
      // Stable function reference: subscribe() de-duplicates by identity, so
      // repeated connectCmd() calls no longer stack one logger per call.
      this.cmdNoticeHandler = (msg) => console.log('CMD 消息:', msg);
      MqttService.instance = this;
    }
    return MqttService.instance;
  }

  // ---------------- Connect: CMD ----------------
  /**
   * Connects to the CMD broker (or reuses the live client), subscribes to the
   * user's notice topic and emits the `mqtt-ready` event.
   *
   * @param {string|number} userId - Used in the client id and the notice topic.
   * @returns {Promise<object>} Resolves with the MQTT client; rejects on
   *   connection error or timeout (see connectToBroker).
   */
  connectCmd(userId) {
    if (this.cmdConnected && this.cmdClient) {
      console.log("CMD MQTT 已连接,复用现有实例");
      return Promise.resolve(this.cmdClient);
    }
    const url = 'wxs://cmd.radar-power.cn/mqtt/';
    const clientId = `xcx_mqtt_cmd1_${userId}_${Math.random().toString(16).substring(2, 8)}`;
    return this.connectToBroker('CMD', { url, clientId, username: 'lnradar', password: 'lnradar' })
      .then((client) => {
        this.cmdClient = client;
        this.cmdConnected = true;
        // Default subscription to the per-user notice topic.
        this.subscribe('CMD', `/mps/wx_${userId}/notice`, this.cmdNoticeHandler);
        uni.$emit('mqtt-ready', this.cmdClient);
        return client;
      });
  }

  // ---------------- Connect: DATA ----------------
  /**
   * Connects to the DATA broker (or reuses the live client) and emits the
   * `mqttData-ready` event. Shows a short "connecting" toast before dialing.
   *
   * @param {string|number} userId - Used in the client id.
   * @returns {Promise<object>} Resolves with the MQTT client; rejects on
   *   connection error or timeout (see connectToBroker).
   */
  connectData(userId) {
    if (this.dataConnected && this.dataClient) {
      console.log("DATA MQTT 已连接,复用现有实例");
      return Promise.resolve(this.dataClient);
    }
    uni.showToast({ title: "平台连接中...", icon: "loading", duration: 1000 });
    // An earlier revision switched this URL on __wxConfig.envVersion
    // (develop/trial used wxs://data.radar-power.asia:8084/mqtt/); every
    // environment now uses the same endpoint.
    const url = "wxs://data.radar-power.cn/mqtt/";
    const clientId = `xcx_mqtt_data1_${userId}_${Date.now()}`;
    return this.connectToBroker('DATA', { url, clientId, username: 'lnradar', password: 'lnradar' })
      .then((client) => {
        this.dataClient = client;
        this.dataConnected = true;
        uni.$emit('mqttData-ready', this.dataClient);
        return client;
      });
  }

  // ---------------- Shared connect routine ----------------
  /**
   * Opens (or reuses) the connection registered under `brokerName`.
   *
   * - A connection that is currently connected is returned as-is.
   * - While a previous call for the same broker is still pending, its promise
   *   is returned so concurrent callers share one client.
   * - A registered but disconnected connection is ended and replaced; its
   *   topic callbacks are carried over and re-subscribed on connect.
   *
   * @param {string} brokerName - Key in `connections` (e.g. 'CMD', 'DATA').
   * @param {object} config
   * @param {string} config.url
   * @param {string} config.clientId
   * @param {string} [config.username]
   * @param {string} [config.password]
   * @param {number} [config.reconnectPeriod=500] - Forwarded to mqtt.connect;
   *   an explicit 0 is respected.
   * @returns {Promise<object>} Resolves with the client on the first
   *   `connect` event; rejects on an `error` event or if no connection is
   *   established within 10 seconds.
   */
  connectToBroker(brokerName, config) {
    const existing = this.connections.get(brokerName);
    if (existing && existing.connected) {
      return Promise.resolve(existing.client);
    }

    const inFlight = this.pendingConnects.get(brokerName);
    if (inFlight) {
      return inFlight;
    }

    if (existing) {
      // Stop the stale client so it does not keep reconnecting (and delivering
      // messages) next to its replacement.
      existing.superseded = true;
      existing.client.end(true);
    }

    const attempt = new Promise((resolve, reject) => {
      const client = mqtt.connect(config.url, {
        clientId: config.clientId,
        username: config.username,
        password: config.password,
        reconnectPeriod: Number.isFinite(config.reconnectPeriod) ? config.reconnectPeriod : 500,
        wsOptions: {
          WebSocket: url => wx.connectSocket({
            url,
            header: { 'content-type': 'application/json' },
            protocols: ['mqtt'],
          })
        },
      });

      const connection = {
        client,
        connected: false,
        superseded: false,
        // topic => [callbacks]; inherited from the connection being replaced
        subscriptions: existing ? existing.subscriptions : new Map(),
      };
      this.connections.set(brokerName, connection);

      // ---------------- Connect timeout ----------------
      const timeoutId = setTimeout(() => {
        if (!connection.connected) reject(new Error(`${brokerName} 连接超时`));
      }, 10000);

      // ---------------- Connected (initial connect and every reconnect) ----------------
      client.on('connect', () => {
        clearTimeout(timeoutId);
        console.log(`${brokerName} MQTT 连接成功`);
        connection.connected = true;
        // On an automatic reconnect of the client the service already exposes,
        // restore the flag that the 'close' handler cleared.
        if (brokerName === 'CMD' && this.cmdClient === client) this.cmdConnected = true;
        if (brokerName === 'DATA' && this.dataClient === client) this.dataConnected = true;
        this.restoreSubscriptions(connection, '重连');
        resolve(client);
      });

      // ---------------- Error ----------------
      client.on('error', (err) => {
        clearTimeout(timeoutId);
        console.error(`${brokerName} MQTT 连接错误:`, err);
        connection.connected = false;
        reject(err);
        if (connection.superseded) return;
        uni.showModal({
          title: '提示',
          content: '连接平台失败,请重新登录',
          showCancel: false,
          success: (res) => {
            if (res.confirm) {
              uni.clearStorageSync();
              uni.reLaunch({ url: "/pagesA/loginNew/loginNew" });
            }
          }
        });
      });

      // ---------------- Closed ----------------
      client.on('close', () => {
        connection.connected = false;
        if (connection.superseded) return;
        uni.showToast({ title: `平台已断开`, icon: "none", duration: 1500 });
        console.log(`${brokerName} MQTT 断开`);
        // A late 'close' from a client that is no longer registered must not
        // flip the flags of whichever connection replaced it.
        if (this.connections.get(brokerName) !== connection) return;
        if (brokerName === 'DATA') this.dataConnected = false;
        if (brokerName === 'CMD') this.cmdConnected = false;
      });

      // ---------------- Incoming message ----------------
      client.on('message', (topic, message) => {
        const callbacks = connection.subscriptions.get(topic) || [];
        // Iterate over a copy: a callback may unsubscribe itself while running.
        callbacks.slice().forEach((cb) => {
          try { cb(message.toString(), topic, brokerName); }
          catch (err) { console.error('Message callback error:', err); }
        });
      });
    });

    this.pendingConnects.set(brokerName, attempt);
    const release = () => {
      if (this.pendingConnects.get(brokerName) === attempt) {
        this.pendingConnects.delete(brokerName);
      }
    };
    attempt.then(release, release);
    return attempt;
  }

  // ---------------- Publish ----------------
  /**
   * Publishes on the CMD client when `clientType` is 'CMD', otherwise on the
   * DATA client. Only warns when that client does not exist.
   *
   * @param {string} clientType
   * @param {string} topic
   * @param {string|object} message - Passed through to client.publish.
   */
  publish(clientType, topic, message) {
    const client = clientType === 'CMD' ? this.cmdClient : this.dataClient;
    if (client) client.publish(topic, message);
    else console.warn(`${clientType} MQTT 未连接,无法 publish`);
  }

  // ---------------- Subscribe ----------------
  /**
   * Registers `callback` for `topic` and subscribes on the broker.
   *
   * @param {string} clientType - 'CMD', anything else selects the DATA connection.
   * @param {string} topic
   * @param {(payload: string, topic: string, brokerName: string) => void} callback
   * @param {(err?: Error) => void} [onSubscribeDone] - Called with the
   *   subscribe result, or with an Error when the connection is not up.
   * @returns {(() => void)|undefined} Function that removes this callback, or
   *   undefined when the connection is missing/disconnected (nothing is
   *   registered in that case).
   */
  subscribe(clientType, topic, callback, onSubscribeDone) {
    const connection = this.getConnection(clientType);
    if (!connection || !connection.connected) {
      console.warn(`${clientType} MQTT 未连接,无法 subscribe`);
      if (onSubscribeDone) onSubscribeDone(new Error('未连接'));
      return;
    }
    // Remember the callback so it survives reconnects.
    if (!connection.subscriptions.has(topic)) connection.subscriptions.set(topic, []);
    const callbacks = connection.subscriptions.get(topic);
    if (!callbacks.includes(callback)) callbacks.push(callback);

    connection.client.subscribe(topic, (err) => {
      if (err) console.error(`Subscribe error on ${clientType}:`, err);
      if (onSubscribeDone) onSubscribeDone(err);
    });

    return () => this.unsubscribe(clientType, topic, callback);
  }

  // ---------------- Unsubscribe ----------------
  /**
   * Removes `callback` from `topic`; once the topic has no callbacks left it is
   * also unsubscribed on the broker.
   *
   * @param {string} clientType - 'CMD', anything else selects the DATA connection.
   * @param {string} topic
   * @param {Function} callback - The exact function passed to subscribe().
   */
  unsubscribe(clientType, topic, callback) {
    const connection = this.getConnection(clientType);
    if (!connection) return;
    const callbacks = connection.subscriptions.get(topic);
    if (!callbacks) return;
    const index = callbacks.indexOf(callback);
    if (index > -1) callbacks.splice(index, 1);
    if (callbacks.length === 0) {
      connection.subscriptions.delete(topic);
      connection.client.unsubscribe(topic);
    }
  }

  // ---------------- Manual re-subscribe ----------------
  /**
   * Re-issues a broker subscribe for every topic registered on `brokerName`.
   * Does nothing when that connection is missing or disconnected.
   *
   * @param {string} brokerName
   */
  resubscribeAll(brokerName) {
    const connection = this.connections.get(brokerName);
    if (!connection || !connection.connected) return;
    this.restoreSubscriptions(connection, '手动恢复');
  }

  // ---------------- Disconnect ----------------
  /**
   * Force-ends every known client exactly once and resets all state.
   */
  disconnectAll() {
    // cmdClient/dataClient are normally the same objects as the map entries;
    // a Set avoids calling end() twice on them.
    const clients = new Set();
    if (this.cmdClient) clients.add(this.cmdClient);
    if (this.dataClient) clients.add(this.dataClient);
    this.connections.forEach((conn) => clients.add(conn.client));
    clients.forEach((client) => client.end(true));

    this.cmdClient = null;
    this.cmdConnected = false;
    this.dataClient = null;
    this.dataConnected = false;
    this.connections.clear();
    this.pendingConnects.clear();
    console.log('所有 MQTT 连接已关闭');
  }

  /**
   * Force-ends the DATA client, including one that is still connecting, and
   * forgets its subscriptions. No-op when there is no DATA client at all.
   */
  disconnectData() {
    const connection = this.connections.get('DATA');
    const clients = new Set();
    if (this.dataClient) clients.add(this.dataClient);
    if (connection) clients.add(connection.client);
    if (clients.size === 0) return;

    clients.forEach((client) => client.end(true));
    this.dataClient = null;
    this.dataConnected = false;
    this.connections.delete("DATA");
    this.pendingConnects.delete("DATA");
    console.log("DATA MQTT 连接已关闭");
  }

  // ---------------- Internal helpers ----------------
  /**
   * Maps a clientType to its connection record: 'CMD' selects CMD, any other
   * value selects DATA.
   */
  getConnection(clientType) {
    return this.connections.get(clientType === 'CMD' ? 'CMD' : 'DATA');
  }

  /**
   * Subscribes the connection's client to every registered topic and logs the
   * outcome; `label` is the prefix used in the log messages.
   */
  restoreSubscriptions(connection, label) {
    connection.subscriptions.forEach((callbacks, topic) => {
      connection.client.subscribe(topic, (err) => {
        if (err) console.error(`${label}订阅失败: ${topic}`, err);
        else console.log(`✅ ${label}订阅成功: ${topic}`);
      });
    });
  }
}
// Export the shared service: every importer receives the same MqttService object.
const mqttService = new MqttService();
export default mqttService;
|