import mqtt from './mqtt.js'; class MqttService { constructor() { if (!MqttService.instance) { this.cmdClient = null; this.dataClient = null; this.cmdConnected = false; this.dataConnected = false; this.connections = new Map(); // 内部管理多连接 MqttService.instance = this; } return MqttService.instance; } // ---------------- 连接 CMD ---------------- connectCmd(userId) { if (this.cmdConnected && this.cmdClient) { console.log("CMD MQTT 已连接,复用现有实例"); return Promise.resolve(this.cmdClient); } const brokerName = 'CMD'; const url = 'wxs://cmd.radar-power.cn/mqtt/'; const clientId = `xcx_mqtt_cmd1_${userId}_${Math.random().toString(16).substring(2, 8)}`; return this.connectToBroker(brokerName, { url, clientId, username: 'lnradar', password: 'lnradar' }) .then(client => { this.cmdClient = client; this.cmdConnected = true; // 默认订阅 CMD 主题 const topic = `/mps/wx_${userId}/notice`; this.subscribe('CMD', topic, (msg) => console.log('CMD 消息:', msg)); uni.$emit('mqtt-ready', this.cmdClient); return client; }); } // ---------------- 连接 DATA ---------------- connectData(userId) { if (this.dataConnected && this.dataClient) { console.log("DATA MQTT 已连接,复用现有实例"); return Promise.resolve(this.dataClient); } uni.showToast({ title: "平台连接中...", icon: "loading", duration: 1000 }); const brokerName = 'DATA'; let url = "" url = "wxs://data.radar-power.cn/mqtt/"; // if (__wxConfig.envVersion == 'develop') { // url = "wxs://data.radar-power.asia:8084/mqtt/"; // } // if (__wxConfig.envVersion == 'trial') { // url = "wxs://data.radar-power.asia:8084/mqtt/"; // } // if (__wxConfig.envVersion == 'release') { // url = "wxs://data.radar-power.cn/mqtt/"; // } const clientId = `xcx_mqtt_data1_${userId}_${Date.now()}`; return this.connectToBroker(brokerName, { url, clientId, username: 'lnradar', password: 'lnradar' }) .then(client => { this.dataClient = client; this.dataConnected = true; uni.$emit('mqttData-ready', this.dataClient); return client; }); } // ---------------- 内部通用连接方法 ---------------- connectToBroker(brokerName, config) { return new Promise((resolve, reject) => { if (this.connections.has(brokerName)) { const existingConn = this.connections.get(brokerName); if (existingConn.connected) { resolve(existingConn.client); return; } } const client = mqtt.connect(config.url, { clientId: config.clientId, username: config.username, password: config.password, reconnectPeriod: config.reconnectPeriod || 500, wsOptions: { WebSocket: url => wx.connectSocket({ url, header: { 'content-type': 'application/json' }, protocols: ['mqtt'], }) }, }); const connection = { client, connected: false, subscriptions: new Map(), // topic => [callbacks] }; this.connections.set(brokerName, connection); // ---------------- 连接成功 ---------------- client.on('connect', () => { console.log(`${brokerName} MQTT 连接成功`); connection.connected = true; // 重连时恢复所有订阅 connection.subscriptions.forEach((callbacks, topic) => { connection.client.subscribe(topic, (err) => { if (err) console.error(`重连订阅失败: ${topic}`, err); else console.log(`✅ 重连订阅成功: ${topic}`); }); }); resolve(client); }); // ---------------- 连接错误 ---------------- client.on('error', (err) => { uni.showModal({ title: '提示', content: '连接平台失败,请重新登录', showCancel: false, success: (res) => { if (res.confirm) { uni.clearStorageSync(); uni.reLaunch({ url: "/pagesA/loginNew/loginNew" }); } } }); console.error(`${brokerName} MQTT 连接错误:`, err); connection.connected = false; reject(err); }); // ---------------- 连接断开 ---------------- client.on('close', () => { uni.showToast({ title: `平台已断开`, icon: "none", duration: 1500 }); console.log(`${brokerName} MQTT 断开`); connection.connected = false; if (brokerName === 'DATA') this.dataConnected = false; if (brokerName === 'CMD') this.cmdConnected = false; }); // ---------------- 接收消息 ---------------- client.on('message', (topic, message) => { const callbacks = connection.subscriptions.get(topic) || []; callbacks.forEach(cb => { try { cb(message.toString(), topic, brokerName); } catch (err) { console.error('Message callback error:', err); } }); }); // ---------------- 连接超时 ---------------- setTimeout(() => { if (!connection.connected) reject(new Error(`${brokerName} 连接超时`)); }, 10000); }); } // ---------------- 发布消息 ---------------- publish(clientType, topic, message) { const client = clientType === 'CMD' ? this.cmdClient : this.dataClient; if (client) client.publish(topic, message); else console.warn(`${clientType} MQTT 未连接,无法 publish`); } // ---------------- 订阅 ---------------- subscribe(clientType, topic, callback, onSubscribeDone) { const connection = clientType === 'CMD' ? this.connections.get('CMD') : this.connections.get('DATA'); if (!connection || !connection.connected) { console.warn(`${clientType} MQTT 未连接,无法 subscribe`); if (onSubscribeDone) onSubscribeDone(new Error('未连接')); return; } // 保存回调 if (!connection.subscriptions.has(topic)) connection.subscriptions.set(topic, []); const callbacks = connection.subscriptions.get(topic); if (!callbacks.includes(callback)) callbacks.push(callback); // 执行订阅 connection.client.subscribe(topic, (err) => { if (err) console.error(`Subscribe error on ${clientType}:`, err); if (onSubscribeDone) onSubscribeDone(err); }); // 返回取消订阅函数 return () => this.unsubscribe(clientType, topic, callback); } // ---------------- 取消订阅 ---------------- unsubscribe(clientType, topic, callback) { const connection = clientType === 'CMD' ? this.connections.get('CMD') : this.connections.get('DATA'); if (!connection) return; const callbacks = connection.subscriptions.get(topic); if (callbacks) { const index = callbacks.indexOf(callback); if (index > -1) callbacks.splice(index, 1); if (callbacks.length === 0) { connection.subscriptions.delete(topic); connection.client.unsubscribe(topic); } } } // ---------------- 手动恢复订阅 ---------------- resubscribeAll(brokerName) { const connection = this.connections.get(brokerName); if (!connection || !connection.connected) return; connection.subscriptions.forEach((callbacks, topic) => { connection.client.subscribe(topic, (err) => { if (err) console.error(`手动恢复订阅失败: ${topic}`, err); else console.log(`✅ 手动恢复订阅成功: ${topic}`); }); }); } // ---------------- 断开连接 ---------------- disconnectAll() { if (this.cmdClient) { this.cmdClient.end(true); this.cmdClient = null; this.cmdConnected = false; } if (this.dataClient) { this.dataClient.end(true); this.dataClient = null; this.dataConnected = false; } this.connections.forEach(conn => conn.client.end(true)); this.connections.clear(); console.log('所有 MQTT 连接已关闭'); } disconnectData() { if (this.dataClient) { this.dataClient.end(true); this.dataClient = null; this.dataConnected = false; this.connections.delete("DATA"); console.log("DATA MQTT 连接已关闭"); } } } // 单例导出 const instance = new MqttService(); export default instance;