我想知道如何将一个可观察到的类似于switchMap的流转换成多个(有限)流,而不是局限于单个活动流。
其目的是使多个任务同时工作,达到某些任务计数的限制,并允许使用FIFO队列策略启动新任务,这意味着任何新任务都将立即启动,队列中最老的任务将被取消。
switchMap将为源的每个发射创建可观测的,并将取消以前运行的可观测流(一旦创建了新的),我希望实现类似的目标,但允许具有某种级别的并发性(如flatMap),这意味着允许为每个发射创建可观测的数量,并同时运行到某个并发极限,当达到并发极限时,最古老的可观测值将被取消,新的可观测流将启动。
实际上,这与flatMap与maxConcurrent类似,但是当到达maxConcurrent时,不再等待新的可观测值,而是取消旧的可观测值并立即输入新的。
发布于 2017-01-31 07:43:06
虽然已准备好的解决方案不可用,但下面这样的内容应该会有所帮助。
public static void main(String[] args) {
Observable.create(subscriber -> {
for (int i = 0; i < 5; i++) {
Observable.timer(i, TimeUnit.SECONDS).toBlocking().subscribe();
subscriber.onNext(i);
}
})
.switchMap(
n -> {
System.out.println("Main task emitted event - " + n);
return Observable.interval(1, TimeUnit.SECONDS).take((int) n * 3)
.doOnUnsubscribe(() -> System.out.println("Unsubscribed for main task event - "+ n));
}).subscribe(n2 -> System.out.println("\t" + n2));
Observable.timer(20, TimeUnit.SECONDS).toBlocking().subscribe();
}Observable.create部分创建了一个缓慢的生成器,它以发射0、睡眠1s和发出1、睡眠2s和发出2等方式发布项目。
switchMap为每秒钟发出数字的每个元素创建Observable对象。您还可以注意到,每当主Observable发出一个元素时,以及当它取消订阅时,它都会打印一行。
因此,在您的示例中,您可能对使用doOnUnsubscribe关闭最老的任务感兴趣。希望能帮上忙。
下面的伪代码可能有助于更好地理解。
getTaskObservable()
.switchMap(
task -> {
System.out.println("Main task emitted event - " + task);
return Observable.create(subscriber -> {
initiateTaskAndNotify(task, subscriber);
}).doOnUnsubscribe(() -> checkAndKillIfMaxConcurrentTasksReached(task));
}).subscribe(value -> System.out.println("Done with task and got output" + value));https://stackoverflow.com/questions/41931175
复制相似问题