所以刚到RX的家伙们我有个问题。
在我学习野兽的探险中,也就是RXJava,这是我的课程。
public class PollingLoop {
public static <T> Observable<T> buildObservable(
final int interval,
final TimeUnit timeUnit,
final int maxJitter,
final Scheduler scheduler,
final Supplier<Observable<T>> scheduledTask) {
if (maxJitter <= 0) throw new IllegalArgumentException("Jitter must be greater than 0");
final Random randomJitter = new Random();
return Observable.timer(interval, timeUnit, scheduler)
.map(x -> {
System.out.println("Flat map jitter");
return randomJitter.nextInt(maxJitter);
})
.flatMap(jitter -> {
System.out.println("Flat map timer");
return Observable.timer(jitter, timeUnit, scheduler);
})
.flatMap(ignored -> {
System.out.println("Flat map task");
return scheduledTask.get();
})
.retry()
.repeat();
}
public static <T> Completable buildCompletable(
final int interval,
final TimeUnit timeUnit,
final int maxJitter,
final Scheduler scheduler,
final Supplier<Completable> scheduledTask) {
if (maxJitter <= 0) throw new IllegalArgumentException("Jitter must be greater than 0");
final Random randomJitter = new Random();
return Observable.timer(interval, timeUnit, scheduler)
.map(x -> {
System.out.println("Flat map jitter");
return randomJitter.nextInt(maxJitter);
})
.flatMapCompletable(jitter -> {
System.out.println("Flat map timer");
return Completable.timer(jitter, timeUnit, scheduler);
})
.flatMapCompletable(ignored -> {
System.out.println("Flat map task that is not called");
return scheduledTask.get();
})
.retry()
.repeat()
.toCompletable();
}
}
从测试中,当我测试可观察到的执行延迟时,我得到了输出。
Flat map jitter
Flat map timer
Flat map task //(observable is being called)
但是当我测试可完全执行的延迟时,我得到了输出。
Flat map jitter
Flat map timer
//(未调用可完成的任务)
我做错什么了?为什么不能从buildCompletable内部调用可完成的任务?
这是测试(用斯波克写的)
def 'should delay execution of observable'() {
given:
def subscriber = new TestSubscriber<>()
def scheduler = new TestScheduler()
def supplier = Mock Supplier
supplier.get() >> Observable.just(true)
when:
PollingLoop.buildObservable(100, TimeUnit.MILLISECONDS, 1, scheduler, supplier).subscribe(subscriber)
scheduler.advanceTimeBy(101, TimeUnit.MILLISECONDS)
then:
subscriber.assertValueCount(1)
subscriber.assertValue(true)
}
def 'should delay execution of completable'(){
given:
def subscriber = new TestSubscriber<>()
def scheduler = new TestScheduler()
def supplier = Mock Supplier
supplier.get() >> Completable.complete()
when:
PollingLoop.buildCompletable(100, TimeUnit.MILLISECONDS, 1, scheduler, supplier).subscribe(subscriber)
scheduler.advanceTimeBy(1001, TimeUnit.MILLISECONDS)
enter code here
then:
1 * supplier.get()
}
发布于 2018-06-28 06:08:22
您的第一个flatMapCompletable()
的结果是可完成的,因为这就是您要返回的内容。但是,可完成性将永远不会发出一个值(根据定义),因此后续的flatMapCompletable()
没有映射的值。
由于第一个Completable
不发出值,因此需要使用andThen()
运算符或类似的方法绑定下一步。
您的代码编译是因为flatMapCompletable()
运算符具有Observable<Long>
签名。您需要将andThen()
操作符放入flatMapCompletable()
函数中。
https://stackoverflow.com/questions/51045579
复制相似问题