前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Web Workers RPC:Comlink 源码解析

Web Workers RPC:Comlink 源码解析

作者头像
奋飛
发布2022-11-02 17:58:25
6350
发布2022-11-02 17:58:25
举报
文章被收录于专栏:Super 前端Super 前端Super 前端

上篇文章,有提及 Web Workers RPC 以解决浏览器不阻塞UI的问题,其中 comlink 是一把利器,本文就 comlink 的关键源码进行解析。

Comlink 通过提供 RPC 实现将基于 Worker.postMessage(someObject) 的 API 变成了对开发人员更友好的“类似本地调用”方式。

拆解源码之前,先介绍几个重要的概念:Proxy、Channel Messaging API、Transferable objects

在这里插入图片描述
在这里插入图片描述

注意:worker 创建完成后,每次通信都是新建 MessageChannel,目的是避免消息冲突。

重要概念

proxy
new Proxy(target, handler)
  • target 被代理的对象
  • handler 被代理对象上的自定义行为

handler 处理函数

说明

get

劫持获取属性值

set

劫持设置属性值

apply

劫持函数调用

construct

劫持 new 操作符

apply

function sum(a, b) {
  return a + b
}

const handler = {
  apply: function(target, thisArg, args) {
    return target(...args) * 10
  }
};

const proxy1 = new Proxy(sum, handler)

console.log(sum(1, 2))			// 3
console.log(proxy1(1, 2)) 	// 30

construct

class P {
  constructor (name) {
    this.name = name
  }
  sayName () {
    console.log(this.name)
    return this.name
  }
}

const ProxyP = new Proxy(P, {
  construct (target, args) {
    return new target(...args)
  }
})
new ProxyP('LiGang').sayName()	// LiGang
Channel Messaging API

Channel Messaging API 允许两个不同的脚本运行在同一个文档的不同浏览器上下文(比如两个 iframe,或者文档主体和一个 iframe,使用 SharedWorker 的两个文档,或者两个 worker)来直接通讯,在每端使用一个端口(port)通过双向频道(channel)向彼此传递消息。

使用 MessageChannel() 构造函数来创建通讯信道。一旦创建,信道的两个端口即可通过 MessageChannel.port1MessageChannel.port2 属性进行访问(都会返回 MessagePort 对象)。

MessageChannel 以 DOM Event 的形式发送消息,所以它属于异步的宏任务。

示例:作为 EventEmitter 事件订阅发布使用,实现脚本间通信

/* one.js */
export default function (port) { port.onmessage/port.postMessage }
/* two.js */
export default function (port) { port.onmessage/port.postMessage }

/* index.js */
import one from 'one.js'
import two from 'two.js'

const { port1, port2 } = new MessageChannel()
one(port1)
two(port2)

通过 Channel Messaging 进行通信,也可以完成 worker 和 worker 直接通信,无需主进程。

Transferable objects

可转移对象是拥有可以从一个上下文转移到另一个上下文的资源的对象,确保资源一次只能在一个上下文中可用。转移后,原始对象不再可用;它不再指向传输的资源,任何读取或写入对象的尝试都将引发异常。

可转移对象通常用于共享一次只能安全地暴露给单个 JavaScript 线程的资源。

支持的对象:ArrayBuffer、MessagePort、ReadableStream、TransformStream、AudioData、ImageBitmap 等

Channel Messaging API 的 MessageChannel 接口允许我们创建一个新的消息通道,并通过它的两个 MessagePort 属性(port1/port2)发送数据。

myWorker.postMessage(aMessage, transferList) transferList(可选):一个可选的Transferable对象的数组,用于传递所有权。如果一个对象的所有权被转移,在发送它的上下文中将变为不可用(中止),并且只有在它被发送到的 worker 中可用。

源码解析

  1. 通过 Proxy 对 wrap(worker) 劫持相关操作;
  2. 通过 ep(worker/MessageChannel)进行 on message 以及 postMessage 操作;
    • 基本类型:直接通过 worker 传递;
    • 非基本类型:需要通过 MessageChannel 传递port,进行 expose、wrap 处理
  3. 通过 toWireValue/fromWireValue 对通信原始数据处理
wrap

① createProxy(Proxy:get/set/apply/construct):创建后,会生成相应 Proxy ② processArguments : Proxy(get/set/apply/construct) 劫持处理 ③ toWireValue:对传入参数进行统一格式处理 [{type,name,value}, [transferables]] ④ serialize(MessageChannel)=> expose:创建 MessageChannel 通信管道、同时监听 worker 的返回(通过 expose,下面介绍) ⑤ requestResponseMessage(on message、postMessage):监听消息,同时发送当前信息

// ①
function createProxy<T>(
  ep: Endpoint,
  path: (string | number | symbol)[] = [],
  target: object = function () {}
): Remote<T> {
	return  new Proxy(target, {
    get() {},
    set() {},
    apply() {},
    construct() {
      const [argumentList, transferables] = processArguments(rawArgumentList);
      // ⑤
      return requestResponseMessage(...).then(fromWireValue)
    }
  }    
}

// ②
function processArguments(argumentList: any[]): [WireValue[], Transferable[]] {
  const processed = argumentList.map(toWireValue);
  return [processed.map((v) => v[0]), myFlat(processed.map((v) => v[1]))];
}

// ③    
function toWireValue(value: any): [WireValue, Transferable[]] {
  for (const [name, handler] of transferHandlers) {
     const [serializedValue, transferables] = handler.serialize(value);
  }
}
   
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
  canHandle: (val): val is ProxyMarked =>
    isObject(val) && (val as ProxyMarked)[proxyMarker],
  // ④       
  serialize(obj) {
    const { port1, port2 } = new MessageChannel();
    expose(obj, port1);
    return [port2, [port2]];
  },
  deserialize(port) {
    port.start();
    return wrap(port);
  },
}    

// ⑤
function requestResponseMessage(): Promise<WireValue> {
  return new Promise((resolve) => {
    ep.addEventListener("message", function l(ev: MessageEvent) {
      resolve(ev.data);
    } as any);
    ep.postMessage({ id, ...msg }, transfers);
  });
}

关于 get 中 then 的特别说明:

根据 ECMAScript® 2022 Language Specification 中 await 的描述:

  • await value 在内部实现中会变成 await Promise.resolve(value)
  • Promise.resolve 的处理中 则会获取 value.then 的值,如果它是一个函数则会通过它创建一个 Promise Job。

await value => await Promise.resolve(value) => await {then}

下述例子中 value 等于 success

const value = await {
  then: (resolve, reject) => {
    resolve('success')
  }
}
expose

① on message:监听 message 事件 ② fromWireValue:对接受参数进行统一格式处理 ③ deserialize => wrap:发送消息对列、同时代理相关内容(通过 wrap,上面介绍) ④ GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE:针对不同MessageType,执行不同逻辑 ⑤ returnValue :依据分支 ④ 产生返回结果 ⑥ toWireValue ⑦ serialize(MessageChannel) ⑧ postMessage(transferables):发送

export function expose(obj: any, ep: Endpoint = self as any) {
  // ①
  ep.addEventListener("message", function callback(ev: MessageEvent) {
    // ② ③
    const argumentList = (ev.data.argumentList || []).map(fromWireValue);
    switch (type) {
      // ④  
    	case GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE
        // ⑤
      	returnValue = ...  
    }
    Promise.resolve(returnValue)
      .catch((value) => {
      return { value, [throwMarker]: 0 };
    }).then((returnValue) => {
      // ⑥ ⑦ 
      const [wireValue, transferables] = toWireValue(returnValue);
      // ⑧
      ep.postMessage({ ...wireValue, id }, transferables);
    });
  }
}

// ②
function fromWireValue(value: WireValue): any {
  switch (value.type) {
    case WireValueType.HANDLER:
      // ③
      return transferHandlers.get(value.name)!.deserialize(value.value);
    case WireValueType.RAW:
      return value.value;
  }
}   
  
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
  deserialize(port) {
    port.start();
    // 执行 wrap 流程
    return wrap(port);
  },
}   

使用

如果大家项目中需要使用 webWorker,强烈推荐大家尝试 Comlink,Comlink 同项目结合,可以使用 comlink-loader

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-10-24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 重要概念
    • proxy
      • Channel Messaging API
        • Transferable objects
          • wrap
          • expose
      • 源码解析
      • 使用
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档