diff --git a/typescript/core/worker/worker.channel.ts b/typescript/core/worker/worker.channel.ts new file mode 100644 index 000000000..0f0253121 --- /dev/null +++ b/typescript/core/worker/worker.channel.ts @@ -0,0 +1,200 @@ +import { IWorkerController, WorkerMessageType, IWorkerMessage } from './worker'; + +const COMMUNICATION_TIMEOUT = 30000; + +/** + * 通信通道 + */ +export class WorkerChannel { + /** + * Web Worker 实例 + */ + private worker: Worker; + + /** + * 上层通信控制器 + */ + private controller: IWorkerController; + + /** + * 会话响应器 Map + */ + private sessionHandlerMap: { + [propsName: string]: Function; + }; + + public constructor(worker: Worker, controller: IWorkerController) { + this.worker = worker; + this.controller = controller; + + this.sessionHandlerMap = {}; + + // 绑定 worker onmessage 事件的回调 + this.worker.addEventListener('message', this.onmessage.bind(this)); + } + + /** + * 发送响应 + * + * @param sessionId 会话 Id + * @param payload 负载 + */ + public response(sessionId: string, actionType: string, payload: any): void { + this.postMessage({ + messageType: WorkerMessageType.REPLY, + actionType, + payload, + sessionId, + }); + } + + /** + * 发送请求, 不等待响应 + * + * @param actionType 事务类型 + * @param payload 负载 + */ + public request(actionType: string, payload: any): void { + const sessionId = this.generateSessionId(); + this.postMessage({ + messageType: WorkerMessageType.REQUEST, + actionType, + payload, + sessionId, + }); + + // 不等待结果, 还会收到响应, 添加个空的会话响应器 + this.addSessionHandler(sessionId, () => {}); + } + + /** + * 发送请求, 并等待响应 + * + * @param actionType 事务类型 + * @param payload 负载 + * @param timeout 响应超时 + * @returns {Promise} 等待响应的 Promise + */ + public requestPromise(actionType: string, payload: any, timeout = COMMUNICATION_TIMEOUT): Promise { + const sessionId = this.generateSessionId(); + const message = { + messageType: WorkerMessageType.REQUEST, + actionType, + payload, + sessionId, + }; + + // 请求封装为一个 Promise, 等待会话响应器进行 resolve + const PromiseFunction = (resolve: Function, reject: Function): any => { + // 启动请求超时计时器 + const timeoutHandler = setTimeout(() => { + clearTimeout(timeoutHandler); + + reject(); + }, timeout); + + const sessionHandler: Function = (message: IWorkerMessage) => { + // 会话回调函数, 开始处理响应 + this.deleteSessionHandler(message.sessionId); + clearTimeout(timeoutHandler); + + resolve(message.payload); + }; + + this.addSessionHandler(sessionId, sessionHandler); + + // 开始发送请求 + this.postMessage(message); + }; + + return new Promise(PromiseFunction); + } + + /** + * 收到会话消息的处理函数 + * + * 发现是请求, 调用通信控制器的事务处理器进行处理, 获取事务结果并响应; + * 发现是响应,调用会话响应器 + * @param event worker 通信事件 + */ + private onmessage(event: { data: IWorkerMessage }): void { + const { data: message } = event; + const { messageType, sessionId, actionType } = message; + + // 接收到请求 + if (messageType === WorkerMessageType.REQUEST) { + // 处理请求 + this.controller.actionHandler(message) + .then(actionResult => { + // 响应请求 + this.response(sessionId, actionType, actionResult); + }); + } + + // 接收到响应 + if (messageType === WorkerMessageType.REPLY) { + // 处理响应 + if (this.hasSessionHandler(sessionId)) { + this.sessionHandlerMap[sessionId](message); + } else { + throw new Error(`Session \`${sessionId}\` handler no exist`); + } + } + } + + /** + * 封装的 worker 原生 postMessage 接口 + * 支持 structured clone 和 transfer 2种通信模式 + * + * @param message 会话消息 + */ + private postMessage(message: IWorkerMessage): void { + this.worker.postMessage(message); + } + + /** + * 添加会话响应器 + * + * @param sessionId 会话 Id + * @param handler 会话响应器 + */ + private addSessionHandler(sessionId: string, handler: Function): void { + if (!this.hasSessionHandler(sessionId)) { + this.sessionHandlerMap[sessionId] = handler; + } else { + throw new Error(`SessionId \`${sessionId}\` already exist!`); + } + } + + /** + * 移除会话响应器 + * + * @param sessionId + */ + private deleteSessionHandler(sessionId: string): void { + if (this.hasSessionHandler(sessionId)) { + delete this.sessionHandlerMap[sessionId]; + } + } + + /** + * 生成每次独立会话的 Id + * + * @returns 会话 Id + */ + private generateSessionId(): string { + const sessionId = `w_${BI.UUID()}`; + + return sessionId; + } + + /** + * 判断是否有指定会话的处理器 + * + * @param sessionId 会话 Id + * @returns {boolean} 判断结果 + */ + private hasSessionHandler(sessionId: string): boolean { + return !!this.sessionHandlerMap[sessionId]; + } +} diff --git a/typescript/core/worker/worker.controller.ts b/typescript/core/worker/worker.controller.ts new file mode 100644 index 000000000..ebc0d16f6 --- /dev/null +++ b/typescript/core/worker/worker.controller.ts @@ -0,0 +1,131 @@ +import type { IWorkerController, IWorkerMessage } from './worker'; +import { WorkerChannel } from './worker.channel'; + +/** + * 通信控制器 + * + * @class BaseWorkerController + */ +export default class BaseWorkerController implements IWorkerController { + /** + * 原生 worker, 在子类中实例化 + */ + protected worker: Worker; + + /** + * 通信 Channel, 在子类中实例化 + */ + protected channel: WorkerChannel; + + /** + * 事务处理器 Map + */ + protected actionHandlerMap: { + [propsName: string]: (payload: any) => any; + }; + + public constructor() { + this.actionHandlerMap = {}; + } + + /** + * 发送事务,不等待结果 + * + * @param actionType 事务类型 + * @param payload 负载 + */ + public request(actionType: string, payload: any): void { + if (this.channel) { + return this.channel.request(actionType, payload); + } + + console.error('No channel.'); + + return; + } + + /** + * 发送 Promise 形式的事务, 在 then 中获取响应 + * + * @param actionType 事务类型 + * @param payload 负载 + * @param [timeout] 响应的超时; Worker 通道是可靠的, 超时后只上报, 不阻止当前请求 + */ + public requestPromise(actionType: string, payload: any = '', timeout?: number): Promise { + // 有 Channel 实例才能进行通信, 此时还没有实例化是浏览器不支持创建 worker + if (this.channel) { + return this.channel.requestPromise(actionType, payload, timeout); + } + + // 兼容上层调用的 .then().catch() + return Promise.reject(new Error('No channel.')); + } + + /** + * 添加事务处理器, 不允许重复添加 + * + * @param actionType 事务类型 + * @param handler 事务处理器 + */ + public addActionHandler(actionType: string, handler: (payload: any) => any): void { + if (this.hasActionHandler(actionType)) { + throw new Error(`Add action \`${actionType}\` handler repeat`); + } + this.actionHandlerMap[actionType] = handler; + } + + /** + * 事务处理器, 提供给通信 Channel 调用 + * + * @param message 会话消息 + * @returns + */ + public actionHandler(message: IWorkerMessage): Promise { + const { actionType, payload } = message; + + if (this.hasActionHandler(actionType)) { + // 执行指定的事务处理器, 并返回 Promise 封装的事务结果 + try { + const actionResult = this.actionHandlerMap[actionType](payload); + + // 对于 Promise 形式的结果, 需要进行 Promise 错误捕获 + if (BI.isPromise(actionResult)) { + return actionResult.catch(error => Promise.reject(error)); + } + + // 对数据结果, 包装为 Promise + return Promise.resolve(actionResult); + } catch (error) { + // 继续抛出给外层 + return Promise.reject(error); + } + } else { + throw new Error(`Not Found Session Handler \`${actionType}\`.`); + } + } + + /** + * 添加 worker onmessage 事件的回调 + * + * @param {(event: any) => void} onmessage 回调函数 + * @returns {() => void} 移除监听函数 + */ + public addOnmessageListener(onmessage: (event: any) => void): () => void { + this.worker.addEventListener('message', onmessage); + + // 返回移除监听函数 + return () => { + this.worker.removeEventListener('message', onmessage); + }; + } + + /** + * 判断是否有指定事务的处理器 + * + * @param actionType 事务类型 + * @returns {boolean} + */ + protected hasActionHandler(actionType: string): boolean { + return !!this.actionHandlerMap[actionType]; + } +} diff --git a/typescript/core/worker/worker.ts b/typescript/core/worker/worker.ts new file mode 100644 index 000000000..24e5825a4 --- /dev/null +++ b/typescript/core/worker/worker.ts @@ -0,0 +1,32 @@ +/** + * 会话消息类型枚举 + */ +export const enum WorkerMessageType { + REQUEST = 'REQUEST', + REPLY = 'REPLY', +} + +/** + * 会话消息 + */ +export interface IWorkerMessage { + messageType: WorkerMessageType; + actionType: string; + sessionId: string; + + /** + * 数据交换参数 + */ + payload: any; +} + +/** + * 通信控制器需要实现的 interface + */ +export interface IWorkerController { + + /** + * 事务处理器 + */ + actionHandler: (message: IWorkerMessage) => Promise; +}