rxgo笔记

使用chan做subject

package main

import (
    "fmt"
    "time"

    "github.com/reactivex/rxgo/handlers"
    "github.com/reactivex/rxgo/iterable"
    "github.com/reactivex/rxgo/observable"
)

func main() {
    itChan := make(chan interface{})
    defer close(itChan)

    it, _ := iterable.New(itChan)

    go func() {
        <-observable.From(it).
            Subscribe(handlers.NextFunc(func(v interface{}) {
                if num, ok := v.(int); ok {
                    fmt.Println(num)
                }
            }))
    }()

    itChan <- 1
    time.Sleep(1 * time.Second)
}

扩充原来的operators

包装包装即可,实现了简单的sum, flatmap :

package main

import (
    "fmt"

    "github.com/reactivex/rxgo/fx"
    "github.com/reactivex/rxgo/handlers"
    "github.com/reactivex/rxgo/iterable"
    "github.com/reactivex/rxgo/observable"
    "github.com/reactivex/rxgo/observer"
)

type MyObservable struct {
    observable.Observable
}

func (o MyObservable) Sum(apply fx.MappableFunc) MyObservable {
    out := make(chan interface{})
    go func() {
        sum := 0.0
        for item := range o.Observable {
            retItem := apply(item)
            switch v := retItem.(type) {
            case int:
                sum += float64(v)
            case float64:
                sum += float64(v)
            }
        }
        out <- sum
        close(out)
    }()
    return MyObservable{observable.Observable(out)}
}

type FlatMappableFunc func(interface{}) observable.Observable

// FlatMap 参考https://github.com/ReactiveX/RxGo/issues/49
func (o MyObservable) FlatMap(apply FlatMappableFunc) MyObservable {
    out := make(chan interface{})
    go func() {
        for item := range o.Observable {
            go func(sub observable.Observable) {
                handler := observer.Observer{
                    NextHandler: func(i interface{}) {
                        out <- i
                    },
                    ErrHandler: func(err error) {
                        out <- err
                    },
                }
                s := sub.Subscribe(handler)
                <-s
            }(apply(item))
        }
        close(out)
    }()
    return MyObservable{observable.Observable(out)}
}

func ToMyObv(obv interface{}) MyObservable {
    switch v := obv.(type) {
    case observable.Observable:
        return MyObservable{v}
    case iterable.Iterable:
        return MyObservable{observable.From(v)}
    }
    return MyObservable{}
}

func main() {

    it, _ := iterable.New([]interface{}{1, 2, 3, 4, 5})
    <-ToMyObv(it).
        Sum(func(v interface{}) interface{} {
            return v
        }).
        Map(func(v interface{}) interface{} {
            return v.(float64) + 1
        }).
        Subscribe(handlers.NextFunc(func(v interface{}) {
            fmt.Println(v)
        }))
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Python笔记:装饰器(面向切面)

    http://www.cnblogs.com/rhcad/archive/2011/12/21/2295507.html

    超级大猪
  • go: 插件系统

    参考文章: https://toutiao.io/posts/adjoci/preview

    超级大猪
  • go 自定义排序

    超级大猪
  • package sync

    sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。

    李海彬
  • 造势还是来真的?Apple计划在未来三年回购50%股份

    T客汇官网:tikehui.com 撰文 | 张珅健 有消息称,Apple公司管理层计划在未来三年内回购50%的股份。由于近年来iPhone业务的稳定盈利,服务...

    人称T客
  • 安全服务之安全基线及加固(三)Apache篇

    安全服务工程师大家应该都知道,对于他的岗位职责你可能会说不就是渗透测试啊、应急响应嘛.....实际上正式一点的企业对于安服的要求是包括了漏洞扫描、安全基线...

    7089bAt@PowerLi
  • LNMP自动添加vhost脚本(功能蛮强大的)

    老七Linux
  • BFE.dev前端刷题 23. 实现一个sum()方法

    首先完成function的部分。上述的1和2告诉我们需要用一个变量来存储当前的和,这个“和”需要可以在返回的function中可以使用。为了简单,可以直接当作第...

    JSer
  • Python: 受限制的 "函数调用"

    函数功能简单明了, 对于结果, 大家应该也不会有太大的异议:func分别是取得全局命名空间中a的值和使用内置命名空间中的函数id获取了a的地址. 熟悉Pytho...

    Lin_R
  • 浏览器内核之 CSS 解释器和样式布局

    此文章是我最近在看的【WebKit 技术内幕】一书的一些理解和做的笔记。 而【WebKit 技术内幕】是基于 WebKit 的 Chromium 项目的讲解。

    夜尽天明

扫码关注云+社区

领取腾讯云代金券