首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >是否有可能在Rx中的不同线程上调用订阅者的OnNexts?

是否有可能在Rx中的不同线程上调用订阅者的OnNexts?
EN

Stack Overflow用户
提问于 2011-10-19 12:36:35
回答 2查看 5.1K关注 0票数 5

我对Rx很陌生。我想知道是否有可能向不同的订阅者发送消息,使它们在不同的线程上运行?IObserable如何控制它?简单的主题实现,据我所知,它在一个线程上一个接一个地调用订阅者。

代码语言:javascript
复制
public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }

} 

public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}

如果我使用Subject作为'SomeClass',那么在Sub1的OnNext()完成之前,不会调用Sub2的OnNext()。如果sub1需要很长时间,我不希望它延迟Sub2的接收。有人能告诉我Rx是如何允许SomeClass实现的吗?

EN

Stack Overflow用户

发布于 2011-10-19 16:33:24

如果您有一个IObservable,并且需要强制在另一个线程上运行订阅,那么您可以使用ObserveOn函数。

如果运行以下代码,它将强制数字生成器在不同的线程上下文中运行。您还可以使用EventLoopScheduler并指定要使用的System.Thread、设置优先级、设置名称等.

代码语言:javascript
复制
void Main()
{
    var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));

    var disposable = new CompositeDisposable()
    {
       numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId))
    };

    Thread.Sleep(1000);
    disposable.Dispose();
}

输出

代码语言:javascript
复制
Immediate: 10
ThreadPool: 4
TaskPool: 20
TaskPool: 4
ThreadPool: 24
Immediate: 27
Immediate: 10
TaskPool: 24
ThreadPool: 27
Immediate: 24
TaskPool: 26
ThreadPool: 20
Immediate: 26
ThreadPool: 24
TaskPool: 27
Immediate: 28
ThreadPool: 27
TaskPool: 26
Immediate: 10

请注意,我是如何使用CompositeDisposable在末尾释放所有订阅的。例如,如果在LinqPad中不这样做的话。Observable.Interval将继续在内存中运行,直到您终止进程为止。

票数 2
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/7821404

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档