Files
dify_market_manager_gui/src/main/utils/WebSocketClient.js

235 lines
6.4 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import WebSocket from 'ws';
import logger from './logger'
import { getStoreValue } from '../store.js'
import { Notification } from "electron";
/**
 * Auto-reconnecting WebSocket client.
 *
 * Connection parameters (`apiUrl`, `userInfo`, `token`) are re-read from the
 * local store on every (re)connect attempt, so the client picks up new
 * credentials without being re-created. Incoming frames are parsed as JSON,
 * "Notification" commands are surfaced as desktop notifications, and every
 * parsed message is forwarded to listeners registered through `on('message')`.
 *
 * Events emitted to listeners: 'open', 'close' ({ code, reason }), 'error',
 * 'message'.
 */
class WebSocketClient {
  /**
   * @param {object} [options]
   * @param {number} [options.reconnectInterval=5000] Delay in milliseconds
   *   between reconnect attempts.
   */
  constructor(options = {}) {
    this.reconnectInterval = options.reconnectInterval || 5000; // reconnect delay (ms)
    this.lockReconnect = false; // true while a reconnect timer is pending
    this.autoReconnect = true; // set to false by close({ reconnect: false })
    this.reconnectTimer = null; // handle of the pending (re)connect timer
    this.ws = null;
    this.pingTimeout = null; // handle of the heartbeat interval
    this.reconnectAttempts = 0; // attempts since the last successful open
    this.messageHandlers = new Map();
    this.isConnected = false;
    this.options = options;
    // Defer the first connection so the client does not connect while the
    // application is still starting up.
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.createConnect();
    }, 1000);
  }

  /**
   * Reads the connection settings from the store, builds the WebSocket URL and
   * socket options, then opens the connection. If the settings are incomplete
   * or anything throws, a reconnect is scheduled instead.
   */
  createConnect() {
    try {
      const apiUrl = getStoreValue("apiUrl");
      const userInfo = getStoreValue("userInfo");
      const token = getStoreValue("token");
      // Make sure everything required for the connection is available. A
      // userInfo without an id would otherwise produce a ".../undefined" URL.
      if (!apiUrl || !userInfo || userInfo.id == null || !token) {
        logger.info("WebSocket连接信息不完整等待重试");
        logger.info("apiUrl:", apiUrl);
        // Never write credentials or the full user record to the log; only
        // report whether they are present.
        logger.info("userInfo:", userInfo ? `present (id: ${userInfo.id})` : "missing");
        logger.info("token:", token ? "present" : "missing");
        this.scheduleReconnect();
        return;
      }
      // Map the HTTP(S) base URL onto the matching WebSocket scheme.
      let baseUrl = apiUrl.replace("https://", "wss://").replace("http://", "ws://");
      // Strip a trailing slash, if any.
      baseUrl = baseUrl.replace(/\/$/, '');
      // Build the full WebSocket URL.
      this.url = `${baseUrl}/bqw-ai/websocket/${userInfo.id}`;
      logger.info("==================== WebSocketClient create WebSocketClient");
      logger.info("WebSocket URL: " + this.url);
      // Socket options handed to the WebSocket constructor.
      this.options = {
        headers: {
          'User-Agent': 'WebSocketClient',
          'Connection': 'Upgrade',
          'Upgrade': 'websocket',
          'Sec-WebSocket-Version': '13'
        },
        handshakeTimeout: 10000,
        perMessageDeflate: false,
        followRedirects: true,
        // NOTE(review): this disables TLS certificate verification and allows
        // man-in-the-middle attacks on wss:// connections. Kept as-is because
        // deployments may rely on self-signed certificates - confirm and remove.
        rejectUnauthorized: false
      };
      // The token itself is deliberately not logged.
      logger.info("Token: [redacted]");
      // Open the connection.
      this.connect();
    } catch (error) {
      logger.error("创建WebSocket连接时发生错误:", error);
      this.scheduleReconnect();
    }
  }

  /**
   * Schedules a single reconnect attempt after `reconnectInterval` ms.
   * Does nothing when an attempt is already pending or when reconnecting was
   * disabled through `close({ reconnect: false })`.
   */
  scheduleReconnect() {
    if (this.lockReconnect || this.autoReconnect === false) return;
    this.lockReconnect = true;
    this.reconnectAttempts += 1;
    logger.info("==================== WebSocketClient schedule reconnect");
    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.lockReconnect = false;
      this.createConnect(); // re-read the local store and create the connection again
    }, this.reconnectInterval);
  }

  /** Internal: cancels the pending (re)connect timer, if there is one. */
  clearReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Internal: detaches and force-closes the current socket so that it can no
   * longer affect the client's state or keep a half-open handshake alive.
   */
  discardSocket() {
    const stale = this.ws;
    if (!stale) return;
    this.ws = null;
    this.isConnected = false;
    this.stopHeartbeat();
    try {
      stale.removeAllListeners();
      // Aborting a pending handshake makes the socket emit 'error'; without a
      // listener that event would be thrown as an uncaught exception.
      stale.on('error', () => {});
      stale.terminate();
    } catch (error) {
      logger.error('WebSocket discard error:', error);
    }
  }

  /**
   * Opens the socket for `this.url`, passing the stored token as the
   * WebSocket sub-protocol, and wires up all socket event handlers.
   * Does nothing while a reconnect timer is pending.
   */
  connect() {
    logger.info("==================== WebSocketClient begin connect");
    if (this.lockReconnect) return;
    try {
      const token = getStoreValue("token");
      // Drop any previous socket first so it cannot linger or fire late events.
      this.discardSocket();
      // The token is passed as the sub-protocol directly in the constructor.
      const socket = new WebSocket(this.url, token, this.options);
      this.ws = socket;
      // Events from a socket that has since been replaced must be ignored,
      // otherwise a late 'close' would mark the live connection as down.
      const isCurrent = () => this.ws === socket;

      socket.on('open', () => {
        if (!isCurrent()) return;
        logger.info('WebSocket 连接成功');
        logger.info('WebSocket protocol:', socket.protocol);
        this.isConnected = true;
        this.reconnectAttempts = 0;
        this.clearReconnectTimer();
        this.lockReconnect = false;
        this.startHeartbeat();
        this.emit('open');
      });

      socket.on('message', (data) => {
        if (!isCurrent()) return;
        try {
          const messageStr = data.toString();
          const message = JSON.parse(messageStr);
          // Keep heartbeat traffic out of the regular log.
          if (message && message.cmd === 'heartcheck') {
            // Heartbeats are only logged in development mode.
            if (process.env.NODE_ENV === 'development') {
              logger.debug('收到心跳消息');
            }
          } else {
            logger.info('收到消息:', messageStr);
          }
          this.handleMessage(message);
        } catch (error) {
          logger.error('消息解析错误:', error);
        }
      });

      socket.on('close', (code, reason) => {
        if (!isCurrent()) return;
        logger.info(`WebSocket连接已关闭代码: ${code}, 原因: ${reason}`);
        this.isConnected = false;
        this.stopHeartbeat();
        this.emit('close', { code, reason });
        this.scheduleReconnect();
      });

      socket.on('error', (error) => {
        if (!isCurrent()) return;
        logger.error('WebSocket error:', error);
        this.emit('error', error);
        this.scheduleReconnect();
      });

      socket.on('unexpected-response', (request, response) => {
        if (!isCurrent()) return;
        logger.info(`收到响应: ${response.statusCode} ${response.statusMessage}`);
        logger.info('响应头:', response.headers);
        if (response.statusCode === 101) {
          logger.info('WebSocket升级成功');
          return;
        }
        // Because a listener is registered for this event, the library no
        // longer aborts the failed handshake on its own; tear it down here.
        this.discardSocket();
        this.scheduleReconnect();
      });
    } catch (error) {
      logger.error('WebSocket connect error:', error);
      this.scheduleReconnect();
    }
  }

  /**
   * Starts sending the "heartcheck" keep-alive frame every 30 seconds.
   * Any previously running heartbeat is stopped first so intervals never pile up.
   */
  startHeartbeat() {
    this.stopHeartbeat();
    this.pingTimeout = setInterval(() => {
      if (!this.isConnected || !this.ws) return;
      try {
        this.ws.send("heartcheck");
      } catch (error) {
        // The socket can leave the OPEN state between the check and the send.
        logger.error('WebSocket heartbeat error:', error);
      }
    }, 30000);
  }

  /** Stops the heartbeat interval, if it is running. */
  stopHeartbeat() {
    if (this.pingTimeout) {
      clearInterval(this.pingTimeout);
      this.pingTimeout = null;
    }
  }

  /**
   * Sends a message over the socket.
   *
   * @param {string|object} message Strings are sent verbatim; anything else is
   *   serialized with JSON.stringify.
   * @returns {boolean} true when the message was handed to the socket, false
   *   when the client is not connected or sending threw.
   */
  send(message) {
    if (!this.isConnected) {
      logger.error('WebSocket未连接');
      return false;
    }
    try {
      const data = typeof message === 'string' ? message : JSON.stringify(message);
      this.ws.send(data);
      return true;
    } catch (error) {
      logger.error('发送消息失败:', error);
      return false;
    }
  }

  /**
   * Registers a listener for a client event.
   *
   * @param {string} event Event name ('open', 'close', 'error', 'message').
   * @param {Function} handler Called with the event payload.
   */
  on(event, handler) {
    if (!this.messageHandlers.has(event)) {
      this.messageHandlers.set(event, new Set());
    }
    this.messageHandlers.get(event).add(handler);
  }

  /**
   * Removes a listener previously registered with `on`.
   *
   * @param {string} event Event name.
   * @param {Function} handler The exact function passed to `on`.
   */
  off(event, handler) {
    if (this.messageHandlers.has(event)) {
      this.messageHandlers.get(event).delete(handler);
    }
  }

  /**
   * Invokes every listener of `event` with `data`. A throwing listener is
   * logged and does not prevent the remaining listeners from running.
   *
   * @param {string} event Event name.
   * @param {*} [data] Payload passed to each listener.
   */
  emit(event, data) {
    if (this.messageHandlers.has(event)) {
      this.messageHandlers.get(event).forEach(handler => {
        try {
          handler(data);
        } catch (error) {
          logger.error(`事件处理器错误 (${event}):`, error);
        }
      });
    }
  }

  /**
   * Handles one parsed server message: shows a desktop notification for
   * "Notification" commands (unless that notification type is switched off in
   * the store) and then forwards the message to 'message' listeners.
   *
   * @param {object} message Parsed message; `msgTxt` is expected to hold a
   *   JSON string with `systemMsgType`, `title` and `content`.
   */
  handleMessage(message) {
    if (message && message.cmd === "Notification") {
      logger.info("Notification:", message);
      try {
        const msg = JSON.parse(message.msgTxt);
        // Per-type switch stored under "Notification_<systemMsgType>".
        const isNotification = getStoreValue("Notification_" + msg.systemMsgType);
        // Loose comparison on purpose: a stored 0, "0" or false all mean
        // "disabled", while a missing value means "enabled".
        if (isNotification != 0) {
          new Notification({ title: msg.title, body: msg.content }).show();
        }
      } catch (error) {
        // A broken notification payload must not stop the message from
        // reaching subscribers below.
        logger.error('Notification handling error:', error);
      }
    }
    this.emit('message', message);
  }

  /**
   * Closes the current socket.
   *
   * By default the client reconnects afterwards, exactly as it does after any
   * other disconnect. Pass `{ reconnect: false }` to shut the client down for
   * good: pending timers are cancelled and no further reconnects are scheduled.
   *
   * @param {object} [options]
   * @param {boolean} [options.reconnect=true] Whether to keep auto-reconnecting.
   */
  close(options = {}) {
    if (options && options.reconnect === false) {
      this.autoReconnect = false;
      this.clearReconnectTimer();
      this.lockReconnect = false;
    }
    if (this.ws) {
      this.ws.close();
    }
  }
}
export default WebSocketClient;