首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >用Go语言实现ReactiveX(三)——链式编程

用Go语言实现ReactiveX(三)——链式编程

作者头像
我不是码神
发布2022-07-28 14:22:46
发布2022-07-28 14:22:46
72100
代码可运行
举报
文章被收录于专栏:流媒体技术流媒体技术
运行总次数:0
代码可运行

接上一篇

李宇翔:用Go语言实现ReactiveX(二)——Deliver

我们在上一篇,谈到了数据传递者Deliver。那么还差一个Subscriber没讲,这个实现其实已经没什么好讲的了,可以直接看源码。因为Deliver里面蕴含了对Observable的订阅过程,而Subscriber的主要功能就是这个,相当于去掉被订阅功能的Deliver。

Reactive 编程就是把Observable、Deliver、Subscriber串起来变成一个单向流动的数据管道。所以必须设计一个串起来的方式。

Pipe编程模式

RxJS 6.0 的时候引入了pipe模式。所以我们的实现是基于pipe模式的。

代码语言:javascript
代码运行次数:0
运行
复制
func Pipe(source Observable, cbs ...Deliver) Observable {
    for _, cb := range cbs {
        source = cb(source)
    }
    return source
}

这时候我们可以将使用这个函数来组合所有的Rx对象

代码语言:javascript
代码运行次数:0
运行
复制
Pipe(FromArray(...),Filter(...),...)

这个函数返回的仍然是Observable,所以可以继续使用Pipe

代码语言:javascript
代码运行次数:0
运行
复制
ob1:=Pipe(FromArray(...),Filter(...),...)
Pipe(ob1,Map(...),SwitchMap(...),...)

当然最后必须得有人订阅这个Observable

代码语言:javascript
代码运行次数:0
运行
复制
Subscribe(...)(observable)

这么设计的原因是golang是强类型语言,pipe无法兼容observer类型,除非有泛型。否则Subscriber就可以放到pipe函数参数末尾传入了。

下面我们回到标题说的链式编程的实现

链式编程实现

所谓链式编程,就是一个对象的方法返回值是对象自身,这样可以接着调用对象的其他方法,行程一个链条,Rx早期的实现都是这么做的。

最终我们可以如此调用:

代码语言:javascript
代码运行次数:0
运行
复制
rx.FromArray(...).Filter(...).Subscribe(...)

那么如何实现呢?

代码语言:javascript
代码运行次数:0
运行
复制
package rx
import (
    p "github.com/langhuihui/gorx/pipe"
)
type Observable struct {
    source p.Observable
}

我们所有的Observable和Deliver包括Subscriber以及Pipe函数等定义全部都在github.com/langhuihui/gorx/pipe这个包里面 那么我们在外层的rx包里面就定义上面这个Observable,名称是相同的,但在不同包里面。 在pipe包里面,Observable是一个函数,而在rx包里面Observable是一个结构体,目的是实现链式编程。这个结构体只有一个成员就是source,类型是pipe包里面的Observable。魔法就此展开了。

代码语言:javascript
代码运行次数:0
运行
复制
func FromArray(array []interface{}) *Observable {
    return &Observable{p.FromArray(array)}
}

当我们调用rx.FromArray(...)的时候,会返回一个rx.Observable 的对象指针,这个对象里面的source属性就是pipe包里面的FromArray函数调用后的Observable

当我们继续调用操作符Filter的时候,rx.FromArray(...).Filter(...),就会调用rx.Observable结构体的Filter方法,这时候我们只需要定义这个成员函数即可。

代码语言:javascript
代码运行次数:0
运行
复制
func (observable *Observable) Filter(f func(interface{}) bool) *Observable {
    return &Observable{p.Filter(f)(observable.source)}
}

其他操作符以此类推,我写了一个脚本用来生成一系列这个定义,省去手工抄写的重复劳动。 可以瞬间从源码生成一堆成员方法

代码语言:javascript
代码运行次数:0
运行
复制
//TakeUntil 
func (observable *Observable) TakeUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
    return &Observable{p.TakeUntil(sSrc.source, delivers...)(observable.source)}
}

//TakeLast 
func (observable *Observable) TakeLast(count int) *Observable {
    return &Observable{p.TakeLast(count)(observable.source)}
}

//Skip 
func (observable *Observable) Skip(count int) *Observable {
    return &Observable{p.Skip(count)(observable.source)}
}

//SkipWhile 
func (observable *Observable) SkipWhile(f func(interface{}) bool) *Observable {
    return &Observable{p.SkipWhile(f)(observable.source)}
}

//SkipUntil 
func (observable *Observable) SkipUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
    return &Observable{p.SkipUntil(sSrc.source, delivers...)(observable.source)}
}

链式编程就算大工告成了。下面就是愉快的Rx编程了。

代码语言:javascript
代码运行次数:0
运行
复制
import "github.com/langhuihui/gorx"
rx.Interval(1000).SkipUntil(rx.Of(1).Delay(3000)).Subscribe(func(x interface{}, dispose func()) {
        fmt.Print(x)
    }, nil, nil)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pipe编程模式
  • 链式编程实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档