RxJS mergeMap()与原始顺序

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (189)

抽象的问题

有没有办法mergeMap在外部observable的原始顺序中使用a的结果,同时仍然允许内部observable并行运行?

更详细的解释

让我们看看两个合并映射运算符:

  • mergeMap ...它采用映射回调,并且可以同时运行多少个内部可观察量: of(1, 2, 3, 4, 5, 6).pipe( mergeMap(number => api.get('/double', { number }), 3) ); 在此处查看https//codepen.io/anon/pen/qzgGdJ?edit = 1010 这将火熄灭3个并行请求123分别。只要其中一个请求完成,它就会触发另一个请求4。依此类推,始终保持3个并发请求,直到所有值都已处理完毕。 但是,由于先前的请求可能在后续请求之前完成,因此生成的值可能会出现故障。所以代替: [2, 4, 6, 8, 10, 12] ......我们可能真的得到: [4, 2, 8, 10, 6, 12] // or any other permutation
  • concatMap ......进入concatMap。此运算符确保observable都按原始顺序连接,以便: of(1, 2, 3, 4, 5, 6).pipe( concatMap(number => api.get('/double', { number })) ); ...将永远产生: [2, 4, 6, 8, 10, 12] 请在此处查看https//codepen.io/anon/pen/MMLdav?edit = 1010 这是我们想要的,但现在请求不会并行运行。正如文件所说: concatMap相当于mergeMapconcurrency参数设置为1

回到这个问题:是否有可能获得好处mergeMap,即可以并行运行给定数量的请求,同时仍然按原始顺序发出映射值?

我的具体问题

以上是抽象的问题。当你知道手头的实际问题时,有时候更容易推理出问题,所以这里有:

  1. 我有一份必须发货的订单清单: const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  2. 我有一个shipOrder实际发送订单的方法。它返回一个Promiseconst shipOrder = orderNumber => api.shipOrder(orderNumber);
  3. API最多只能同时处理5个订单,所以我使用mergemap来处理: from(orderNumbers).pipe( mergeMap(orderNumber => shipOrder(orderNumber), 5) );
  4. 订单发货后,我们需要打印其运输标签。我有一个printShippingLabel功能,根据发货订单的订单号,将打印其运输标签。所以我订阅了我们的observable,并在值出现时打印出货标签: from(orderNumbers) .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5)) .pipe(orderNumber => printShippingLabel(orderNumber));
  5. 这样做有效,但现在运输标签不按顺序打印,因为mergeMap根据何时shipOrder完成其请求而发出值。我想要的是标签的打印顺序与原始列表相同

那可能吗?

可视化

请参阅此处以查看问题:https : //codepen.io/anon/pen/agMWBK?editors=1010

您可以看到之前的订单在以后的订单发货之前正在打印。

提问于
用户回答回答于

我确实设法部分解决它,所以我在这里发布它作为我自己的问题的答案。仍然非常想知道处理这种情况的规范方法。

一个复杂的解决方案

  1. 创建一个自定义运算符,它接受具有索引键的值({ index: number }Typescript用语中),并保留值的缓冲区,只根据它们index的顺序发出它们。
  2. 将原始列表映射到index嵌入其中的对象列表。
  3. 将其传递给我们的自定义sortByIndex运营商
  4. 将值映射回原始值。

这是什么sortByIndex样子:

function sortByIndex() {
    return observable => {
        return Observable.create(subscriber => {
            const buffer = new Map();
            let current = 0;
            return observable.subscribe({
                next: value => {
                    if (current != value.index) {
                        buffer.set(value.index, value);
                    } else {
                        subscriber.next(value);

                        while (buffer.has(++current)) {
                            subscriber.next(buffer.get(current));
                            buffer.delete(current);
                        }
                    }
                },
                complete: value => subscriber.complete(),
            });
        });
    };
}

有了sortByIndex操作员,我们现在可以完成整个管道:

of(1, 2, 3, 4, 5, 6).pipe(
    map((number, index) => ({ number, index })),
    mergeMap(async ({ number, index }) => {
        const doubled = await api.get('/double', { number });
        return { index, number: doubled };
    }, 3),
    sortByIndex(),
    map(({ number }) => number)
);

在此处查看https//codepen.io/anon/pen/VJgOeN?edit = 1010

创建一个orderdMergeMap运营商

事实上,有了这个sortByIndex操作符,我们现在可以创建一个通用orderdMergeMap运算符,它将在{ index: number, value: T }内部进行类型转换:

function orderedMergeMap(mapper, parallel) {
    return observable => {
        return observable.pipe(
            mergeMap(
                mapper,
                (_, value, index) => ({ value, index }),
                parallel
            ),
            sortByIndex(),
            map(({ value }) => value)
        );
    };
}

然后我们可以使用此orderedMergeMap运算符代替mergeMap,它现在将按原始顺序发出值:

of(1, 2, 3, 4, 5, 6).pipe(
    orderedMergeMap(number => api.get('/double', { number }), 3),
);

在此处查看https//codepen.io/anon/pen/ZdwNXx?edit = 1010

所以解决我订单发货的原始问题:

from(orderNumbers)
    .pipe(orderedMergeMap(orderNumber => shipOrder(orderNumber), maxConcurrent))
    .subscribe(orderNumber => printShippingLabel(orderNumber));

在此处查看https//codepen.io/anon/pen/QXRVxY?edit = 1010

您可以看到,即使以后的订单最终可能会在之前的订单之前发货,但标签始终按原始订单打印。

结论

这个解决方案甚至不完整(因为它不处理发出多个值的内部可观察对象),但它需要一堆自定义代码。这是一个常见的问题,我觉得必须有一种更容易(内置)的方法来解决这个问题:|

扫码关注云+社区

领取腾讯云代金券