前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用Go语言实现ReactiveX(二)——Deliver

用Go语言实现ReactiveX(二)——Deliver

作者头像
我不是码神
发布2022-07-28 14:22:34
2640
发布2022-07-28 14:22:34
举报
文章被收录于专栏:流媒体技术流媒体技术

接上一篇

李宇翔:用Go语言实现ReactiveX(一)——Observable

本篇,我们来实现ReactiveX中的操作符,即数据传递者Deliver。这些操作符一般包括,过滤、组合、数学运算、转换等几个大类。

Deliver既是Observable又是Observer,它接受一个或者多个Observable作为上一级的数据源,又可被订阅一次或者多次。

实现要点

  1. 传递数据、complete事件、error事件
  2. 订阅和退订上级数据源
  3. 可被下一级观察者订阅和退订

订阅上级数据源

现在假设我们有一个Observable就是前一篇文章中的FromArray

代码语言:javascript
复制
array:=int[]{1,2,3}
observable:=FromArray(array)

这个observable会依次发出1,2,3三个整数,然后complete。作为Deliver,我们可能需要去订阅这个observable,那么如何订阅呢? 根据上篇所述,订阅行为就是传入一个Next和一个Stop。

代码语言:javascript
复制
next:=make(Next)
stop:=make(Stop)
observable(next,stop)

我们完成了订阅,但我们还需要对订阅后采集来自observable的数据。

代码语言:javascript
复制
next:=make(Next)
stop:=make(Stop)
go observable(next,stop)
for d:= range next{
  //处理数据
}

由于observable里面的逻辑会阻塞当前‘线程’,所以我们加了关键字go。

退订上级数据源

代码语言:javascript
复制
close(stop)

随时都可以调用这个方法进行退订。

传递数据

真实的Deliver是这样定义的

代码语言:javascript
复制
Deliver func(source Observable) Observable

它是一个函数,接受一个Observable作为参数,返回一个Observable。 展开出来就是这样的

代码语言:javascript
复制
func deliver(source Observable) Observable {
    return func(next Next, stop Stop) {
        //deliver被订阅的时候就会执行这里面的逻辑
    }
}

我们可以在被订阅的时候,去订阅source,然后获取数据后传递给next管道

代码语言:javascript
复制
func deliver(source Observable) Observable {
    return func(next Next, stop Stop) {
        dnext:= make(Next)
        go source(dnext,stop)
        for d:= range next{
               next<-d
        }
        close(next)
    }
}

这样我就做好了一个什么也没用的数据传递者了。下面我们来实现一个有一点作用的filter

Filter的实现

代码语言:javascript
复制
func Filter(f func(interface{}) bool) Deliver {
    return func(source Observable) Observable {
        return func(next Next, stop Stop) {
            sNext := make(Next)
            go source(sNext, stop)
            for {
                select {
                case d, ok := <-sNext:
                    if !ok {
                        close(next)
                        return
                    }
                    if _, ok = d.(error); ok {
                        next <- d
                        close(next)
                        return
                    } else if f(d) {
                        next <- d
                    }
                case <-stop:
                    return
                }
            }
        }
    }
}

这里,我们用for select代替了for range,这样方便我们的接收到stop被close的时候发来的信息。我们判断了source是否complete,如果complete我们就close(next)——向下级发送complete信号。然后我们判断了数据是否是error类型,然后执行了filter函数来过滤数据。

其他的Deliver都是沿用Filter这套模板来实现的。这是个死循环结构,所以订阅deliver也需要用go关键字,这个和Observable是一脉相承的。

最后我们再看一个startwith操作符,也是一个十分常用的功能,用于在source前面加塞数据。如果有更好的表达方式,欢迎留言。

代码语言:javascript
复制
func StartWith(xs ...interface{}) Deliver {
    return func(source Observable) Observable {
        return func(next Next, s Stop) {
            stopped := false
            go func() {
                <-s
                stopped = true
            }()
            for d := range xs {
                if stopped {
                    return
                }
                next <- d
            }
            if !stopped {
                source(next, s)
            }
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实现要点
  • 订阅上级数据源
  • 退订上级数据源
  • 传递数据
  • Filter的实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档