Skip to main content

基于事件实现流程控制

异步通信常用的回调(Callback)和事件(Event)都很相似,如果通信双方都在同一个环境,那么用回调比较方便;如果不同一个平台,需要编解码的,则用事件通信更好。

事件和控制流程

在 JavaScript 中,如果确定回调只会触发一次,那么可以用 Promise 可以优雅地处理大量回调的业务。

doAsyncTask().then(result => /* 异步执行结果 */)

但是有些适用 Promise 的场景,只提供 Event 接口,在运行环境的边界经常会遇到,甚至是同一块内存,就像海运港口,出海的物品有可能丢海里了,让货船给一个100%安全到达的保证是不现实的。

仅提供 Event 接口的异步通信在多线程通信中比较常见:

  • Android 子线程通过 Handler 通知UI线程,更新界面

    • Handler.sendMessage
    • Handler.handleMessage
  • Electron 渲染线程想调用主线程执行系统操作(如:选择文件)

    • ipcMain.send、ipcMain.on
    • ipcRenderer.send、ipcRenderer.on
  • UDP 通信

    • socket.sendto
    • socket.recvfrom

用这些接口收发时,可能确定对方的标识,但是无法建立控制流程。A 端发起请求读文件、读系统信息,B 端可能先回复读系统信息,再回复读文件,这些数据都通过事件通信,没有任务标识。

// TypeScript
Channel.send({
  task: 'readfile',
  file: '/tmp/run.log'
})
Channel.send({
  task: 'readsys'
})
Channel.subscribeOnce(msg => {
  // 可能收到读系统信息的结果
  // 也可能收到读文件的结果
  // 需要判别消息的特征
})

基于事件流建立控制流程

对上方代码基于事件流建立控制流程后,看起来的业务代码像这样:

// TypeScript
Channel.send({ task: 'readsys' })
       .then(msg => { /* 我应该收到的是读系统的结果 */ })

Channel.send({ task: 'readfile', file: '/tmp/run.log' })
       .then(msg => { /* 我应该收到的是读文件的结果 */ })

如何实现?

给消息封装一层任务标识是比较简单的做法,如 TCP 在无状态的通信中用 Ack 确保消息按顺序发送。封装上述的事件可以用这个方法:

{"task": "readsys"} -> {"task_id": 1, "data": {"task": "readsys"} }

建立一个流程管理工具来管理任务标识和对应的回调,收到事件后,触发对应的回调。其实就是给无状态的数据流建立协议,所以两端都要定好相同的编码协议。

协定流程收发的协议

管理控制流程

示例: 一次完整的请求

  1. A端组建请求
  2. A端把请求序列化,附上 id
  3. A端把序列数据发给B端
  4. B端反序列数据,得到 id 和 请求
  5. B端处理完毕,组建响应
  6. B端把响应序列化,附上与请求一样的 id
  7. B端把序列数据发给A端
  8. A端反序列数据,得到 id 和 响应
  9. A端触发该 id 的回调

在这个请求里面,序列数据(无状态的数据, raw data)必须携带请求ID数据标识,前者表示请求的ID,通过请求ID可以找到对应的回调函数,才能触发流程回调;后者表示该数据是一个请求体,还是一个响应体,带上这个标识可以实现请求响应双向流动,否则只能做到一端仅发送、一端仅接收。

控制流程系统的核心组件

  • encodeCmd请求体序列化,输出 raw data
  • decodeCmd 把 raw data 反序列化成请求体
  • decodeRsp响应体序列化,输出 raw data
  • decodeRsp 把 raw data 反序列化成响应体
  • probe 猜测 raw data 是响应体,还是请求体

工具库: PromisifyChannel

在业务空闲之余,自己按上方的流程做了个简单的控制流程工具,可应用到双端通信的场景中。

会将基于事件的流转换成 Promise 对象。

在 Github Gist 查看源码

建立 PromisifyChannel

PromisifyChannel 需要数据编解码的具体实现,对于一般字符串数据,JSON 是快速有效的方法,所以这个库自带了 JSON 编解码来处理数据。

const channel = createChannel({
  // 使用内建的 JSON 编解码数据
  ...BuiltinChannelEncoding.JsonString,
  sendRaw: (raw) => {
    // 把编码数据发给对端
    anotherServer.send(raw)
  },
  handleCmdFunc: (key, cmd, channel) => {
    // 处理请求数据
  },
  handleEventFunc: (key, cmd, channel) => {
    // 处理事件数据
  }
})

PromisifyChannel 处理对端数据

channel 提供一个方法处理对端发来的编码数据,channel 将它编解码后,触发处理请求回调。

如果对端调用 channel.invoke 则本端触发 handleCmdFunc

如果对端调用 channel.sendEvent 则本端触发 handleEventFunc

如果对端调用 channel.sendRsp 则本端触发 invoke 的回调。

所有触发的前提,都需要在本端调用 handleRaw。

anotherServer.on('data', raw => channel.handleRaw(raw))

PromisifyChannel 发送请求

给对端发送请求,channel 会对请求附上一个 key(请求ID),如果对端的响应也有一个请求ID,则由此触发一个完整的 Promise。

// 发送请求,得到请求id
channel.sendCmd({ msg: 'hi' }).then(key => {/* key = 请求的id */})

// 发送请求,等待对端响应
channel.invoke({ msg: 'hi' }).then(rsp => {/* rsp = 对端响应 */})

// 发送请求,等待对端响应,并设置 10 秒超时
channel.invokeWithTimeout({ msg: 'hi' }, 10 * 1000).then(rsp => {/* rsp = 对端响应 */})

示例: 用 PromisifyChannel 建立流程通信

这个示例模拟 Electron 环境,用 PromisifyChannel 接管渲染线程、主线程的通信,用 JSON 编码。

确定协议

export type BridgeCmd = {
  fn: string
}

export type BridgeRsp = {
  result: any
}

export type BridgeEvent = {
  description: string
}

渲染线程配置

const channel = createChannel<BridgeCmd, BridgeRsp, BridgeEvent>({
  sendRaw: (raw) => new Promise((resolve) => {
    ipcRenderer.send('bridge', raw)
    resolve(0)
  }),
  handleCmdFunc: (key, cmd, channel) => {
    switch (cmd.fn) {
      case 'getFPS':
        channel.sendRsp(key, { result: 60 })
        break
    }
  },
})
ipcRenderer.on('bridge', (event, raw) => channel.handleRaw(raw))

主线程配置

一个应用可能有多个 Web 页面,所以先用 webContents.getFocusedWebContents() 获取当前聚焦的页面,再发送数据。

const channel = createChannel<BridgeCmd, BridgeRsp, BridgeEvent>({
  sendRaw: (raw) =>
    new Promise((resolve) => {
      const currentWebContents = webContents.getFocusedWebContents()
      currentWebContents.send('bridge', raw)
      resolve(0)
    }),
  handleFunc: (key, cmd, channel) => {
    switch (cmd.fn) {
      case 'selectFile':
        channel.sendRsp(key, { result: '/tmp/run.log' })
        break
    }
  },
})
ipcMain.on('bridge', (event, raw) => channel.handleRaw(raw))

调用

上方渲染线程开放了 getFPS 方法,主线程开放了 selectFile 方法。

渲染线程异步发给主线程,通知主线程选择文件,然后在回调中得到结果:

channel.invoke({ fn: 'selectFile' }).then(rsp => {
  rsp.result
  // 可以得到 /tmp/run.log
})

主线程异步发送渲染线程,通知页面获取当前FPS值,然后在回调中得到结果:

channel.invoke({ fn: 'getFPS' }).then(rsp => {
  rsp.result
  // 可以得到 60
})

其实 Electron 已经自带了控制流程,但是不是所有的平台都自带控制流程,所以看实际场景来选型,或者从各种事件源中抽象出控制流程。

附录

promisify an event-based channel, easy to build context-isolated apis

WebContents | Electron

ipcMain | Electron