我对Rx很陌生。我想知道是否有可能向不同的订阅者发送消息,使它们在不同的线程上运行?IObserable如何控制它?简单的主题实现,据我所知,它在一个线程上一个接一个地调用订阅者。
public class Subsciber : IObserver<int>
{
public void OnNext(int a)
{
// Do something
}
public void OnError(Exception e)
{
// Do something
}
public void OnCompeleted()
{
}
}
public static class Program
{
public void static Main()
{
var observable = new <....SomeClass....>();
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
// some waiting function
}
}如果我使用Subject作为'SomeClass',那么在Sub1的OnNext()完成之前,不会调用Sub2的OnNext()。如果sub1需要很长时间,我不希望它延迟Sub2的接收。有人能告诉我Rx是如何允许SomeClass实现的吗?
发布于 2011-10-19 13:50:58
您编写的代码几乎可以并行运行可观察到的代码。如果你把你的观察者写成这样:
public class Subscriber : IObserver<int>
{
public void OnNext(int a)
{
Console.WriteLine("{0} on {1} at {2}",
a,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString());
}
public void OnError(Exception e)
{ }
public void OnCompleted()
{ }
} 然后运行以下代码:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => (int)x)
.Take(5)
.ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);将编制下列文件:
0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53它已经在不同的线程上并行运行订阅。
我使用的最重要的是.ObserveOn扩展方法--这就是实现这一工作的原因。
您应该记住,观察者通常不会共享相同的可观测结果实例。订阅一个可观测的有效连接一个独特的可观测运算符的“链”从可观测的来源到观察者。这与在可枚举上两次调用GetEnumerator非常相似,您将不会共享相同的枚举器实例,您将得到两个唯一的实例。
现在,我想描述一下我所说的链子的意思。我将给出从Reflector.NET & Observable.Where中提取的代码来说明这一点。
以以下代码为例:
var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });在幕后,Generate和Where各自创建一个内部Rx类AnonymousObservable<T>的新实例。AnonymousObservable<T>的构造函数接受一个Func<IObserver<T>, IDisposable>委托,每当它收到对Subscribe的调用时就使用它。
Observable.Generate<T>(...)从Reflector.NET中略为清理的代码如下:
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector,
IScheduler scheduler)
{
return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
{
TState state = initialState;
bool first = true;
return scheduler.Schedule((Action self) =>
{
bool flag = false;
TResult local = default(TResult);
try
{
if (first)
{
first = false;
}
else
{
state = iterate(state);
}
flag = condition(state);
if (flag)
{
local = resultSelector(state);
}
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(local);
self();
}
else
{
observer.OnCompleted();
}
});
});
}Action self参数是迭代输出值的递归调用。您会注意到,在这段代码中没有任何地方存储observer,或者将值粘贴到多个观察者。此代码对每个新的观察者运行一次。
Observable.Where<T>(...)从Reflector.NET中略为清理的代码如下:
public static IObservable<TSource> Where<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return new AnonymousObservable<TSource>(observer =>
source.Subscribe(x =>
{
bool flag;
try
{
flag = predicate(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(x);
}
}, ex => observer.OnError(ex), () => observer.OnCompleted));
}同样,此代码不跟踪多个观察者。它调用Subscribe有效地将自己的代码作为观察者传递到底层的source中。
您应该看到,在我上面的示例代码中,订阅Where创建了对Generate的订阅,因此这是一个可观察的链。实际上,它正在链接订阅一系列AnonymousObservable对象的调用。
如果您有两个订阅,则有两个链。如果您有1,000个订阅,则有1,000条链。
现在,顺便提一下--尽管有IObservable<T>和IObserver<T>接口--您应该很少在自己的类中实际实现这些接口。内置类和运算符处理99.99%的所有情况.这有点像IEnumerable<T> --您需要多久自己实现这个接口?
请告诉我这是否有帮助,如果你需要进一步的解释。
发布于 2011-10-19 16:33:24
如果您有一个IObservable,并且需要强制在另一个线程上运行订阅,那么您可以使用ObserveOn函数。
如果运行以下代码,它将强制数字生成器在不同的线程上下文中运行。您还可以使用EventLoopScheduler并指定要使用的System.Thread、设置优先级、设置名称等.
void Main()
{
var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));
var disposable = new CompositeDisposable()
{
numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)),
numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)),
numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId))
};
Thread.Sleep(1000);
disposable.Dispose();
}输出
Immediate: 10
ThreadPool: 4
TaskPool: 20
TaskPool: 4
ThreadPool: 24
Immediate: 27
Immediate: 10
TaskPool: 24
ThreadPool: 27
Immediate: 24
TaskPool: 26
ThreadPool: 20
Immediate: 26
ThreadPool: 24
TaskPool: 27
Immediate: 28
ThreadPool: 27
TaskPool: 26
Immediate: 10请注意,我是如何使用CompositeDisposable在末尾释放所有订阅的。例如,如果在LinqPad中不这样做的话。Observable.Interval将继续在内存中运行,直到您终止进程为止。
https://stackoverflow.com/questions/7821404
复制相似问题