前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用Go语言实现ReactiveX(一)——Observable

用Go语言实现ReactiveX(一)——Observable

作者头像
我不是码神
发布2022-07-28 14:22:23
3510
发布2022-07-28 14:22:23
举报
文章被收录于专栏:流媒体技术

用Go语言实现ReactiveX有很大的挑战,Go语言本身没有类的继承,所以无法采用基类来做一些封装操作。不过好在Go语言是有闭包和匿名函数。所以可以实现ReactiveX https://github.com/langhuihui/GoRx

影响设计ReactiveX的要素

  • 没有类的继承
  • 有匿名函数
  • 有闭包
  • 强类型,没有泛型
  • goroutine代替异步

实现生产者Observable

  1. 发送数据
  2. 完成事件
  3. error事件
  4. 被订阅
  5. 被取消订阅

发送数据功能

有两种方式可以实现,一种是直接调用回调函数,和javascript一样。这种方式的局限性在于代码相对啰嗦,因为golang的函数定义必须是有类型的,会涉及到更多的类型断言的操作,匿名函数使用起来也比javascript的要更麻烦一些。第二种方式是采用channel来传递数据,这种方式更加go方式一点。所以我后来采取了第二种方式实现。(第一种也尝试过) 简而言之,核心就是一个chan interface{},一个无缓冲的channel用来发送数据。这个channel是由Observer传递进来的(类似于回调的概念)

代码语言:javascript
复制
type Next chan interface{}
Observable <------Next----- Observer     //subscribe
Observable
      Next-----data----> Observer       //next

被订阅

当Observable接收到用于发送数据的channel的时候,就是被订阅的时候。见上图。

完成事件

利用close一个channel会产生一个事件的方式进行触发。

代码语言:javascript
复制
Observable  close(Next)  ------> Observer              (complete)

Observer通过对channel读取操作,如果第二个参数返回false(channel已经被关闭)代表complete

代码语言:javascript
复制
data,ok:=<-next
if !ok{
//complete
}

error事件

由于golang对异常捕获目前上不健全,所以暂时就通过next channel发送错误对象,在Observer中对数据类型进行类型断言,如果是error类型,则认为收到了错误事件。

被取消订阅(dispose)

这个事件是由Observer向Observable发出的 我们定义了一个新的channel :chan bool。成为stop channel专门用来做这个事情,这个channel不发送任何数据,只用来close的时候广播这个事件。

代码语言:javascript
复制
type Stop chan bool

channel在close的时候,所有等待接受数据的goroutine均能接受到这个关闭事件,这是其他语言不具备的优势。

代码语言:javascript
复制
Obserable <-------Next、Stop---------- Observer  //subscribe
                  <--------- close stop ----------- Observer  //dispose

案例:FromArray

代码语言:javascript
复制
func FromArray(array []interface{}) Observable {
    return func(n Next, s Stop) {
        for _, item := range array {
            select {
            case <-s:
                return
            default:
                n <- item
            }
        }
        close(n)
    }
}

我们看到FromArray是一个函数,调用FromArray(数组或切片),会返回一个Observable。Observable是一个函数

代码语言:javascript
复制
type Observable func(Next, Stop)

我们遍历传入的数组或切片,然后向Next管道传入数组中的元素(n<-item),假如Stop被关闭,我们也能及时取消数据发送(case <-s:return)。 当所有数据发送完毕我们关闭Next管道,发出complete信号(close(n))。 (未完待续)

李宇翔:用Go语言实现ReactiveX(二)——Deliver李宇翔:用Go语言实现ReactiveX(三)——链式编程

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 影响设计ReactiveX的要素
  • 实现生产者Observable
  • 发送数据功能
  • 被订阅
  • 完成事件
  • error事件
  • 被取消订阅(dispose)
  • 案例:FromArray
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档