前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rxgo笔记

rxgo笔记

作者头像
超级大猪
发布2019-11-22 09:33:05
8310
发布2019-11-22 09:33:05
举报
文章被收录于专栏:大猪的笔记大猪的笔记

使用chan做subject

代码语言:javascript
复制
package main

import (
    "fmt"
    "time"

    "github.com/reactivex/rxgo/handlers"
    "github.com/reactivex/rxgo/iterable"
    "github.com/reactivex/rxgo/observable"
)

func main() {
    itChan := make(chan interface{})
    defer close(itChan)

    it, _ := iterable.New(itChan)

    go func() {
        <-observable.From(it).
            Subscribe(handlers.NextFunc(func(v interface{}) {
                if num, ok := v.(int); ok {
                    fmt.Println(num)
                }
            }))
    }()

    itChan <- 1
    time.Sleep(1 * time.Second)
}

扩充原来的operators

包装包装即可,实现了简单的sum, flatmap :

代码语言:javascript
复制
package main

import (
    "fmt"

    "github.com/reactivex/rxgo/fx"
    "github.com/reactivex/rxgo/handlers"
    "github.com/reactivex/rxgo/iterable"
    "github.com/reactivex/rxgo/observable"
    "github.com/reactivex/rxgo/observer"
)

type MyObservable struct {
    observable.Observable
}

func (o MyObservable) Sum(apply fx.MappableFunc) MyObservable {
    out := make(chan interface{})
    go func() {
        sum := 0.0
        for item := range o.Observable {
            retItem := apply(item)
            switch v := retItem.(type) {
            case int:
                sum += float64(v)
            case float64:
                sum += float64(v)
            }
        }
        out <- sum
        close(out)
    }()
    return MyObservable{observable.Observable(out)}
}

type FlatMappableFunc func(interface{}) observable.Observable

// FlatMap 参考https://github.com/ReactiveX/RxGo/issues/49
func (o MyObservable) FlatMap(apply FlatMappableFunc) MyObservable {
    out := make(chan interface{})
    go func() {
        for item := range o.Observable {
            go func(sub observable.Observable) {
                handler := observer.Observer{
                    NextHandler: func(i interface{}) {
                        out <- i
                    },
                    ErrHandler: func(err error) {
                        out <- err
                    },
                }
                s := sub.Subscribe(handler)
                <-s
            }(apply(item))
        }
        close(out)
    }()
    return MyObservable{observable.Observable(out)}
}

func ToMyObv(obv interface{}) MyObservable {
    switch v := obv.(type) {
    case observable.Observable:
        return MyObservable{v}
    case iterable.Iterable:
        return MyObservable{observable.From(v)}
    }
    return MyObservable{}
}

func main() {

    it, _ := iterable.New([]interface{}{1, 2, 3, 4, 5})
    <-ToMyObv(it).
        Sum(func(v interface{}) interface{} {
            return v
        }).
        Map(func(v interface{}) interface{} {
            return v.(float64) + 1
        }).
        Subscribe(handlers.NextFunc(func(v interface{}) {
            fmt.Println(v)
        }))
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-09-23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用chan做subject
  • 扩充原来的operators
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档