iapyang
2 years ago
3 changed files with 363 additions and 0 deletions
@ -0,0 +1,200 @@
|
||||
import { IWorkerController, WorkerMessageType, IWorkerMessage } from './worker'; |
||||
|
||||
const COMMUNICATION_TIMEOUT = 30000; |
||||
|
||||
/** |
||||
* 通信通道 |
||||
*/ |
||||
export class WorkerChannel { |
||||
/** |
||||
* Web Worker 实例 |
||||
*/ |
||||
private worker: Worker; |
||||
|
||||
/** |
||||
* 上层通信控制器 |
||||
*/ |
||||
private controller: IWorkerController; |
||||
|
||||
/** |
||||
* 会话响应器 Map |
||||
*/ |
||||
private sessionHandlerMap: { |
||||
[propsName: string]: Function; |
||||
}; |
||||
|
||||
public constructor(worker: Worker, controller: IWorkerController) { |
||||
this.worker = worker; |
||||
this.controller = controller; |
||||
|
||||
this.sessionHandlerMap = {}; |
||||
|
||||
// 绑定 worker onmessage 事件的回调
|
||||
this.worker.addEventListener('message', this.onmessage.bind(this)); |
||||
} |
||||
|
||||
/** |
||||
* 发送响应 |
||||
* |
||||
* @param sessionId 会话 Id |
||||
* @param payload 负载 |
||||
*/ |
||||
public response(sessionId: string, actionType: string, payload: any): void { |
||||
this.postMessage({ |
||||
messageType: WorkerMessageType.REPLY, |
||||
actionType, |
||||
payload, |
||||
sessionId, |
||||
}); |
||||
} |
||||
|
||||
/** |
||||
* 发送请求, 不等待响应 |
||||
* |
||||
* @param actionType 事务类型 |
||||
* @param payload 负载 |
||||
*/ |
||||
public request(actionType: string, payload: any): void { |
||||
const sessionId = this.generateSessionId(); |
||||
this.postMessage({ |
||||
messageType: WorkerMessageType.REQUEST, |
||||
actionType, |
||||
payload, |
||||
sessionId, |
||||
}); |
||||
|
||||
// 不等待结果, 还会收到响应, 添加个空的会话响应器
|
||||
this.addSessionHandler(sessionId, () => {}); |
||||
} |
||||
|
||||
/** |
||||
* 发送请求, 并等待响应 |
||||
* |
||||
* @param actionType 事务类型 |
||||
* @param payload 负载 |
||||
* @param timeout 响应超时 |
||||
* @returns {Promise<IWorkerMessage>} 等待响应的 Promise |
||||
*/ |
||||
public requestPromise<T>(actionType: string, payload: any, timeout = COMMUNICATION_TIMEOUT): Promise<T> { |
||||
const sessionId = this.generateSessionId(); |
||||
const message = { |
||||
messageType: WorkerMessageType.REQUEST, |
||||
actionType, |
||||
payload, |
||||
sessionId, |
||||
}; |
||||
|
||||
// 请求封装为一个 Promise, 等待会话响应器进行 resolve
|
||||
const PromiseFunction = (resolve: Function, reject: Function): any => { |
||||
// 启动请求超时计时器
|
||||
const timeoutHandler = setTimeout(() => { |
||||
clearTimeout(timeoutHandler); |
||||
|
||||
reject(); |
||||
}, timeout); |
||||
|
||||
const sessionHandler: Function = (message: IWorkerMessage) => { |
||||
// 会话回调函数, 开始处理响应
|
||||
this.deleteSessionHandler(message.sessionId); |
||||
clearTimeout(timeoutHandler); |
||||
|
||||
resolve(message.payload); |
||||
}; |
||||
|
||||
this.addSessionHandler(sessionId, sessionHandler); |
||||
|
||||
// 开始发送请求
|
||||
this.postMessage(message); |
||||
}; |
||||
|
||||
return new Promise(PromiseFunction); |
||||
} |
||||
|
||||
/** |
||||
* 收到会话消息的处理函数 |
||||
* |
||||
* 发现是请求, 调用通信控制器的事务处理器进行处理, 获取事务结果并响应; |
||||
* 发现是响应,调用会话响应器 |
||||
* @param event worker 通信事件 |
||||
*/ |
||||
private onmessage(event: { data: IWorkerMessage }): void { |
||||
const { data: message } = event; |
||||
const { messageType, sessionId, actionType } = message; |
||||
|
||||
// 接收到请求
|
||||
if (messageType === WorkerMessageType.REQUEST) { |
||||
// 处理请求
|
||||
this.controller.actionHandler(message) |
||||
.then(actionResult => { |
||||
// 响应请求
|
||||
this.response(sessionId, actionType, actionResult); |
||||
}); |
||||
} |
||||
|
||||
// 接收到响应
|
||||
if (messageType === WorkerMessageType.REPLY) { |
||||
// 处理响应
|
||||
if (this.hasSessionHandler(sessionId)) { |
||||
this.sessionHandlerMap[sessionId](message); |
||||
} else { |
||||
throw new Error(`Session \`${sessionId}\` handler no exist`); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 封装的 worker 原生 postMessage 接口 |
||||
* 支持 structured clone 和 transfer 2种通信模式 |
||||
* |
||||
* @param message 会话消息 |
||||
*/ |
||||
private postMessage(message: IWorkerMessage): void { |
||||
this.worker.postMessage(message); |
||||
} |
||||
|
||||
/** |
||||
* 添加会话响应器 |
||||
* |
||||
* @param sessionId 会话 Id |
||||
* @param handler 会话响应器 |
||||
*/ |
||||
private addSessionHandler(sessionId: string, handler: Function): void { |
||||
if (!this.hasSessionHandler(sessionId)) { |
||||
this.sessionHandlerMap[sessionId] = handler; |
||||
} else { |
||||
throw new Error(`SessionId \`${sessionId}\` already exist!`); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 移除会话响应器 |
||||
* |
||||
* @param sessionId |
||||
*/ |
||||
private deleteSessionHandler(sessionId: string): void { |
||||
if (this.hasSessionHandler(sessionId)) { |
||||
delete this.sessionHandlerMap[sessionId]; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 生成每次独立会话的 Id |
||||
* |
||||
* @returns 会话 Id |
||||
*/ |
||||
private generateSessionId(): string { |
||||
const sessionId = `w_${BI.UUID()}`; |
||||
|
||||
return sessionId; |
||||
} |
||||
|
||||
/** |
||||
* 判断是否有指定会话的处理器 |
||||
* |
||||
* @param sessionId 会话 Id |
||||
* @returns {boolean} 判断结果 |
||||
*/ |
||||
private hasSessionHandler(sessionId: string): boolean { |
||||
return !!this.sessionHandlerMap[sessionId]; |
||||
} |
||||
} |
@ -0,0 +1,131 @@
|
||||
import type { IWorkerController, IWorkerMessage } from './worker'; |
||||
import { WorkerChannel } from './worker.channel'; |
||||
|
||||
/** |
||||
* 通信控制器 |
||||
* |
||||
* @class BaseWorkerController |
||||
*/ |
||||
export default class BaseWorkerController implements IWorkerController { |
||||
/** |
||||
* 原生 worker, 在子类中实例化 |
||||
*/ |
||||
protected worker: Worker; |
||||
|
||||
/** |
||||
* 通信 Channel, 在子类中实例化 |
||||
*/ |
||||
protected channel: WorkerChannel; |
||||
|
||||
/** |
||||
* 事务处理器 Map |
||||
*/ |
||||
protected actionHandlerMap: { |
||||
[propsName: string]: (payload: any) => any; |
||||
}; |
||||
|
||||
public constructor() { |
||||
this.actionHandlerMap = {}; |
||||
} |
||||
|
||||
/** |
||||
* 发送事务,不等待结果 |
||||
* |
||||
* @param actionType 事务类型 |
||||
* @param payload 负载 |
||||
*/ |
||||
public request(actionType: string, payload: any): void { |
||||
if (this.channel) { |
||||
return this.channel.request(actionType, payload); |
||||
} |
||||
|
||||
console.error('No channel.'); |
||||
|
||||
return; |
||||
} |
||||
|
||||
/** |
||||
* 发送 Promise 形式的事务, 在 then 中获取响应 |
||||
* |
||||
* @param actionType 事务类型 |
||||
* @param payload 负载 |
||||
* @param [timeout] 响应的超时; Worker 通道是可靠的, 超时后只上报, 不阻止当前请求 |
||||
*/ |
||||
public requestPromise<T = any>(actionType: string, payload: any = '', timeout?: number): Promise<T> { |
||||
// 有 Channel 实例才能进行通信, 此时还没有实例化是浏览器不支持创建 worker
|
||||
if (this.channel) { |
||||
return this.channel.requestPromise<T>(actionType, payload, timeout); |
||||
} |
||||
|
||||
// 兼容上层调用的 .then().catch()
|
||||
return Promise.reject(new Error('No channel.')); |
||||
} |
||||
|
||||
/** |
||||
* 添加事务处理器, 不允许重复添加 |
||||
* |
||||
* @param actionType 事务类型 |
||||
* @param handler 事务处理器 |
||||
*/ |
||||
public addActionHandler(actionType: string, handler: (payload: any) => any): void { |
||||
if (this.hasActionHandler(actionType)) { |
||||
throw new Error(`Add action \`${actionType}\` handler repeat`); |
||||
} |
||||
this.actionHandlerMap[actionType] = handler; |
||||
} |
||||
|
||||
/** |
||||
* 事务处理器, 提供给通信 Channel 调用 |
||||
* |
||||
* @param message 会话消息 |
||||
* @returns |
||||
*/ |
||||
public actionHandler(message: IWorkerMessage): Promise<any> { |
||||
const { actionType, payload } = message; |
||||
|
||||
if (this.hasActionHandler(actionType)) { |
||||
// 执行指定的事务处理器, 并返回 Promise 封装的事务结果
|
||||
try { |
||||
const actionResult = this.actionHandlerMap[actionType](payload); |
||||
|
||||
// 对于 Promise 形式的结果, 需要进行 Promise 错误捕获
|
||||
if (BI.isPromise(actionResult)) { |
||||
return actionResult.catch(error => Promise.reject(error)); |
||||
} |
||||
|
||||
// 对数据结果, 包装为 Promise
|
||||
return Promise.resolve(actionResult); |
||||
} catch (error) { |
||||
// 继续抛出给外层
|
||||
return Promise.reject(error); |
||||
} |
||||
} else { |
||||
throw new Error(`Not Found Session Handler \`${actionType}\`.`); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 添加 worker onmessage 事件的回调 |
||||
* |
||||
* @param {(event: any) => void} onmessage 回调函数 |
||||
* @returns {() => void} 移除监听函数 |
||||
*/ |
||||
public addOnmessageListener(onmessage: (event: any) => void): () => void { |
||||
this.worker.addEventListener('message', onmessage); |
||||
|
||||
// 返回移除监听函数
|
||||
return () => { |
||||
this.worker.removeEventListener('message', onmessage); |
||||
}; |
||||
} |
||||
|
||||
/** |
||||
* 判断是否有指定事务的处理器 |
||||
* |
||||
* @param actionType 事务类型 |
||||
* @returns {boolean} |
||||
*/ |
||||
protected hasActionHandler(actionType: string): boolean { |
||||
return !!this.actionHandlerMap[actionType]; |
||||
} |
||||
} |
@ -0,0 +1,32 @@
|
||||
/** |
||||
* 会话消息类型枚举 |
||||
*/ |
||||
export const enum WorkerMessageType { |
||||
REQUEST = 'REQUEST', |
||||
REPLY = 'REPLY', |
||||
} |
||||
|
||||
/** |
||||
* 会话消息 |
||||
*/ |
||||
export interface IWorkerMessage { |
||||
messageType: WorkerMessageType; |
||||
actionType: string; |
||||
sessionId: string; |
||||
|
||||
/** |
||||
* 数据交换参数 |
||||
*/ |
||||
payload: any; |
||||
} |
||||
|
||||
/** |
||||
* 通信控制器需要实现的 interface |
||||
*/ |
||||
export interface IWorkerController { |
||||
|
||||
/** |
||||
* 事务处理器 |
||||
*/ |
||||
actionHandler: (message: IWorkerMessage) => Promise<any>; |
||||
} |
Loading…
Reference in new issue