首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rx.NET 简介

Rx.NET 简介

作者头像
solenovex
发布2018-04-17 15:14:46
3.1K0
发布2018-04-17 15:14:46
举报
文章被收录于专栏:草根专栏草根专栏

官网: http://reactivex.io/

它支持基本所有的主流语言.

这里我简单介绍一下Rx.NET.

基本概念和RxJS是一样的.

下面开始切入正题.

Rx.NET总览

Rx.NET总体上看可以分为三个部分:

  • 核心部分: Observables, Observers和Subjects
  • LINQ和扩展, 用于查询和过滤Observables
  • 并发和调度的支持

.NET Core的Events

.net core里面的event是通过委托对观察者模式的实现.

但是event在.net core里面并不是头等公民:

  • 人们对它的语法+=评价是褒贬不一的.
  • 很难进行传递和组合
  • 很难进行event的连串(chaining)和错误处理(尤其是同一个event有多个handler的时候)
  • event并没有历史记录

举个例子:

鼠标移动这个事件(event), 鼠标移动的时候会触发该事件, 这些事件会进入某个管道并记录该鼠标的坐标, 这样就会产生一个数据的集合/序列/流.

这里我们就是构建了一个基于时间线的鼠标坐标的序列, 每一次触发事件就会在这个管道上产生一个新的值. 在另一端, 一旦管道上有了新的值, 那么管道的观察者就会得到通知, 这些观察者通过提供回调函数的方式来注册到该管道上. 管道每次更新的时候, 这些回调函数就会被调用, 从而刷新了观察者的数据.

这个例子里, Observable就是管道, 一系列的值在这里被生成. Observer(观察者)在Observable有新的值的时候会被通知.

核心接口

IObservable:

  • Subscribe(IObserver<T> observer)

IObserver

  • void OnNext<T>(T value), 序列里有新的值的时候会调用这个
  • void OnCompleted(), 序列结束的时候调用这个
  • void OnError(Exception ex), 发生错误的时候调用这个

这个和RxJS基本是一样的.

Marble图

可以通过marble图来理解Rx

这图表示的是IObserver, 每当有新的值在Observable出现的时候, 传递到IObservable的Subscribe方法的参数IObserver的OnNext方法就会调用. 发生错误的话 OnError方法就会调用, 整个流也就结束了. 没有错误的话, 走到结束就会调用OnComplete方法. 不过有些Observable是不会结束的.

Observable.Subscribe()返回的Subscription对象被Dispose后, Observer就无法收到新的数据了.

创建Observable流/序列

 创建流/序列的方式:

  • 返回简单的值
  • 包装现有的值
  • 写一个生成函数

简单的Observables

  • Observable.Empty 返回一个直接结束的Obsevable序列
  • Observable.Never 返回一个没有值, 且永远不会结束的序列
  • Observable.Throw(exception), 返回一个带有错误的序列
  • Observable.Return(xxx) 返回单值的序列

包装Observables

可以包装下面这些来返回Observable:

  • Action
    • Observable.Start(() => 42) 返回一个含有42的序列, 并在Action结束的时候, OnComplete方法被调用.
  • Task
    • Task.ToObservable() 使用这个扩展方法进行包装, 当Task结束的时候, Observable推送新的数据, 然后结束
  • IEnumerable
    • ienumerable.ToObservable() 也是扩展方法, ienumerable的每个值都会作为新的值被推送到Observable上, 最后结束OnComplete
  • Event
    • Observable.FromEventPattern(obj, "xxChanged") 这是个工厂方法, 需要提供触发event的对象和event的名字.

生成函数

  • Range
  • Interval, Timer
  • Create(低级), Generate

看图解释:

Observable.Range(1, 4):

Observable.Interval(200):

Observable.Timer(200, () => 42):

            Observable.Create<int>(o =>
            {
                o.OnNext(42);
                o.OnComplete();
                return Disposable.Empty;
            });
Observable.Generate(1,
value => value < 5,
value => value + 1,
value => value);

例子

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            var sequence = GetTaskObservable();
            sequence.Subscribe
            (
                x => Console.WriteLine($"OnNext: {x}"),
                ex => Console.WriteLine($"OnError: {ex}"),
                () => Console.WriteLine("OnCompleted")
            );
            Console.ReadKey();
        }

        private static IObservable<int> GetSimpleObservable()
        {
            return Observable.Return(42);
        }

        private static IObservable<int> GetThrowObservable()
        {
            return Observable.Throw<int>(new ArgumentException("Error in observable"));
        }

        private static IObservable<int> GetEmptyObservable()
        {
            return Observable.Empty<int>();
        }

        private static IObservable<int> GetTaskObservable()
        {
            return GetTask().ToObservable();
        }

        private static async Task<int> GetTask()
        {
            return 42;
        }

        private static IObservable<int> GetRangeObservable()
        {
            return Observable.Range(2, 10);
        }

        private static IObservable<long> GetIntervalObservable()
        {
            return Observable.Interval(TimeSpan.FromMilliseconds(200));
        }

        private static IObservable<int> GetCreateObservable()
        {
            return Observable.Create<int>(observer =>
            {
                observer.OnNext(1);
                observer.OnNext(2);
                observer.OnNext(3);
                observer.OnNext(4);
                observer.OnCompleted();
                return Disposable.Empty;
            });
        }

        private static IObservable<int> GetGenerateObservable()
        {
            return Observable.Generate(
                1,
                x => x < 5,
                x => x + 1,
                x => x
            );
        }
    }
}

请自行运行查看结果.

Cold 和 Hot Observable

Cold: Observable可以为每个Subscriber创建新的数据生产者

Hot: 每个Subscriber从订阅的时候开始在同一个数据生产者那里共享其余的数据.

从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.

举个例子:

Cold: 就相当于我在腾讯视频买体育视频会员, 可以从头看里面的足球比赛.

Hot: 就相当于看足球比赛的现场直播, 如果来晚了, 那么前面就看不到了.

把Cold 变 Hot, 使用.Publish()方法.

把Hot 变 Cold, 使用.Subscribe()方法把它变成Subject即可.

过滤和控制序列

LINQ操作符

操作符的类型:

  • 过滤
  • 合并
  • 聚合
  • 工具

过滤

sequence.Where(x => x % 2 == 0):

.OfType<Square>():

移除重复的:

.Distinct():

.DistinctUntilChanged():

过滤头尾元素:

.Take(2)  .Skip(2):

.SkipLast(2)     .TakeLast(2):

序列的阀:

a.TakeUnit(b)l   a.SkipUntil(b):

实际例子: 把鼠标移动和点击转化为拖拽:

代码非常的简单:

var mouseDrags = mouseMoves.SkipUntil(mouseDowns).TakeUnit(mouseUps);

合并

a.Merge(b)

a.Amb(b), 其中的amb是ambiguous的缩写:

a.Concat(b):

为序列配对:

a.CombineLatest(b, (x, y) => x + y):

a.Zip(b, (x, y) => x +  y):

序列的序列:

Merge()是可以达到这种效果的:

.Switch():

聚合

聚合就是指把序列聚合成一个值, 在序列结束后才能返回值

Count() Sum():

Aggregate():

Scan():

其他工具操作符

会有一些副作用

 .Do(x => Log(x)): 但是记住不要改变序列的元素

.TimeStamp():

.Throttle(TimeSpan.FromSeconds(1))

异步和多线程

异步就表示不一定按顺序执行, 但是它可以保证非阻塞, 通常会有回调函数(或者委托或者async await).

但是异步对于Rx来说就是它的本性

Rx的同步异步对比:

多线程

Rx不是多线程的, 但是它是线程自由的(就是可以使用多个线程), 它被设计成只是用必须的线程而已.

多线程表示, 同时有多个线程在执行. 也可以称作并发. 它可以分担计算量. 但是据需要考虑线程安全了.

Rx已经做了一些抽象, 所以不必过多的考虑线程安全了.

例如: 

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(xxx):

UI的例子:

Observable.Interval(TimeSpan.FromSeconds(1)).ObserveOn(SynchronizationContext.Current).Subscribe(t => searchBox.Text = t.ToString()):

如果计算量比较大的话:

Observable.Create(大量工作).Subscribe(xxx):

UI假死, 这就不好了.

应该这样:

Observable.Create(大量工作).SubscribeOn(NewThreadScheduler.Default).ObserveOn(SynchronizationContext.Current).Subscribe(xxx):

Schedulers

Scheduler可以在Rx里面安排执行动作. 它使用IScheduler接口.

现在就可以把Scheduler理解为是对未来执行的一个抽象.

它同时也负责着Rx所有的并发工作.

Rx提供了很多Scheduler.

下面是.net现有有很多种在未来执行动作的方法:

Rx里面就这个:

IScheduler接口:

基本上不用直接去使用IScheduler, 因为内置了很多现成的Schedulers了:

  • Immediate, 这是唯一一个不是异步的Scheduler
  • CurrentThread
  • EventLoop
  • Dispatcher
  • NewThread 
  • TaskPool, ThreadPool

Schedulers实际上到处都使用着:

应该用哪个Scheduler?

Fake Scheduler:

用于测试

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-04-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Rx.NET总览
    • .NET Core的Events
      • 核心接口
        • Marble图
        • 创建Observable流/序列
          • 简单的Observables
            • 包装Observables
              • 生成函数
                • 例子
                  • Cold 和 Hot Observable
                  • 过滤和控制序列
                    • LINQ操作符
                      • 过滤
                        • 合并
                          • 聚合
                            • 其他工具操作符
                            • 异步和多线程
                              • 多线程
                                • Schedulers
                                相关产品与服务
                                云直播
                                云直播(Cloud Streaming Services,CSS)为您提供极速、稳定、专业的云端直播处理服务,根据业务的不同直播场景需求,云直播提供了标准直播、快直播、云导播台三种服务,分别针对大规模实时观看、超低延时直播、便捷云端导播的场景,配合腾讯云视立方·直播 SDK,为您提供一站式的音视频直播解决方案。
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档