前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >简析redux技术栈(二):认识saga的buffer和chanel

简析redux技术栈(二):认识saga的buffer和chanel

作者头像
flytam
发布2020-01-14 17:58:44
1.1K0
发布2020-01-14 17:58:44
举报

本文地址 我们知道redux-saga 也是通过中间件的形式与 redux 本身连接起来。例如下面使用了redux-saga的react项目需要以下这样的 初始化

代码语言:javascript
复制
function configureStore(initialState) {
  // 运行返回一个redux middleware
  const sagaMiddleware = createSagaMiddleware();
  return {
    ...createStore(
      reducer,
      initialState,
      applyMiddleware(middleware1, middleware2, sagaMiddleware)
    ),
    runSaga: sagaMiddleware.run
  };
}

所以分析 redux-saga 的第一步,就从 redux-saga 的中间件开始。我们平时写代码在 react 中与 saga 进行交互,都是dispatch一个action到与我们的 saga 逻辑进行交互。翻看createSagaMiddleware源码,可以很清晰的看到,这就是使用了中间件后,我们每次dispatch一个 action 后,在 saga 中间件内会往channelput这个action,进而触发我们 saga 里面的逻辑。就实现了 react 组件和 saga 的交互了。那么这个stdChannel是什么呢

代码语言:javascript
复制
// 省略一些多余部分
function sagaMiddlewareFactory({ channel = stdChannel() } = {}) {
  //...
  function sagaMiddleware({ getState, dispatch }) {
    return next => action => {
      const result = next(action);
      // 实现了react和saga的交互
      channel.put(action);
      return result;
    };
  }
  //...
  return sagaMiddleware;
}

在了解 saga 的运行机制之前,先学习 redux-saga 源码内部的两个比较常用的数据结构bufferchanel

buffer

buffer 是一个固定长度类似队列的数据结构,它有四种类型(下面介绍),对外暴露了几个函数,如下

  • put 用来缓存 action
  • take 取出一个 action
  • isEmpty 判断 buffer 是否为空
  • flush 取出缓存的内的所有 action

我们知道如果我们直接使用数组的 push/unshift(pop/shift)函数实现队列的话,当我们出队列的时候时间复杂度是o(n)。而这里的 buffer 实现是比较巧妙的。数据存储是使用定长数组。通过pushIndexpopIndex标识位来记录出入队列的位置,它们的初始值都是 0,出队列的时候直接把popIndex位置空,然后值+1。入队列则是pushIndex+1。这样,无论take还是put,时间复杂度都是o(1)

pushIndex达到了 buffer 的长度的时候,buffer 的处理会根据 buffer 类型不同进行处理

1、ON_OVERFLOW_THROW:超出限制直接报错

2、ON_OVERFLOW_SLIDE:类似于环状队列,达到长度限制后,从索引 0 继续存储。

3、ON_OVERFLOW_EXPAND:达到限制后,长度自动变大 2 倍。

4、ON_OVERFLOW_DROP:达到限制后,后续的都丢弃

chanel

chanel 的实现是类似发布/订阅的设计模式。chanel.take(taker)存入一个 taker 函数,chanel.put(action)时,取出 cb 函数执行,action 是用来消费 taker 的

  • 普通 chanel(单播)

特点:当put一个 action 时,如果没有taker的时候,会将这个 action 存起来,存 action 是用了上面提到的buffer这个数据结构。等到有 taker 的时候可以马上调用 action。

一个简化版的单播 chanel 实现如下

代码语言:javascript
复制
class Chanel {
  constructor() {
    // 存action
    this.buffers = [];
    // 存taker
    this.takers = [];
    this.isClosed = false;
  }
  take(cb) {
    if (this.isClosed) {
      return;
    }
    if (this.buffers.length > 0) {
      cb(this.buffers.shift());
    } else {
      this.takers.push(cb);
    }
  }
  put(action) {
    if (this.takers.length === 0) {
      this.buffers.push(action);
    } else {
      this.takers.shift()(action);
    }
  }
  close() {
    if (this.isClosed) {
      return;
    }
  }
}

eventChanel 是在普通 Chanel 基础上实现,是用来用于订阅外部的事件源。chanel的一些使用参考可以看文档

简化的 eventChanel 实现如下,其实给订阅函数传进一个函数,调用这个函数可以往 Chanel 内 put 东西。

代码语言:javascript
复制
class eventChanel extends Chanel {
  constructor(subscribe) {
    super();
    this.unscribe = subscribe(action => {
      super.put(action);
    });
  }

  close() {
    this.unscribe();
    this.isClosed = true;
  }
}
  • 多播(multiCast) chanel

从上面的中间件源码可以看到,redux-saga 默认情况下的ChanelstdChannelstdChannel就是基于多播 chanel (multiCastChanel)实现,只不过添加了redux-saga本身的调度系统。multiCastChanel和 nodejs 的eventEmiter是非常类似的,multiCastChaneltake类似于eventEmiteronce,multiCastChanelput类似于eventEmiteremit

通俗的理解,saga 内 multiCastChanel 和 Chanel 最大的区别是,multiCastChanel 不能存 action,只能存 taker,能根据 action 的 type 判断是否执行 taker;chanel 可以缓存 action 和 taker,接收到 action 马上触发 taker,不会判断 type,类似于两个人对话的样子(单播)

一个简化版的 multiCastChanel 实现如下

代码语言:javascript
复制
class Chanel {
  constructor() {
    this.isClosed = false;
    this.takers = [];
  }
  put(action) {
    if (this.isClosed) {
      return;
    }
    const takers = this.takers;
    for (let i = 0, len = takers.length; i < len; i++) {
      if (!takers[i].MATCH || action.type === takers[i].MATCH) {
        takers[i](action);
        takers.splice(takers.indexOf(takers[i]), 1);
      }
    }
  }
  take(cb, match) {
    cb["MATCH"] = match;
    this.takers.push(cb);
  }
  close() {
    this.isClosed = true;
  }
}

源码中的 stdChanel 实现

代码语言:javascript
复制
export function stdChannel() {
  const chan = multicastChannel();
  const { put } = chan;
  chan.put = input => {
    // saga的action,不进入调度状态
    if (input[SAGA_ACTION]) {
      put(input);
      return;
    }
    asap(() => {
      put(input);
    });
  };
  return chan;
}

上面代码中的multicastChannel和我们的简化版 chanel 原理是一样的。我们可以看到,stdChanel是对multicastChannelput方法进行了重写。只是对于非 saga 内置action使用asap(() => { put(input); });进行调用,这个asap方法其实是 saga 内部调度系统的一个执行函数,它的作用是如果当前 saga 是空闲状态,则执行我们的回调;如果是挂起状态则将回调存进任务队列中。后面会专门介绍 saga 的调度系统。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • buffer
  • chanel
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档