抽象的问题
有没有办法以外部可观察对象的原始顺序使用mergeMap
的结果,同时仍然允许内部可观察对象并行运行?
更详细的解释
让我们来看看两个合并映射操作符:
...which接受映射回调,以及可以同时运行多少个内部可观察对象:
of(1,2,3,4,5,6).pipe( mergeMap( => api.get(‘/=>’,{.pipe }),3) );
在:https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010的行动中查看它
这将分别为1
、2
和3
发出3个并行请求。一旦其中一个请求完成,它就会触发另一个对4
的请求。依此类推,始终保持3个并发请求,直到处理完所有值。
但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能顺序混乱。因此,不是:
2、4、6、8、10、12
...we实际上可能会得到:
4、2、8、10、6、12 //或任何其他排列
...enter concatMap
.该运算符确保所有可观测对象都按原始顺序连接,因此:
of(1,2,3,4,5,6).pipe( concatMap( => api.get(‘/=>’,{.pipe} );
...will始终生成:
2、4、6、8、10、12
在:https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010的行动中查看它
这就是我们想要的,但现在请求将不会并行运行。正如the documentation所说:
当concurrency
参数设置为1
.时,concatMap
等效于mergeMap
所以回到问题:有没有可能获得mergeMap
的好处,即给定数量的请求可以并行运行,同时仍然以原始顺序发出映射值?
我的具体问题
上面抽象地描述了这个问题。当你知道手头的实际问题时,有时更容易推断出问题,所以这里是这样的:
const orderNumbers = 1,2,3,4,5,6,7,8,9,10;const
shipOrder
方法,它实际发送订单。它返回一个Promise
:const api.shipOrder(orderNumber); = orderNumber => const
mergeMap
来处理:来自(OrderNumbers).pipe( mergeMap(orderNumber => shipOrder(orderNumber),5) );
printShippingLabel
函数,给定已发货订单的订单号,它将打印其发货标签。因此,我订阅了我们的observable,并在值传入时打印运输标签:从(OrderNumbers) .pipe(mergeMap(orderNumber => shipOrder(orderNumber),5)) .pipe(orderNumber => from
mergeMap
会根据shipOrder
完成其请求的时间发出值。我想要的是以与原始list.相同的顺序打印标签
这有可能吗?
可视化
有关问题的可视化信息,请参阅此处:https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
您可以看到,早些时候的订单甚至在发货之前就已经打印出来了。
发布于 2019-07-16 03:13:05
我确实设法部分解决了这个问题,所以我把它贴在这里作为我自己问题的答案。
我仍然非常想知道处理这种情况的规范方法。
复杂的解决方案
{ index: number }
),并保留这些值的缓冲区,仅根据这些值的index
顺序发出它们。index
的对象列表中。sortByIndex
运算符。下面是sortByIndex
应该是什么样子:
function sortByIndex() {
return observable => {
return Observable.create(subscriber => {
const buffer = new Map();
let current = 0;
return observable.subscribe({
next: value => {
if (current != value.index) {
buffer.set(value.index, value);
} else {
subscriber.next(value);
while (buffer.has(++current)) {
subscriber.next(buffer.get(current));
buffer.delete(current);
}
}
},
complete: value => subscriber.complete(),
});
});
};
}
有了sortByIndex
操作符,我们现在就可以完成整个管道了:
of(1, 2, 3, 4, 5, 6).pipe(
map((number, index) => ({ number, index })),
mergeMap(async ({ number, index }) => {
const doubled = await api.get('/double', { number });
return { index, number: doubled };
}, 3),
sortByIndex(),
map(({ number }) => number)
);
在:https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010的行动中查看它
创建concurrentConcat
运算符
实际上,有了这个sortByIndex
操作符,我们现在可以创建一个通用的concurrentConcat
操作符,它将在内部进行{ index: number, value: T }
类型之间的转换:
function concurrentConcat(mapper, parallel) {
return observable => {
return observable.pipe(
mergeMap(
mapper,
(_, value, index) => ({ value, index }),
parallel
),
sortByIndex(),
map(({ value }) => value)
);
};
}
然后,我们可以使用这个concurrentConcat
运算符而不是mergeMap
,它现在将按原始顺序发送值:
of(1, 2, 3, 4, 5, 6).pipe(
concurrentConcat(number => api.get('/double', { number }), 3),
);
在:https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010的行动中查看它
因此,为了解决我最初的订单发货问题:
from(orderNumbers)
.pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
.subscribe(orderNumber => printShippingLabel(orderNumber));
在:https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010的行动中查看它
您可以看到,即使较晚的订单最终可能会在较早的订单之前发货,但标签始终按其原始顺序打印。
结论
这个解决方案甚至不完整(因为它不处理发出多个值的内部可见对象),但它需要一堆自定义代码。这是一个很常见的问题,我觉得必须有一种更简单的(内置的)方法来解决这个问题:|
发布于 2020-06-25 18:42:09
你可以使用这个操作符:sortedMergeMap
,example。
const DONE = Symbol("DONE");
const DONE$ = of(DONE);
const sortedMergeMap = <I, O>(
mapper: (i: I) => ObservableInput<O>,
concurrent = 1
) => (source$: Observable<I>) =>
source$.pipe(
mergeMap(
(value, idx) =>
concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
concurrent
),
scan(
(acc, [value, idx]) => {
if (idx === acc.currentIdx) {
if (value === DONE) {
let currentIdx = idx;
const valuesToEmit = [];
do {
currentIdx++;
const nextValues = acc.buffer.get(currentIdx);
if (!nextValues) {
break;
}
valuesToEmit.push(...nextValues);
acc.buffer.delete(currentIdx);
} while (valuesToEmit[valuesToEmit.length - 1] === DONE);
return {
...acc,
currentIdx,
valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
};
} else {
return {
...acc,
valuesToEmit: [value]
};
}
} else {
if (!acc.buffer.has(idx)) {
acc.buffer.set(idx, []);
}
acc.buffer.get(idx)!.push(value);
if (acc.valuesToEmit.length > 0) {
acc.valuesToEmit = [];
}
return acc;
}
},
{
currentIdx: 0,
valuesToEmit: [] as O[],
buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
}
),
mergeMap(scannedValues => scannedValues.valuesToEmit)
);
发布于 2022-02-15 16:10:13
结果
request 1
request 2
request 3
response 3
request 4
response 1
request 5
1
response 4
request 6
response 2
request 7
2
3
4
response 6
request 8
response 5
request 9
5
6
response 7
request 10
7
response 9
response 10
response 8
8
9
10
代码
https://stackblitz.com/edit/js-5kvwl6?file=index.js
import { range, Subject, from, of } from 'rxjs';
import { concatMap, share, map, concatAll, delayWhen } from 'rxjs/operators';
const pipeNotifier = new Subject().pipe(share());
range(1, 10)
.pipe(
// 1. Make Observable controlled by pipeNotifier
concatMap((v) => of(v).pipe(delayWhen(() => pipeNotifier))),
// 2. Submit the request
map((v) =>
from(
(async () => {
console.log('request', v);
await wait();
console.log('response', v);
pipeNotifier.next();
return v;
})()
)
),
// 3. Keep order
concatAll()
)
.subscribe((x) => console.log(x));
// pipeNotifier controler
range(0, 3).subscribe(() => {
pipeNotifier.next();
});
function wait() {
return new Promise((resolve) => {
const random = 5000 * Math.random();
setTimeout(() => resolve(random), random);
});
}
https://stackoverflow.com/questions/57045892
复制相似问题