基于事件实现流程控制
目录
异步通信常用的回调(Callback)和事件(Event)都很相似,如果通信双方都在同一个环境,那么用回调比较方便;如果不同一个平台,需要编解码的,则用事件通信更好。
事件和控制流程
在 JavaScript 中,如果确定回调只会触发一次,那么可以用 Promise 可以优雅地处理大量回调的业务。
doAsyncTask().then(result => /* 异步执行结果 */)
但是有些适用 Promise 的场景,只提供 Event 接口,在运行环境的边界经常会遇到,甚至是同一块内存,就像海运港口,出海的物品有可能丢海里了,让货船给一个100%安全到达的保证是不现实的。
仅提供 Event 接口的异步通信在多线程通信中比较常见:
-
Android 子线程通过 Handler 通知UI线程,更新界面
- Handler.
sendMessage
- Handler.
handleMessage
- Handler.
-
Electron 渲染线程想调用主线程执行系统操作(如:选择文件)
- ipcMain.
send
、ipcMain.on
- ipcRenderer.
send
、ipcRenderer.on
- ipcMain.
-
UDP 通信
- socket.
sendto
- socket.
recvfrom
- socket.
用这些接口收发时,可能确定对方的标识,但是无法建立控制流程。A 端发起请求读文件、读系统信息,B 端可能先回复读系统信息,再回复读文件,这些数据都通过事件通信,没有任务标识。
// TypeScript
Channel.send({
task: 'readfile',
file: '/tmp/run.log'
})
Channel.send({
task: 'readsys'
})
Channel.subscribeOnce(msg => {
// 可能收到读系统信息的结果
// 也可能收到读文件的结果
// 需要判别消息的特征
})
基于事件流建立控制流程
对上方代码基于事件流建立控制流程后,看起来的业务代码像这样:
// TypeScript
Channel.send({ task: 'readsys' })
.then(msg => { /* 我应该收到的是读系统的结果 */ })
Channel.send({ task: 'readfile', file: '/tmp/run.log' })
.then(msg => { /* 我应该收到的是读文件的结果 */ })
如何实现?
给消息封装一层任务标识是比较简单的做法,如 TCP 在无状态的通信中用 Ack 确保消息按顺序发送。封装上述的事件可以用这个方法:
{"task": "readsys"} -> {"task_id": 1, "data": {"task": "readsys"} }
建立一个流程管理工具来管理任务标识和对应的回调,收到事件后,触发对应的回调。其实就是给无状态的数据流建立协议,所以两端都要定好相同的编码协议。

协定流程收发的协议
管理控制流程
示例: 一次完整的请求
- A端组建请求
- A端把请求序列化,附上 id
- A端把序列数据发给B端
- B端反序列数据,得到 id 和 请求
- B端处理完毕,组建响应
- B端把响应序列化,附上与请求一样的 id
- B端把序列数据发给A端
- A端反序列数据,得到 id 和 响应
- A端触发该 id 的回调
在这个请求里面,序列数据(无状态的数据, raw data)必须携带请求ID、数据标识,前者表示请求的ID,通过请求ID可以找到对应的回调函数,才能触发流程回调;后者表示该数据是一个请求体,还是一个响应体,带上这个标识可以实现请求响应双向流动,否则只能做到一端仅发送、一端仅接收。
控制流程系统的核心组件
encodeCmd
把请求体序列化,输出 raw datadecodeCmd
把 raw data 反序列化成请求体decodeRsp
把响应体序列化,输出 raw datadecodeRsp
把 raw data 反序列化成响应体probe
猜测 raw data 是响应体,还是请求体
工具库: PromisifyChannel
在业务空闲之余,自己按上方的流程做了个简单的控制流程工具,可应用到双端通信的场景中。
会将基于事件的流转换成 Promise 对象。
建立 PromisifyChannel
PromisifyChannel 需要数据编解码的具体实现,对于一般字符串数据,JSON 是快速有效的方法,所以这个库自带了 JSON 编解码来处理数据。
const channel = createChannel({
// 使用内建的 JSON 编解码数据
...BuiltinChannelEncoding.JsonString,
sendRaw: (raw) => {
// 把编码数据发给对端
anotherServer.send(raw)
},
handleCmdFunc: (key, cmd, channel) => {
// 处理请求数据
},
handleEventFunc: (key, cmd, channel) => {
// 处理事件数据
}
})
PromisifyChannel 处理对端数据
channel 提供一个方法处理对端发来的编码数据,channel 将它编解码后,触发处理请求回调。
如果对端调用 channel.invoke
则本端触发 handleCmdFunc
。
如果对端调用 channel.sendEvent
则本端触发 handleEventFunc
。
如果对端调用 channel.sendRsp
则本端触发 invoke
的回调。
所有触发的前提,都需要在本端调用 handleRaw。
anotherServer.on('data', raw => channel.handleRaw(raw))
PromisifyChannel 发送请求
给对端发送请求,channel 会对请求附上一个 key(请求ID),如果对端的响应也有一个请求ID,则由此触发一个完整的 Promise。
// 发送请求,得到请求id
channel.sendCmd({ msg: 'hi' }).then(key => {/* key = 请求的id */})
// 发送请求,等待对端响应
channel.invoke({ msg: 'hi' }).then(rsp => {/* rsp = 对端响应 */})
// 发送请求,等待对端响应,并设置 10 秒超时
channel.invokeWithTimeout({ msg: 'hi' }, 10 * 1000).then(rsp => {/* rsp = 对端响应 */})
示例: 用 PromisifyChannel 建立流程通信
这个示例模拟 Electron 环境,用 PromisifyChannel 接管渲染线程、主线程的通信,用 JSON 编码。
确定协议
export type BridgeCmd = {
fn: string
}
export type BridgeRsp = {
result: any
}
export type BridgeEvent = {
description: string
}
渲染线程配置
const channel = createChannel<BridgeCmd, BridgeRsp, BridgeEvent>({
sendRaw: (raw) => new Promise((resolve) => {
ipcRenderer.send('bridge', raw)
resolve(0)
}),
handleCmdFunc: (key, cmd, channel) => {
switch (cmd.fn) {
case 'getFPS':
channel.sendRsp(key, { result: 60 })
break
}
},
})
ipcRenderer.on('bridge', (event, raw) => channel.handleRaw(raw))
主线程配置
一个应用可能有多个 Web 页面,所以先用
webContents.getFocusedWebContents()
获取当前聚焦的页面,再发送数据。
const channel = createChannel<BridgeCmd, BridgeRsp, BridgeEvent>({
sendRaw: (raw) =>
new Promise((resolve) => {
const currentWebContents = webContents.getFocusedWebContents()
currentWebContents.send('bridge', raw)
resolve(0)
}),
handleFunc: (key, cmd, channel) => {
switch (cmd.fn) {
case 'selectFile':
channel.sendRsp(key, { result: '/tmp/run.log' })
break
}
},
})
ipcMain.on('bridge', (event, raw) => channel.handleRaw(raw))
调用
上方渲染线程开放了 getFPS
方法,主线程开放了 selectFile
方法。
渲染线程异步发给主线程,通知主线程选择文件,然后在回调中得到结果:
channel.invoke({ fn: 'selectFile' }).then(rsp => {
rsp.result
// 可以得到 /tmp/run.log
})
主线程异步发送渲染线程,通知页面获取当前FPS值,然后在回调中得到结果:
channel.invoke({ fn: 'getFPS' }).then(rsp => {
rsp.result
// 可以得到 60
})
其实 Electron 已经自带了控制流程,但是不是所有的平台都自带控制流程,所以看实际场景来选型,或者从各种事件源中抽象出控制流程。
附录
promisify an event-based channel, easy to build context-isolated apis