发布
社区首页 >问答首页 >如何使用RxJs使用多个异步任务对批量操作进行管道和调优?

如何使用RxJs使用多个异步任务对批量操作进行管道和调优?
EN

Stack Overflow用户
提问于 2020-07-29 05:41:52
回答 2查看 106关注 0票数 0

例如

  • 有100种产品,数量1-5在购物车和用户点击订购。
  • 在创建顺序时,将执行以下任务:
    1. 使用产品1乘1检查产品的可用性。
    2. 检查并对每次购买10种产品的产品实行折扣。
    3. 对每次接受50种产品的产品征收适用的税金
    4. 一旦计算完所有产品的订单总数
    5. 创建一个订单

注意:没有限制可以将多少项添加到购物车中,当产品被添加到购物车中时,正在执行类似的任务。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-07-29 11:30:08

我不完全确定你想要完成什么,但我猜下面是你想要的结果:

假设您的模型如下:

代码语言:javascript
代码运行次数:0
复制
class Product {
  id: number;
  name: string;
}

class ProductOrder {
  product: Product;
  quantity: number;
  discount: number;
  taxes: number;
}

class Order {
  productOrder: ProductOrder[];
  total: number;
}

考虑为每个步骤编写一个函数,其中每个函数的输入都是前面的输出:

代码语言:javascript
代码运行次数:0
复制
/**
 * Check product availability, 
 * if success return the same @productOrder 
 * else throw error
 */
declare function checkProductAvailabilty(productOrder: ProductOrder): Observable<ProductOrder>;

/**
 * Checks and apply discount on products, 
 * apply changes to @productOrder 
 * if success return the same @productOrder 
 * else throw error
 */
declare function checkAndApplyDiscount(productOrder: ProductOrder): Observable<ProductOrder>;

/**
 * Apply applicable taxes on products, 
 * apply changes to @productOrder 
 * if success return the same @productOrder 
 * else throw error
 */
declare function applyApplicableTaxes(productOrder: ProductOrder): Observable<ProductOrder>;

/**
 * Calculate order total, 
 * if success return @order
 * else throw error
 */
declare function calculatOrderTotal(productOrder: ProductOrder[]): Observable<Order>;

/**
 * Create order, 
 * if success return
 * else throw error
 */
declare function createOrder(order: Order): Observable<void>;

有了上面的功能,剩下要做的就是把它们都连接起来:

代码语言:javascript
代码运行次数:0
复制
import {
  concatMap,
  mergeMap,
  mergeAll,
  toArray,
  bufferCount,
  catchError,
} from 'rxjs/operators';


let productOrders: ProductOrder[];

from(productOrders)
  .pipe(
    concatMap(checkProductAvailabilty),
    bufferCount(10),
    mergeAll(),
    concatMap(checkAndApplyDiscount),
    bufferCount(50),
    mergeAll(),
    concatMap(applyApplicableTaxes),
    toArray(),
    mergeMap(calculatOrderTotal),
    mergeMap(createOrder),
    catchError(errorHandler)
  );
票数 1
EN

Stack Overflow用户

发布于 2020-07-29 14:22:56

我已经提出了一个可能的解决方案,可以在这一堆闪电战中进行测试。

这里,带有注释的逻辑是内联的

代码语言:javascript
代码运行次数:0
复制
// this is the function that simulates a remote call to get the avalability of the 
// quantity requested for a certain item
function checkAvailability(prodId: string, qty: number) {
  // randomly decide if a product is available or not
  const prodAvailable = Math.random() > 0
  return  of(prodAvailable)
}

// this is the function that simulates a call to get the discound for batches of 10 products
// at the time (the last batch can have less then 10 products)
function discount(prodIds: string[]) {
  console.log('discount batch len', prodIds.length)
  // discount is fixed to be 10%
  const discounts = prodIds.map(p => 0.1)
  return of(discounts)
}

// this function simulates a call to get the tax for arrays of max 50 products
function taxes(prodIds: string[]) {
  console.log('taxes batch len', prodIds.length)
  // taxes are fixed to be 20%
  const taxes = prodIds.map(p => 0.2)
  return of(taxes)
}

// build the initial cart which contains 100 items - each item contains the product id 
// the quantity and the price (quantity and price as set to 1)
let cart: {prodId: string, qty: number, price: number}[] = new Array(100).fill(null)
cart = cart.map((_, i) => ({prodId: 'p' + i, qty: 1, price: 1}))

// this is the function that returns an Observabel which notifies the total of the order
function orderTotal$(cart: {
    prodId: string;
    qty: number;
    price: number;
}[])  { 
  // first we create a stream of items in the cart
  return from(cart).pipe(
    // for each item we call the function which returns the availability of the item
    // since the availability is calculated by an aync service, we use mergeeMap
    mergeMap(({prodId, qty, price}) => checkAvailability(prodId, qty).pipe(
      // we need to return not only the availability but also the rest of the data
      // so we pipe this map operator to enrich the data passed back
      map(available => ({available, prodId, qty, price}))
    )),
    // we filter only the available products
    filter(({available, prodId, qty}) => available),
    // buffer the items available in budders of 10
    bufferCount(10),
    // use mergeMap, as we do above, to retrieve the discount from the remote async service
    mergeMap(ar => {
      // return an object enriched wit the discound info
      return discount(ar.map(item => item.prodId)).pipe(
        map(discs => discs.map((disc, i) => ({...ar[i], disc})))
      )
    }),
    // use this merge mergeMap to tranform the emissions of batches of items with length 10
    // into a stream of 10 items
    mergeMap(ar => ar),
    // now create batched with size 50
    bufferCount(50),
    // use mergeMap to invoKe the remote service which returns the taxes
    mergeMap(ar => {
      const pIds = ar.map(item => item.prodId)
      return taxes(pIds).pipe(
        map(txs => txs.map((tax, i) => ({...ar[i], tax})))
      )
    }),
    // again use mergeMap to generare a strea  of 50 items
    mergeMap(ar => ar),
    // use reduce to calculate the order total
    reduce((orderTotal, item) => {
      const itemPrice = item.price
      orderTotal = orderTotal + (itemPrice - itemPrice * item.disc + itemPrice * item.tax) * item.qty
      return orderTotal
    }, 0)
  )
}

// subscribe to the result of orderTotal$ function to get the total of the orer
orderTotal$(cart).subscribe(d => console.log('order total', d))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63146984

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档