import { IWorkerController, WorkerMessageType, IWorkerMessage } from './worker.core'; const COMMUNICATION_TIMEOUT = 30000; /** * 通信通道 */ export class WorkerChannel { /** * Web Worker 实例 */ private worker: Worker; /** * 上层通信控制器 */ private controller: IWorkerController; /** * 会话响应器 Map */ private sessionHandlerMap: { [propsName: string]: Function; }; public constructor(worker: Worker, controller: IWorkerController) { this.worker = worker; this.controller = controller; this.sessionHandlerMap = {}; // 绑定 worker onmessage 事件的回调 this.worker.addEventListener('message', this.onmessage.bind(this)); } /** * 发送响应 * * @param sessionId 会话 Id * @param payload 负载 */ public response(sessionId: string, actionType: string, payload: any): void { this.postMessage({ messageType: WorkerMessageType.REPLY, actionType, payload, sessionId, }); } /** * 发送请求, 不等待响应 * * @param actionType 事务类型 * @param payload 负载 */ public request(actionType: string, payload: any): void { const sessionId = this.generateSessionId(); this.postMessage({ messageType: WorkerMessageType.REQUEST, actionType, payload, sessionId, }); // 不等待结果, 还会收到响应, 添加个空的会话响应器 this.addSessionHandler(sessionId, () => {}); } /** * 发送请求, 并等待响应 * * @param actionType 事务类型 * @param payload 负载 * @param timeout 响应超时 * @returns {Promise} 等待响应的 Promise */ public requestPromise(actionType: string, payload: any, timeout = COMMUNICATION_TIMEOUT): Promise { const sessionId = this.generateSessionId(); const message = { messageType: WorkerMessageType.REQUEST, actionType, payload, sessionId, }; // 请求封装为一个 Promise, 等待会话响应器进行 resolve const PromiseFunction = (resolve: Function, reject: Function): any => { // 启动请求超时计时器 const timeoutHandler = setTimeout(() => { clearTimeout(timeoutHandler); reject(); }, timeout); const sessionHandler: Function = (message: IWorkerMessage) => { // 会话回调函数, 开始处理响应 this.deleteSessionHandler(message.sessionId); clearTimeout(timeoutHandler); resolve(message.payload); }; this.addSessionHandler(sessionId, sessionHandler); // 开始发送请求 this.postMessage(message); }; return new Promise(PromiseFunction); } /** * 收到会话消息的处理函数 * * 发现是请求, 调用通信控制器的事务处理器进行处理, 获取事务结果并响应; * 发现是响应,调用会话响应器 * @param event worker 通信事件 */ private onmessage(event: { data: IWorkerMessage }): void { const { data: message } = event; const { messageType, sessionId, actionType } = message; // 接收到请求 if (messageType === WorkerMessageType.REQUEST) { // 处理请求 this.controller.actionHandler(message) .then(actionResult => { // 响应请求 this.response(sessionId, actionType, actionResult); }); } // 接收到响应 if (messageType === WorkerMessageType.REPLY) { // 处理响应 if (this.hasSessionHandler(sessionId)) { this.sessionHandlerMap[sessionId](message); } else { throw new Error(`Session \`${sessionId}\` handler no exist`); } } } /** * 封装的 worker 原生 postMessage 接口 * 支持 structured clone 和 transfer 2种通信模式 * * @param message 会话消息 */ private postMessage(message: IWorkerMessage): void { this.worker.postMessage(message); } /** * 添加会话响应器 * * @param sessionId 会话 Id * @param handler 会话响应器 */ private addSessionHandler(sessionId: string, handler: Function): void { if (!this.hasSessionHandler(sessionId)) { this.sessionHandlerMap[sessionId] = handler; } else { throw new Error(`SessionId \`${sessionId}\` already exist!`); } } /** * 移除会话响应器 * * @param sessionId */ private deleteSessionHandler(sessionId: string): void { if (this.hasSessionHandler(sessionId)) { delete this.sessionHandlerMap[sessionId]; } } /** * 生成每次独立会话的 Id * * @returns 会话 Id */ private generateSessionId(): string { const sessionId = `w_${BI.UUID()}`; return sessionId; } /** * 判断是否有指定会话的处理器 * * @param sessionId 会话 Id * @returns {boolean} 判断结果 */ private hasSessionHandler(sessionId: string): boolean { return !!this.sessionHandlerMap[sessionId]; } }