我对Rx很陌生。我想知道是否有可能向不同的订阅者发送消息,使它们在不同的线程上运行?IObserable如何控制它?简单的主题实现,据我所知,它在一个线程上一个接一个地调用订阅者。
public class Subsciber : IObserver<int>
{
public void OnNext(int a)
{
// Do something
}
public void OnError(Exception e)
{
// Do something
}
public void OnCompeleted()
{
}
}
public static class Program
{
public void static Main()
{
var observable = new <....SomeClass....>();
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
// some waiting function
}
}如果我使用Subject作为'SomeClass',那么在Sub1的OnNext()完成之前,不会调用Sub2的OnNext()。如果sub1需要很长时间,我不希望它延迟Sub2的接收。有人能告诉我Rx是如何允许SomeClass实现的吗?
发布于 2011-10-19 16:33:24
如果您有一个IObservable,并且需要强制在另一个线程上运行订阅,那么您可以使用ObserveOn函数。
如果运行以下代码,它将强制数字生成器在不同的线程上下文中运行。您还可以使用EventLoopScheduler并指定要使用的System.Thread、设置优先级、设置名称等.
void Main()
{
var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));
var disposable = new CompositeDisposable()
{
numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)),
numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)),
numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId))
};
Thread.Sleep(1000);
disposable.Dispose();
}输出
Immediate: 10
ThreadPool: 4
TaskPool: 20
TaskPool: 4
ThreadPool: 24
Immediate: 27
Immediate: 10
TaskPool: 24
ThreadPool: 27
Immediate: 24
TaskPool: 26
ThreadPool: 20
Immediate: 26
ThreadPool: 24
TaskPool: 27
Immediate: 28
ThreadPool: 27
TaskPool: 26
Immediate: 10请注意,我是如何使用CompositeDisposable在末尾释放所有订阅的。例如,如果在LinqPad中不这样做的话。Observable.Interval将继续在内存中运行,直到您终止进程为止。
https://stackoverflow.com/questions/7821404
复制相似问题