我正在处理一个文件加密和上传类使用角。这些操作中有许多是异步的,因此我编写的方法正在返回RxJS可观测值。
// 1.
private prepareUpload(file): Observable<T>;
// 2.
private encryptData(data, filekey): Observable<T>
// 3.
private uploadEncryptedData(formData, token, range): Observable<T>
// 4.
private completeUpload(updatedFilekey, token): Observable<T>
我想将这个逻辑封装到一个公共的upload(file)
方法中,最后我使用了嵌套的订阅,它可以工作,但是我知道它是错误的,并且是RxJS中的反模式,原因有几个。下面是代码的简化版本:
public upload(file) {
const gen = this.indexGenerator(); // generator function
this.prepareUpload(file).subscribe(values => {
const [response, filekey, data] = values;
this.encryptData(data, filekey).subscribe(encryptedDataContainer => {
const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
const range = this.getRange(file.size, gen.next().value);
this.uploadEncryptedData(formData, response.token, range).subscribe(() => {
if (range.isFinalPart) {
this.completeUpload(encryptedDataContainer.updatedFilekey, response.token).subscribe(console.log);
}
});
});
});
}
我未能使用几个RxJS运算符的组合来清除这段代码。我的目标是避免嵌套订阅,而是在工作流完成时从公共upload()
方法返回一个可观察到的单个订阅。
谢谢!
发布于 2019-03-29 11:52:10
您可以从RxJ中使用mergeMap
和filter
运算符,并链接您的调用。您将需要创建一些函数级变量,以便在链接期间使用。
import { mergeMap, filter, catchError } from 'rxjs/operators`
public upload(file) {
const gen = this.indexGenerator(); // generator function
let range, token;
this.prepareUpload(file)
.pipe(
mergeMap((values) => {
const [response, filekey, data] = values;
token = response.token;
return this.encryptData(data, filekey);
}),
mergeMap(encryptedDataContainer => {
const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name)
range = this.getRange(file.size, gen.next().value);
return this.uploadEncryptedData(formData, token, range);
}),
filter(() => !!range.isFinalPart),
mergeMap(() => {
return this.completeUpload(encryptedDataContainer.updatedFilekey, token);
})
catchError((error) => {
console.log(error);
// handle the error accordingly.
})
)
.subscribe(() => {
console.log('success');
});
}
发布于 2019-03-29 11:42:15
您想在订阅之前使用管道。管道允许您在流发出之前对从流中下来的值进行更改。另外,使用mergeMap
来扁平订阅链。这是一个概述。这并不能提供一个完整的解决方案-支付我的钱不够;-我付的钱不够;
this.prepareUpload(file).pipe(
tap(values => console.log('hello1', values)),
map(values => [response, filekey, data]),
tap(values => console.log('hello2', values)),
mergeMap(() =>
// essential to catchError else an HTTP error response will disable this effect - if it uses HTTP - catchError essentially prevents the stream from erroring in which case it will never emit again
this.encryptData(data, filekey).pipe(
map(res => res), // do something with res here if you want
catchError(() => {
return of(null)
})
)
),
filter(res => !!res)
// more mergemap stuff here
).subscribe(values => {
console.log(values)
})
提示:在console.log值向下传递时,请使用tap运算符
未检查语法,可能缺少逗号或括号或2。
管道中的函数都是RxJS操作符
发布于 2019-03-29 11:48:24
您可以使用mergeMap rxjs操作符合并这些可观察到的内容,并消除嵌套订阅。
虽然有一个缺点,但是要注意,由于mergeMap同时维护多个活动的内部订阅,所以可以通过长时间的内部订阅创建内存泄漏。
供参考和举例:https://www.learnrxjs.io/operators/transformation/mergemap.html
https://stackoverflow.com/questions/55416011
复制相似问题