// globalMqtt.js
  1. import mqtt from './mqtt.js';
  /**
   * Singleton service that manages the app's MQTT connections.
   *
   * Two logical brokers are used: "CMD" (command/notice channel) and "DATA".
   * Every connection is tracked in `this.connections` as a record of the form
   * `{ client, connected, subscriptions, pending, cancel, replaced, settled }`,
   * where `subscriptions` maps a topic to the list of callbacks registered for it.
   *
   * Relies on the `mqtt` module imported by this file and on the `uni` / `wx`
   * globals provided by the uni-app / WeChat mini-program runtime.
   *
   * NOTE(review): broker credentials are hard-coded in connectCmd/connectData —
   * consider moving them to configuration.
   * NOTE(review): messages are dispatched by exact topic string, so callbacks
   * registered on wildcard topics (`+` / `#`) will not be invoked — confirm no
   * caller depends on wildcards.
   */
  class MqttService {
    constructor() {
      // Classic singleton: the first construction initialises state, later
      // `new MqttService()` calls return that same instance.
      if (!MqttService.instance) {
        this.cmdClient = null;
        this.dataClient = null;
        this.cmdConnected = false;
        this.dataConnected = false;
        // In-flight connect promises, so concurrent callers share one attempt.
        this.cmdConnecting = null;
        this.dataConnecting = null;
        // brokerName -> connection record (internal multi-connection registry).
        this.connections = new Map();
        MqttService.instance = this;
      }
      return MqttService.instance;
    }

    /**
     * Run `start()` unless an attempt is already stored in `this[slot]`; the
     * slot is cleared once the attempt settles (fulfilled or rejected).
     *
     * @param {string} slot - Name of the instance field holding the in-flight promise.
     * @param {() => Promise<any>} start - Starts a new connect attempt.
     * @returns {Promise<any>} The shared in-flight promise.
     */
    shareConnectAttempt(slot, start) {
      if (this[slot]) return this[slot];
      const attempt = start();
      this[slot] = attempt;
      const release = () => {
        // Identity check: a disconnect may already have reset/replaced the slot.
        if (this[slot] === attempt) this[slot] = null;
      };
      attempt.then(release, release);
      return attempt;
    }

    /**
     * Connect to the CMD broker and subscribe to the user's notice topic.
     * Reuses the existing client when already connected and shares a single
     * attempt between concurrent callers. Emits `mqtt-ready` on success.
     *
     * @param {string|number} userId - Used in the client id and the notice topic.
     * @returns {Promise<object>} Resolves with the MQTT client.
     */
    connectCmd(userId) {
      if (this.cmdConnected && this.cmdClient) {
        console.log("CMD MQTT 已连接,复用现有实例");
        return Promise.resolve(this.cmdClient);
      }
      return this.shareConnectAttempt('cmdConnecting', () => {
        const url = 'wxs://cmd.radar-power.cn/mqtt/';
        const clientId = `xcx_mqtt_cmd1_${userId}_${Math.random().toString(16).substring(2, 8)}`;
        return this.connectToBroker('CMD', { url, clientId, username: 'lnradar', password: 'lnradar' })
          .then((client) => {
            this.cmdClient = client;
            this.cmdConnected = true;
            // Default CMD subscription: the per-user notice topic.
            const topic = `/mps/wx_${userId}/notice`;
            this.subscribe(
              'CMD',
              topic,
              (msg) => {
                console.log('CMD 消息:', msg);
              },
              (err) => {
                // Only report success once the subscription really completed;
                // failures are already logged inside subscribe().
                if (!err) console.log(`✅ CMD MQTT 主题已订阅成功: ${topic}`);
              },
            );
            uni.$emit('mqtt-ready', this.cmdClient);
            return client;
          });
      });
    }

    /**
     * Connect to the DATA broker. Always shows a short "connecting" toast,
     * reuses the existing client when already connected and shares a single
     * attempt between concurrent callers. Emits `mqttData-ready` on success.
     *
     * @param {string|number} userId - Used in the client id.
     * @returns {Promise<object>} Resolves with the MQTT client.
     */
    connectData(userId) {
      uni.showToast({
        title: "平台连接中...",
        icon: "loading",
        duration: 1000, // toast display time in ms
      });
      if (this.dataConnected && this.dataClient) {
        console.log("DATA MQTT 已连接,复用现有实例");
        return Promise.resolve(this.dataClient);
      }
      return this.shareConnectAttempt('dataConnecting', () => {
        // A per-environment switch (develop -> data.radar-power.asia:8084) used
        // to live here; every environment currently uses the same broker URL.
        const url = 'wxs://data.radar-power.cn/mqtt/';
        const clientId = `xcx_mqtt_data1_${userId}_${Date.now()}`;
        return this.connectToBroker('DATA', { url, clientId, username: 'lnradar', password: 'lnradar' })
          .then((client) => {
            this.dataClient = client;
            this.dataConnected = true;
            uni.$emit('mqttData-ready', this.dataClient);
            return client;
          });
      });
    }

    /**
     * Generic connect routine used for every broker.
     *
     * - Already connected: resolves with the existing client.
     * - Connect attempt still in flight: returns that same promise.
     * - Stale record (failed / timed out / closed): its client is force-closed
     *   and replaced by a fresh one; callbacks registered on the stale record
     *   are discarded.
     *
     * @param {string} brokerName - Registry key, e.g. 'CMD' or 'DATA'.
     * @param {{url: string, clientId: string, username?: string, password?: string, reconnectPeriod?: number}} config
     *   `reconnectPeriod` defaults to 5000 ms; an explicit 0 is passed through.
     * @returns {Promise<object>} Resolves with the client on the first 'connect';
     *   rejects on an 'error' event, after 10 s without a connection, or when
     *   the connection is closed by disconnectAll/disconnectData while pending.
     */
    connectToBroker(brokerName, config) {
      const existing = this.connections.get(brokerName);
      if (existing) {
        if (existing.connected) return Promise.resolve(existing.client);
        if (existing.pending) return existing.pending;
        // Stale record: stop its client so it does not keep reconnecting in the
        // background next to the replacement.
        existing.replaced = true;
        this.connections.delete(brokerName);
        existing.client.end(true);
      }

      let record = null;
      const pending = new Promise((resolve, reject) => {
        const client = mqtt.connect(config.url, {
          clientId: config.clientId,
          username: config.username,
          password: config.password,
          // `!= null` (not `||`) so that reconnectPeriod: 0 is honoured.
          reconnectPeriod: config.reconnectPeriod != null ? config.reconnectPeriod : 5000,
          wsOptions: {
            WebSocket: (url) => wx.connectSocket({
              url,
              header: { 'content-type': 'application/json' },
              protocols: ['mqtt'],
            }),
          },
        });

        const connection = {
          client,
          connected: false,
          subscriptions: new Map(),
          pending: null,   // in-flight connect promise, null once settled
          cancel: null,    // rejects the in-flight promise (used by disconnect*)
          replaced: false, // true once superseded by a newer connection
          settled: false,
        };
        record = connection;
        this.connections.set(brokerName, connection);

        let timer = null;
        // Settle the connect promise once and release the timeout timer.
        const settle = (finish, value) => {
          if (timer !== null) {
            clearTimeout(timer);
            timer = null;
          }
          connection.settled = true;
          connection.pending = null;
          connection.cancel = null;
          finish(value);
        };
        connection.cancel = (err) => settle(reject, err);

        client.on('connect', () => {
          console.log(`${brokerName} MQTT 连接成功`);
          connection.connected = true;
          settle(resolve, client);
        });

        client.on('error', (err) => {
          console.error(`${brokerName} MQTT 连接错误:`, err);
          connection.connected = false;
          // A superseded connection must not bother the user any more.
          if (!connection.replaced) {
            uni.showModal({
              title: '提示',
              content: '连接平台失败,请重新登录',
              showCancel: false,
              success: (res) => {
                if (res.confirm) {
                  uni.clearStorageSync();
                  uni.reLaunch({
                    url: "/pagesA/loginNew/loginNew"
                  });
                }
              },
            });
          }
          settle(reject, err);
        });

        client.on('close', () => {
          console.log(`${brokerName} MQTT 断开`);
          connection.connected = false;
          if (!connection.replaced) {
            uni.showToast({
              title: `平台已经断开`,
              icon: "none",
              duration: 1500,
            });
          }
        });

        client.on('message', (topic, message) => {
          const callbacks = connection.subscriptions.get(topic) || [];
          // Iterate over a copy: a callback may unsubscribe itself while we
          // dispatch, which would otherwise make forEach skip its neighbour.
          callbacks.slice().forEach((cb) => {
            try {
              cb(message.toString(), topic, brokerName);
            } catch (err) {
              console.error('Message callback error:', err);
            }
          });
        });

        // Connect timeout.
        timer = setTimeout(() => {
          timer = null;
          if (!connection.connected) settle(reject, new Error(`${brokerName} 连接超时`));
        }, 10000);
      });

      // `record` stays null if mqtt.connect threw inside the executor.
      if (record && !record.settled) record.pending = pending;
      return pending;
    }

    /**
     * Publish a message through the CMD client ('CMD') or the DATA client
     * (any other value). Logs a warning when that client does not exist.
     *
     * @param {string} clientType - 'CMD' selects the CMD client, anything else DATA.
     * @param {string} topic
     * @param {string|Uint8Array} message - Passed to the client's publish() unchanged.
     */
    publish(clientType, topic, message) {
      const client = clientType === 'CMD' ? this.cmdClient : this.dataClient;
      if (client) client.publish(topic, message);
      else console.warn(`${clientType} MQTT 未连接,无法 publish`);
    }

    /**
     * Register `callback` for `topic` and subscribe on the broker.
     *
     * @param {string} clientType - 'CMD' selects the CMD connection, anything else DATA.
     * @param {string} topic
     * @param {(message: string, topic: string, brokerName: string) => void} callback
     * @param {(err?: Error|null) => void} [onSubscribeDone] - Called when the
     *   subscription finished (with the error, if any), or immediately with an
     *   error when the connection is not available.
     * @returns {(() => void)|undefined} Function that removes this callback, or
     *   `undefined` when the connection is missing or not connected.
     */
    subscribe(clientType, topic, callback, onSubscribeDone) {
      const connection = clientType === 'CMD' ? this.connections.get('CMD') : this.connections.get('DATA');
      if (!connection || !connection.connected) {
        console.warn(`${clientType} MQTT 未连接,无法 subscribe`);
        if (onSubscribeDone) onSubscribeDone(new Error('未连接'));
        return;
      }
      // Remember the callback (each function is stored once per topic).
      if (!connection.subscriptions.has(topic)) connection.subscriptions.set(topic, []);
      const callbacks = connection.subscriptions.get(topic);
      if (!callbacks.includes(callback)) callbacks.push(callback);
      // Perform the actual subscription.
      connection.client.subscribe(topic, (err) => {
        if (err) {
          console.error(`Subscribe error on ${clientType}:`, err);
        }
        if (onSubscribeDone) onSubscribeDone(err);
      });
      return () => this.unsubscribe(clientType, topic, callback);
    }

    /**
     * Remove `callback` from `topic`; when no callbacks remain the topic is
     * unsubscribed on the broker as well.
     *
     * @param {string} clientType - 'CMD' selects the CMD connection, anything else DATA.
     * @param {string} topic
     * @param {Function} callback - The function previously passed to subscribe().
     */
    unsubscribe(clientType, topic, callback) {
      const connection = clientType === 'CMD' ? this.connections.get('CMD') : this.connections.get('DATA');
      if (!connection) return;
      const callbacks = connection.subscriptions.get(topic);
      if (callbacks) {
        const index = callbacks.indexOf(callback);
        if (index > -1) callbacks.splice(index, 1);
        if (callbacks.length === 0) {
          connection.subscriptions.delete(topic);
          connection.client.unsubscribe(topic);
        }
      }
    }

    /**
     * Force-close every client exactly once, reject connect attempts that are
     * still in flight and reset all connection state.
     */
    disconnectAll() {
      // A Set de-duplicates clients that are referenced both by cmdClient /
      // dataClient and by a registry record.
      const clients = new Set();
      if (this.cmdClient) clients.add(this.cmdClient);
      if (this.dataClient) clients.add(this.dataClient);
      this.connections.forEach((connection, brokerName) => {
        if (connection.cancel) connection.cancel(new Error(`${brokerName} 连接已关闭`));
        connection.connected = false;
        clients.add(connection.client);
      });
      clients.forEach((client) => client.end(true));
      this.cmdClient = null;
      this.cmdConnected = false;
      this.cmdConnecting = null;
      this.dataClient = null;
      this.dataConnected = false;
      this.dataConnecting = null;
      this.connections.clear();
      console.log('所有 MQTT 连接已关闭');
    }

    /**
     * Force-close the DATA connection (including one that is still connecting)
     * and reset the DATA state. Does nothing when there is no DATA connection.
     */
    disconnectData() {
      const connection = this.connections.get("DATA");
      if (!this.dataClient && !connection) return;
      const clients = new Set();
      if (this.dataClient) clients.add(this.dataClient);
      if (connection) {
        if (connection.cancel) connection.cancel(new Error('DATA 连接已关闭'));
        connection.connected = false;
        clients.add(connection.client);
      }
      clients.forEach((client) => client.end(true));
      this.dataClient = null;
      this.dataConnected = false;
      this.dataConnecting = null;
      this.connections.delete("DATA");
      console.log("DATA MQTT 连接已关闭");
    }
  }
  // Export one shared MqttService instance for the whole app.
  const mqttService = new MqttService();
  export default mqttService;