我正在使用一个实现长轮询循环的API,因为服务器在任意时间内(比如在0到5s之间)保持连接打开,然后在有新消息要传输时立即返回响应。
目前,我的代码每隔5秒发送一次请求,而不管服务器何时响应。如果连续有3个请求,并且服务器在1s,2s,3s内响应,那么目前我将发送3个请求5s,5s,5s,总共~15s,而理想情况下,我希望所有请求都在6s (1 +2+ 3)内发生。
Rx.interval(5000)
.pipe(
Rx.concatMap(() => httpClient.get('/api/messages')),
retry(8000)
)
.subscribe((data) => handleResponse(data));
如果这是一个承诺,我会写下这样的东西
const fetchRequest = httpClient.get('/api/messages').toPromise()
.then(data => {handleResponse(data); fetchRequest()});
但在我的例子中,我最终需要返回一个Obvservable。
从RxJS文档来看,似乎最接近我想要的是retryWhen
,它可以工作,但在我看来在语义上是错误的,因为它将迫使我引发一个Error
来保持循环。
httpClient.get('/api/messages')),
.subscribe((data) => {
handleResponse(data);
throw 'keep going';
})
.retryWhen(val => val === 'keep going')
有比使用retryWhen
更好的方式来处理这种情况吗?
发布于 2018-06-09 05:00:36
据我所知,您希望向服务器发送请求,当您收到响应后,再次发送新的请求。
StackBlitz live demo for this code
这就是该行为的示例。
import { Observable, Subject, of } from 'rxjs';
import { switchMap, delay, startWith, tap, concatMap } from 'rxjs/operators';
@Component({
selector: 'my-app',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent {
subject = new Subject();
constructor() {
this.subject.pipe(
startWith({}),
concatMap(() => this.testGet()),
tap(() => this.subject.next()),
).subscribe(x => {
console.log('inside subscribe', x);
})
}
async testGet() {
console.log('start get');
await of({}).pipe(delay(2000)).toPromise();
console.log('finish get');
return null;
}
}
https://stackoverflow.com/questions/50767446
复制相似问题