import log from "../log/Logger"; import ByteBuffer from '../codec/ByteBuffer'; import { LoginPack } from '../pack/LoginPack'; import WebToolkit from '../common/WebToolkit'; import { w3cwebsocket, IMessageEvent, ICloseEvent } from 'websocket'; import { RequestBase } from '../model/RequestBase'; import { RequestParams } from '../model/RequestParams'; import HttpApi from './HttpApi'; import Beans from '../common/utils'; import { reverse } from 'dns'; import { MessageCommand, FriendShipCommand, GroupCommand, SystemCommand, UserEventCommand, ConversationEventCommand } from '../common/Command'; import { MessagePack } from '../pack/MessagePack'; import { MessageContent } from '../model/dto/MessageContent'; const loginTimeout = 10 * 1000 // 10 seconds const heartbeatInterval = 10 * 1000 // seconds let firstMonitorSocket: boolean = false;// 第一次监听socket export enum State { INIT, CONNECTING, CONNECTED, RECONNECTING, CLOSEING, CLOSED, } enum TimeUnit { Second = 1000, Millisecond = 1, } export let sleep = async (second: number, Unit: TimeUnit = TimeUnit.Second): Promise => { return new Promise((resolve, _) => { setTimeout(() => { resolve() }, second * Unit) }) } export interface IListener { onLogin(userId: string): void; // 登录成功事件 onSocketConnectEvent(url: string, data: any): void; // socket 连接事件 onSocketErrorEvent(e: any): void;// 异常回调 onSocketReConnectEvent(): void;// 重连事件 onSocketReConnectSuccessEvent(): void;// 重连事件 onSocketCloseEvent(): void;//连接关闭事件 onP2PMessage(e: any): void;//收到单聊消息事件 onTestMessage(e: any): void;//收到消息事件 测试用 onOfflineMessage(data):void; // 拉取到离线消息事件 } export class ImClient { url: string = "" userId!: string version: number = 1 clientType: number = 1 imei!: string; listeners: IListener | any = null; appId!: number userSign!: string; imeiLength?: number state = State.INIT lastOfflineMessageSequence: number = 0; offlineMessageList: Array = new Array() httpUrl: string = ""//http://127.0.0.1:8000/v1 private conn?: w3cwebsocket constructor() { } public getRequestBase(): RequestBase { return new RequestBase(this.appId, this.clientType, this.imei); } public isInit(): boolean { return this.state == State.CONNECTED; } public getRequestParams(): RequestParams { return new RequestParams(this.appId, this.userId, this.userSign); } public async init(httpUrl: string, appId: number, userId: string, userSign: string, listeners: any, callback: (sdk: ImClient) => void) { var self = this; self.httpUrl = httpUrl self.appId = appId; self.listeners = listeners self.imei = WebToolkit.getDeviceInfo().system; self.imeiLength = getLen(self.imei); self.userId = userId; this.userSign = userSign this.imeiLength = self.imeiLength; if (Beans.isEmpty(this.url)) { log.info("获取im地址") let api = new HttpApi(this.httpUrl); let resp = await api.call("/user/login", {}, { clientType: this.clientType, appId: this.appId, userId: this.userId }) // console.log(resp) if (resp.isFailed()) { log.info("登录失败:获取im地址失败") return; } let ip = resp.data.ip; let port = resp.data.port; var protocolStr = document.location.protocol; if(protocolStr == "http:") { this.url = "ws://" + ip + ":" + port + "/ws"; } else if(protocolStr == "https:") { this.url = "wss://" + ip + "/hs_ws"; } } var req = new LoginPack(self.appId, self.userId, 1); let { success, err, conn } = await limLogin(self.url, req, self); if (success) { if (!firstMonitorSocket) { firstMonitorSocket = true; } //登录成功以后操作 conn.onerror = (error) => { log.info("websocket error: ", error) // 加入socket 连接事件 if (typeof imClient.listeners.onSocketErrorEvent === 'function') { imClient.listeners.onSocketErrorEvent(error); } //异步方法,自动重连 this.errorHandler(error, req) } conn.onclose = (e: ICloseEvent) => { log.info("event[onclose] fired") if (self.state == State.CLOSEING) { this.onclose("logout") return } // socket断开事件 连接事件 if (typeof imClient.listeners.onSocketCloseEvent === 'function') { imClient.listeners.onSocketCloseEvent(); } // 异步方法 自动重连 this.errorHandler(new Error(e.reason), req) } conn.onmessage = (evt) => { var bytebuf = new ByteBuffer(evt.data); let byteBuffer = bytebuf.int32().int32().unpack(); let command = byteBuffer[0];//解析command let bodyLen = byteBuffer[1];//解析bodylen let unpack = bytebuf.vstring(null, bodyLen).unpack();//解析出字符串 let msgBody = unpack[2]; console.log("sdk收到服务端数据 command:" + command) console.log("sdk收到服务端数据:" + msgBody) if (command === MessageCommand.MSG_P2P) { //单聊消息收发 if (typeof imClient.listeners.onP2PMessage === 'function') { imClient.listeners.onP2PMessage(msgBody); } } else { if (typeof imClient.listeners.onTestMessage === 'function') { imClient.listeners.onTestMessage(msgBody); } } } this.conn = conn; this.state = State.CONNECTED //拉取离线消息 this.loadOfflineMessage(); //心跳包 this.heartbeatLoop(this.conn); if (typeof imClient.listeners.onLogin === 'function') { imClient.listeners.onLogin(this.userId); } callback(self); } else { log.error(err?.message) } } public buildMessagePack(command: number, messagePack: any) { var jsonData = JSON.stringify(messagePack); // console.log("=============buildMessagePack=========") // log.info(jsonData); let bodyLen = getLen(jsonData); let pack = new ByteBuffer(null, 0); pack.int32(command).int32(this.version).int32(this.clientType) .int32(0x0) .int32(this.appId) .int32(this.imeiLength) .int32(bodyLen) .vstring(this.imei, this.imeiLength) .vstring(jsonData, bodyLen); return pack; } // 4. 自动重连 private async errorHandler(error: Error, req: LoginPack) { // 如果是主动断开连接,就没有必要自动重连 // 比如收到被踢,或者主动调用logout()方法 if (this.state == State.CLOSED || this.state == State.CLOSEING) { return } this.state = State.RECONNECTING if (typeof imClient.listeners.onSocketReConnectEvent === 'function') { imClient.listeners.onSocketReConnectEvent(); } // 重连10次 for (let index = 0; index < 10; index++) { await sleep(3) try { log.info("try to relogin") // let { success, err } = await this.login() let { success, err, conn } = await limLogin(this.url, req, this); if (success) { if (typeof imClient.listeners.onSocketReConnectSuccessEvent === 'function') { imClient.listeners.onSocketReConnectSuccessEvent(); } return } log.info(err) } catch (error) { log.info(error) } } this.onclose("reconnect timeout") } // 表示连接中止 private onclose(reason: string) { if (this.state == State.CLOSED) { return } this.state = State.CLOSED log.info("connection closed due to " + reason) this.conn = undefined this.userId = "" // 加入socket 关闭事件 if (typeof imClient.listeners.onSocketErrorEvent === 'function') { imClient.listeners.onSocketCloseEvent(); } } public getSingleUserInfo(uid: string): Promise { return new Promise((resolve, _) => { let api = new HttpApi(this.httpUrl); let resp = api.call("/user/data/getSingleUserInfo", this.getRequestParams(), { userId: uid }) resolve(resp); }) } public async syncGetUserInfo(userId: string[]) { let api = new HttpApi(this.httpUrl); let resp = api.call("/user/data/getUserInfo", this.getRequestParams(), { userIds: userId }) return resp; } public getUserInfo(userId: string[]): Promise { return new Promise((resolve, _) => { let api = new HttpApi(this.httpUrl); let resp = api.call("/user/data/getUserInfo", this.getRequestParams(), { userIds: userId }) resolve(resp); }) } public getAllFriend(): Promise { return new Promise((resolve, _) => { let api = new HttpApi(this.httpUrl); let resp = api.call("/friendship/getAllFriendShip", this.getRequestParams(), { fromId: this.userId }) resolve(resp); }) } public getUserSequence(): Promise { return new Promise((resolve, _) => { let api = new HttpApi(this.httpUrl); let resp = api.call("/user/getUserSequence", this.getRequestParams(), { userId: this.userId }) resolve(resp); }) } public getUserConversationList(lastSequence:Long): Promise { return new Promise((resolve, _) => { let api = new HttpApi(this.httpUrl); let resp = api.call("/conversation/syncConversationList", this.getRequestParams(), { userId: this.userId,appId:this.appId,maxLimit:100, lastSequence:lastSequence,operater:this.userId }) resolve(resp); }) } public createRoom(room:any): Promise { return new Promise((resolve,_)=>{ let api = new HttpApi(this.httpUrl); let resp = api.call("/room/createRoom", this.getRequestParams(), room) return resp; }) } public joinRoom(room:any): Promise { return new Promise((resolve,_)=>{ let api = new HttpApi(this.httpUrl); let resp = api.call("/room/joinRoom", this.getRequestParams(), room) return resp; }) } public getRoomInfo(roomId:any): Promise { return new Promise((resolve,_)=>{ let api = new HttpApi(this.httpUrl); let resp = api.call("/room/getRoomInfo", this.getRequestParams(), { roomId: roomId }) return resp; }) } // 2、心跳 private heartbeatLoop(conn) { let start = Date.now() let loop = () => { if (this.state != State.CONNECTED) { log.error("heartbeatLoop exited") return } if (Date.now() - start >= heartbeatInterval) { // log.info(`>>> send ping ;`) start = Date.now() let pingPack = imClient.buildMessagePack(SystemCommand.PING, {}); conn.send(pingPack.pack(false)); } setTimeout(loop, 500) } setTimeout(loop, 500) } //构建单聊消息对象 public createP2PTextMessage(to: string, text: string) { let messagePack = new MessagePack(this.appId); messagePack.buildTextMessagePack(this.userId, to, text); return messagePack; } public sendP2PMessage(pack: MessagePack) { let p2pPack = imClient.buildMessagePack(MessageCommand.MSG_P2P, pack); if (this.conn) { this.conn.send(p2pPack.pack(false)); } } // 消息接收ack public sendP2PMessageReciveAck (rAck: any) { console.log("=============== sendP2PMessageReciveAck") console.log(rAck) rAck.toId = rAck.fromId; rAck.fromId = this.userId; console.log(rAck) let p2pPack_1107 = imClient.buildMessagePack(MessageCommand.MSG_RECIVE_ACK, rAck); rAck.conversationType=0 let p2pPack_1106 = imClient.buildMessagePack(MessageCommand.MSG_READED, rAck); let p2pPack_1054 = imClient.buildMessagePack(MessageCommand.MSG_READED_RECEIPT, rAck); if (this.conn) { // console.log("=============== sendP2PMessageReciveAck 正式开始消息接收ack") this.conn.send(p2pPack_1107.pack(false)); this.conn.send(p2pPack_1106.pack(false)); this.conn.send(p2pPack_1054.pack(false)); } } // 消息已读回执,给原消息发送方 1054 public sendP2PMessageReadedReceipt (rAck: any) { console.log("=============== sendP2PMessageReadedRECEIPT") console.log(rAck) rAck.toId = rAck.fromId; rAck.fromId = this.userId; console.log(rAck) let p2pPack = imClient.buildMessagePack(MessageCommand.MSG_READED_RECEIPT, rAck); if (this.conn) { console.log("=============== sendP2PMessageReadedRECEIPT 正式开始发送已读回执") this.conn.send(p2pPack.pack(false)); } } public getUserId() { return this.userId; } //拉取离线信息记录 private async loadOfflineMessage() { log.info("loadOfflineMessage start") let api = new HttpApi(this.httpUrl); let resp = await api.call("/message/syncOfflineMessage",this.getRequestParams(),{clientType : this.clientType,operater:this.userId, appId : this.appId,lastSequence:this.lastOfflineMessageSequence,maxLimit:100}) if(resp.isSucceed()){ this.lastOfflineMessageSequence = resp.data.maxSequence; let offmessages = resp.data.dataList; this.offlineMessageList.push(offmessages) if(offmessages.length > 0 && typeof imClient.listeners.onOfflineMessage === 'function'){ imClient.listeners.onOfflineMessage(offmessages); } console.log(resp.data.completed) if(!resp.data.completed){ this.loadOfflineMessage(); } }else{ log.error("loadOfflineMessage - error") } } } export let limLogin = async (url: string, req: LoginPack, imClient: ImClient): Promise<{ success: boolean, err?: Error, conn: w3cwebsocket }> => { return new Promise((resolve, _) => { let conn = new w3cwebsocket(url) conn.binaryType = "arraybuffer" log.info("limLogin"); // 设置一个登陆超时器 let tr = setTimeout(() => { clearTimeout(tr) resolve({ success: false, err: new Error("timeout"), conn: conn }); }, loginTimeout); conn.onopen = () => { if (conn.readyState == w3cwebsocket.OPEN) { // 加入socket 连接事件 if (typeof imClient.listeners.onSocketConnectEvent === 'function') { imClient.listeners.onSocketConnectEvent(url, req); } log.info(`开启连接`); //登录数据包 var data = { "userId": req.userId } let loginPack = imClient.buildMessagePack(0x2328, data); conn.send(loginPack.pack(false)); } } conn.onerror = (error: Error) => { clearTimeout(tr) log.error(error) resolve({ success: false, err: error, conn: conn }); } conn.onmessage = (evt) => { if (typeof evt.data === 'string') { log.info("Received: '" + evt.data + "'"); return } clearTimeout(tr) var bytebuf = new ByteBuffer(evt.data); let byteBuffer = bytebuf.int32().int32().unpack(); let command = byteBuffer[0]; let bodyLen = byteBuffer[1]; if (command == 0x2329) { resolve({ success: true, conn: conn }); } } }) } export let getLen = (str) => { var len = 0; for (var i = 0; i < str.length; i++) { var c = str.charCodeAt(i); //单字节加1 if ((c >= 0x0001 && c <= 0x007e) || (0xff60 <= c && c <= 0xff9f)) { len++; } else { len += 3; } } return len; } export const imClient = new ImClient();