// globalMqtt.js
  1. import mqtt from './mqtt.js';
  /**
   * Singleton service that manages the MQTT connections of the mini program.
   *
   * Two logical brokers are exposed through convenience methods: "CMD"
   * (`connectCmd`) and "DATA" (`connectData`). Every connection is also tracked
   * in `this.connections` (brokerName => connection record) together with its
   * per-topic callback lists, so that incoming messages can be dispatched and
   * subscriptions can be re-issued after a reconnect.
   *
   * Relies on names provided by the surrounding project/runtime: `mqtt`
   * (imported MQTT client library), `uni` (uni-app API) and `wx` (WeChat API).
   */
  class MqttService {
    /**
     * Returns the shared instance; state is initialised only on first construction.
     */
    constructor() {
      if (!MqttService.instance) {
        this.cmdClient = null;
        this.dataClient = null;
        this.cmdConnected = false;
        this.dataConnected = false;
        // brokerName => { client, connected, superseded, pending, abort, subscriptions }
        this.connections = new Map();
        // Stable reference for the default CMD notice handler, so that repeated
        // connectCmd() calls do not register duplicate callbacks for one topic.
        this.onCmdNotice = (msg) => console.log('CMD 消息:', msg);
        MqttService.instance = this;
      }
      return MqttService.instance;
    }

    // ---------------- Connect CMD ----------------
    /**
     * Connects to the CMD broker (or reuses the live client) and subscribes to
     * the user's notice topic `/mps/wx_<userId>/notice`.
     * Emits the `mqtt-ready` uni event with the client once connected.
     *
     * @param {string|number} userId - Used in the client id and the notice topic.
     * @returns {Promise<object>} Resolves with the MQTT client; rejects on
     *   connection error or timeout.
     */
    connectCmd(userId) {
      if (this.cmdConnected && this.cmdClient) {
        console.log("CMD MQTT 已连接,复用现有实例");
        return Promise.resolve(this.cmdClient);
      }
      const brokerName = 'CMD';
      const url = 'wxs://cmd.radar-power.cn/mqtt/';
      const clientId = `xcx_mqtt_cmd1_${userId}_${Math.random().toString(16).substring(2, 8)}`;
      // NOTE(review): credentials are hard-coded in client code; consider moving
      // them to configuration supplied by the backend.
      return this.connectToBroker(brokerName, { url, clientId, username: 'lnradar', password: 'lnradar' })
        .then((client) => {
          this.cmdClient = client;
          this.cmdConnected = true;
          // Default subscription for the CMD notice topic.
          const topic = `/mps/wx_${userId}/notice`;
          this.subscribe('CMD', topic, this.onCmdNotice);
          uni.$emit('mqtt-ready', this.cmdClient);
          return client;
        });
    }

    // ---------------- Connect DATA ----------------
    /**
     * Connects to the DATA broker (or reuses the live client).
     * Shows a loading toast while connecting and emits the `mqttData-ready`
     * uni event with the client once connected.
     *
     * @param {string|number} userId - Used in the client id.
     * @returns {Promise<object>} Resolves with the MQTT client; rejects on
     *   connection error or timeout.
     */
    connectData(userId) {
      if (this.dataConnected && this.dataClient) {
        console.log("DATA MQTT 已连接,复用现有实例");
        return Promise.resolve(this.dataClient);
      }
      uni.showToast({ title: "平台连接中...", icon: "loading", duration: 1000 });
      const brokerName = 'DATA';
      // Earlier revisions selected the endpoint by __wxConfig.envVersion
      // (develop/trial: wxs://data.radar-power.asia:8084/mqtt/); all
      // environments currently use the endpoint below.
      const url = "wxs://data.radar-power.cn/mqtt/";
      const clientId = `xcx_mqtt_data1_${userId}_${Date.now()}`;
      // NOTE(review): credentials are hard-coded in client code; consider moving
      // them to configuration supplied by the backend.
      return this.connectToBroker(brokerName, { url, clientId, username: 'lnradar', password: 'lnradar' })
        .then((client) => {
          this.dataClient = client;
          this.dataConnected = true;
          uni.$emit('mqttData-ready', this.dataClient);
          return client;
        });
    }

    // ---------------- Shared connect routine ----------------
    /**
     * Opens (or reuses) the connection registered under `brokerName`.
     *
     * - A connected entry is reused.
     * - An entry whose first connect is still in flight shares its promise, so
     *   concurrent callers do not open a second socket.
     * - Any other existing entry (dropped, failed or timed out) is closed before
     *   a fresh client is created, so the old client cannot keep reconnecting in
     *   the background.
     *
     * @param {string} brokerName - Registry key, e.g. 'CMD' or 'DATA'.
     * @param {object} config
     * @param {string} config.url - Broker WebSocket URL.
     * @param {string} config.clientId
     * @param {string} [config.username]
     * @param {string} [config.password]
     * @param {number} [config.reconnectPeriod=500] - Passed to the MQTT client;
     *   an explicit 0 is forwarded as-is instead of being replaced by the default.
     * @returns {Promise<object>} Resolves with the client on the first `connect`
     *   event; rejects on the first `error` event, after 10 s without a
     *   connection, or when the connection is closed while still pending.
     */
    connectToBroker(brokerName, config) {
      const existing = this.connections.get(brokerName);
      if (existing) {
        if (existing.connected) return Promise.resolve(existing.client);
        if (existing.pending) return existing.pending;
        // Stale entry: shut it down so it is not leaked when replaced.
        existing.superseded = true;
        this.closeConnection(existing, `${brokerName} 连接已被替换`);
        this.connections.delete(brokerName);
      }

      let client;
      try {
        client = mqtt.connect(config.url, {
          clientId: config.clientId,
          username: config.username,
          password: config.password,
          reconnectPeriod: config.reconnectPeriod == null ? 500 : config.reconnectPeriod,
          wsOptions: {
            WebSocket: (url) => wx.connectSocket({
              url,
              header: { 'content-type': 'application/json' },
              protocols: ['mqtt'],
            }),
          },
        });
      } catch (err) {
        // Keep the contract "always returns a promise" even on synchronous failure.
        return Promise.reject(err);
      }

      const connection = {
        client,
        connected: false,
        superseded: false, // true once this entry has been replaced by a newer one
        pending: null, // promise of the first connect while it is unsettled
        abort: null, // rejects `pending` (no-op once settled)
        subscriptions: new Map(), // topic => [callbacks]
      };
      this.connections.set(brokerName, connection);

      let resolveReady;
      let rejectReady;
      const ready = new Promise((resolve, reject) => {
        resolveReady = resolve;
        rejectReady = reject;
      });
      connection.pending = ready;

      let timer = null;
      // Settles the first-connect promise exactly once and releases the timer.
      const finish = (failed, err) => {
        if (connection.pending !== ready) return;
        connection.pending = null;
        clearTimeout(timer);
        if (failed) rejectReady(err);
        else resolveReady(client);
      };
      connection.abort = (err) => finish(true, err);

      // ---------------- Connected ----------------
      client.on('connect', () => {
        if (connection.superseded) return;
        console.log(`${brokerName} MQTT 连接成功`);
        connection.connected = true;
        // After an automatic reconnect, restore the flag that 'close' cleared.
        if (client === this.cmdClient) this.cmdConnected = true;
        if (client === this.dataClient) this.dataConnected = true;
        // Re-issue every registered subscription (relevant on reconnect).
        connection.subscriptions.forEach((callbacks, topic) => {
          connection.client.subscribe(topic, (err) => {
            if (err) console.error(`重连订阅失败: ${topic}`, err);
            else console.log(`✅ 重连订阅成功: ${topic}`);
          });
        });
        finish(false);
      });

      // ---------------- Connection error ----------------
      client.on('error', (err) => {
        if (connection.superseded) {
          console.error(`${brokerName} MQTT 连接错误:`, err);
          return;
        }
        uni.showModal({
          title: '提示',
          content: '连接平台失败,请重新登录',
          showCancel: false,
          success: (res) => {
            if (res.confirm) {
              uni.clearStorageSync();
              uni.reLaunch({ url: "/pagesA/loginNew/loginNew" });
            }
          },
        });
        console.error(`${brokerName} MQTT 连接错误:`, err);
        connection.connected = false;
        finish(true, err);
      });

      // ---------------- Connection closed ----------------
      client.on('close', () => {
        // A replaced client closing must not disturb the state of its successor.
        if (connection.superseded) return;
        uni.showToast({ title: `平台已断开`, icon: "none", duration: 1500 });
        console.log(`${brokerName} MQTT 断开`);
        connection.connected = false;
        // Only the currently registered connection owns the service-level flags.
        if (this.connections.get(brokerName) === connection) {
          if (brokerName === 'DATA') this.dataConnected = false;
          if (brokerName === 'CMD') this.cmdConnected = false;
        }
      });

      // ---------------- Incoming message ----------------
      client.on('message', (topic, message) => {
        const callbacks = connection.subscriptions.get(topic) || [];
        callbacks.forEach((cb) => {
          // One failing callback must not prevent the others from running.
          try { cb(message.toString(), topic, brokerName); }
          catch (err) { console.error('Message callback error:', err); }
        });
      });

      // ---------------- Connect timeout ----------------
      timer = setTimeout(() => {
        finish(true, new Error(`${brokerName} 连接超时`));
      }, 10000);

      return ready;
    }

    /**
     * Shuts down one connection record: rejects its pending connect promise (if
     * any, which also clears the timeout timer) and force-ends the client.
     * Does not touch `this.connections`; callers remove the entry themselves.
     *
     * @param {object} connection - A record created by `connectToBroker`.
     * @param {string} reason - Message of the error used to reject a pending connect.
     */
    closeConnection(connection, reason) {
      connection.connected = false;
      if (connection.abort) connection.abort(new Error(reason));
      connection.client.end(true);
    }

    // ---------------- Publish ----------------
    /**
     * Publishes a message through the CMD client when `clientType` is 'CMD',
     * otherwise through the DATA client. Logs a warning if that client is unset.
     *
     * @param {string} clientType - 'CMD' selects the CMD client; anything else DATA.
     * @param {string} topic
     * @param {string|object} message - Forwarded unchanged to `client.publish`.
     */
    publish(clientType, topic, message) {
      const client = clientType === 'CMD' ? this.cmdClient : this.dataClient;
      if (client) client.publish(topic, message);
      else console.warn(`${clientType} MQTT 未连接,无法 publish`);
    }

    // ---------------- Subscribe ----------------
    /**
     * Registers `callback` for `topic` and subscribes on the broker.
     * Callbacks are invoked as `callback(payloadString, topic, brokerName)`.
     *
     * @param {string} clientType - 'CMD' selects the CMD connection; anything else DATA.
     * @param {string} topic
     * @param {Function} callback - The same function is registered at most once per topic.
     * @param {Function} [onSubscribeDone] - Called with the subscribe error (if any).
     * @returns {Function|undefined} A function that removes this callback, or
     *   `undefined` when the connection is not currently connected.
     */
    subscribe(clientType, topic, callback, onSubscribeDone) {
      const connection = clientType === 'CMD' ? this.connections.get('CMD') : this.connections.get('DATA');
      if (!connection || !connection.connected) {
        console.warn(`${clientType} MQTT 未连接,无法 subscribe`);
        if (onSubscribeDone) onSubscribeDone(new Error('未连接'));
        return;
      }
      // Remember the callback for dispatch and for re-subscription on reconnect.
      if (!connection.subscriptions.has(topic)) connection.subscriptions.set(topic, []);
      const callbacks = connection.subscriptions.get(topic);
      if (!callbacks.includes(callback)) callbacks.push(callback);
      // Issue the broker subscription.
      connection.client.subscribe(topic, (err) => {
        if (err) console.error(`Subscribe error on ${clientType}:`, err);
        if (onSubscribeDone) onSubscribeDone(err);
      });
      // Hand back the matching unsubscribe function.
      return () => this.unsubscribe(clientType, topic, callback);
    }

    // ---------------- Unsubscribe ----------------
    /**
     * Removes `callback` from `topic`; when no callbacks remain for the topic,
     * the broker subscription is dropped as well.
     *
     * @param {string} clientType - 'CMD' selects the CMD connection; anything else DATA.
     * @param {string} topic
     * @param {Function} callback
     */
    unsubscribe(clientType, topic, callback) {
      const connection = clientType === 'CMD' ? this.connections.get('CMD') : this.connections.get('DATA');
      if (!connection) return;
      const callbacks = connection.subscriptions.get(topic);
      if (callbacks) {
        const index = callbacks.indexOf(callback);
        if (index > -1) callbacks.splice(index, 1);
        if (callbacks.length === 0) {
          connection.subscriptions.delete(topic);
          connection.client.unsubscribe(topic);
        }
      }
    }

    // ---------------- Manual re-subscription ----------------
    /**
     * Re-issues the broker subscription for every topic registered on the
     * given connection. Does nothing if the connection is missing or not connected.
     *
     * @param {string} brokerName - Registry key, e.g. 'CMD' or 'DATA'.
     */
    resubscribeAll(brokerName) {
      const connection = this.connections.get(brokerName);
      if (!connection || !connection.connected) return;
      connection.subscriptions.forEach((callbacks, topic) => {
        connection.client.subscribe(topic, (err) => {
          if (err) console.error(`手动恢复订阅失败: ${topic}`, err);
          else console.log(`✅ 手动恢复订阅成功: ${topic}`);
        });
      });
    }

    // ---------------- Disconnect ----------------
    /**
     * Force-closes every tracked connection (each client is ended once),
     * rejects connects that are still pending, and resets all state.
     */
    disconnectAll() {
      const ended = new Set();
      this.connections.forEach((conn) => {
        this.closeConnection(conn, 'MQTT 连接已关闭');
        ended.add(conn.client);
      });
      this.connections.clear();
      // Clients that are referenced directly but are no longer in the registry.
      [this.cmdClient, this.dataClient].forEach((client) => {
        if (client && !ended.has(client)) client.end(true);
      });
      this.cmdClient = null;
      this.cmdConnected = false;
      this.dataClient = null;
      this.dataConnected = false;
      console.log('所有 MQTT 连接已关闭');
    }

    /**
     * Force-closes the DATA connection, including one that is still connecting
     * or has failed, and removes it from the registry. No-op if there is none.
     */
    disconnectData() {
      const connection = this.connections.get("DATA");
      if (!this.dataClient && !connection) return;
      if (connection) {
        this.closeConnection(connection, 'DATA 连接已关闭');
        this.connections.delete("DATA");
      }
      if (this.dataClient && (!connection || connection.client !== this.dataClient)) {
        this.dataClient.end(true);
      }
      this.dataClient = null;
      this.dataConnected = false;
      console.log("DATA MQTT 连接已关闭");
    }
  }
  // Export the shared singleton: every importer receives the same MqttService object.
  const mqttService = new MqttService();
  export { mqttService as default };