通过使用Http,我们调用一个进行网络调用并返回http observable的方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们将这个可观察对象添加到多个订阅者中:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这看起来可能很不寻常,但实际上很常见:例如,如果调用者订阅了observable以显示错误消息,并使用异步管道将其传递给模板,则我们已经有两个订阅者。
在RxJs5中这样做的正确方法是什么?
也就是说,这似乎工作得很好:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但是这是在RxJs5中惯用的方式吗,或者我们应该做一些其他的事情呢?
注意:根据Angular 5新的HttpClient
__,所有示例中的.map(res => res.json())
部分现在都是无用的,因为JSON结果现在是默认假设的。
发布于 2016-03-30 06:09:47
根据@Cristian的建议,这是一种适用于HTTP可观察对象的方法,它只发出一次,然后就完成了:
getCustomer() {
return this.http.get('/someUrl')
.map(res => res.json()).publishLast().refCount();
}
发布于 2017-03-23 09:28:41
更新: Ben Lesh说在5.2.0之后的下一个小版本中,你将能够仅仅调用shareReplay()来真正的缓存。
以前……
首先,不要使用share()或publishReplay(1).refCount(),它们是相同的,问题是它只在可观察对象处于活动状态时建立连接时才共享,如果你在连接完成后连接,它会再次创建一个新的可观察对象,转换,而不是真正的缓存。
Birowski在上面给出了正确的解决方案,那就是使用ReplaySubject。ReplaySubject将缓存您为其提供的值(在本例1中为bufferSize)。一旦refCount达到零,并且您建立了新的连接,它就不会创建新的可观察对象,如share(),这是正确的缓存行为。
下面是一个可重用的函数
export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}
下面是如何使用它
import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';
@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }
refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}
下面是cacheable函数的一个更高级的版本,这个函数允许有自己的查找表+提供自定义查找表的能力。这样,你就不必像上面的例子那样检查this._cache了。另请注意,不是传递observable作为第一个参数,而是传递一个返回observable的函数,这是因为Angular的Http会立即执行,因此通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。
let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}
用法:
getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
发布于 2016-03-30 05:59:49
我在这个问题上加了星号,但我会试着试一试。
//this will be the shared observable that
//anyone can subscribe to, get the value,
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);
getCustomer().subscribe(customer$);
//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));
//here's the second subscriber
setTimeout(() => {
customer$.subscribe(val => console.log('subscriber 2: ' + val));
}, 1000);
function getCustomer() {
return new Rx.Observable(observer => {
console.log('api request');
setTimeout(() => {
console.log('api response');
observer.next('customer object');
observer.complete();
}, 500);
});
}
下面是proof :)
只有一个外卖:getCustomer().subscribe(customer$)
我们不是订阅getCustomer()
的接口响应,我们订阅的是一个可观察的ReplaySubject,它也能够订阅不同的可观察对象,并且(这一点很重要)保存它的最后一个发射值,并将其重新发布给它的任何(ReplaySubject的)订阅者。
https://stackoverflow.com/questions/36271899
复制相似问题