社区首页 >问答首页 >如何使用RxJs使用多个异步任务对批量操作进行管道和调优?

如何使用RxJs使用多个异步任务对批量操作进行管道和调优?
EN

Stack Overflow用户
提问于 2020-07-28 21:41:52
回答 2查看 106关注 0票数 0

例如

  • 有100种产品,数量1-5在购物车和用户点击订购。
  • 在创建顺序时,将执行以下任务:
    1. 使用产品1乘1检查产品的可用性。
    2. 检查并对每次购买10种产品的产品实行折扣。
    3. 对每次接受50种产品的产品征收适用的税金
    4. 一旦计算完所有产品的订单总数
    5. 创建一个订单

注意:没有限制可以将多少项添加到购物车中,当产品被添加到购物车中时,正在执行类似的任务。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-07-29 03:30:08

我不完全确定你想要完成什么,但我猜下面是你想要的结果:

假设您的模型如下:

代码语言:javascript
代码运行次数:0
复制
class Product {
  id: number;
  name: string;
}

class ProductOrder {
  product: Product;
  quantity: number;
  discount: number;
  taxes: number;
}

class Order {
  productOrder: ProductOrder[];
  total: number;
}

考虑为每个步骤编写一个函数,其中每个函数的输入都是前面的输出:

代码语言:javascript
代码运行次数:0
复制
/**
 * Check product availability, 
 * if success return the same @productOrder 
 * else throw error
 */
declare function checkProductAvailabilty(productOrder: ProductOrder): Observable<ProductOrder>;

/**
 * Checks and apply discount on products, 
 * apply changes to @productOrder 
 * if success return the same @productOrder 
 * else throw error
 */
declare function checkAndApplyDiscount(productOrder: ProductOrder): Observable<ProductOrder>;

/**
 * Apply applicable taxes on products, 
 * apply changes to @productOrder 
 * if success return the same @productOrder 
 * else throw error
 */
declare function applyApplicableTaxes(productOrder: ProductOrder): Observable<ProductOrder>;

/**
 * Calculate order total, 
 * if success return @order
 * else throw error
 */
declare function calculatOrderTotal(productOrder: ProductOrder[]): Observable<Order>;

/**
 * Create order, 
 * if success return
 * else throw error
 */
declare function createOrder(order: Order): Observable<void>;

有了上面的功能,剩下要做的就是把它们都连接起来:

代码语言:javascript
代码运行次数:0
复制
import {
  concatMap,
  mergeMap,
  mergeAll,
  toArray,
  bufferCount,
  catchError,
} from 'rxjs/operators';


let productOrders: ProductOrder[];

from(productOrders)
  .pipe(
    concatMap(checkProductAvailabilty),
    bufferCount(10),
    mergeAll(),
    concatMap(checkAndApplyDiscount),
    bufferCount(50),
    mergeAll(),
    concatMap(applyApplicableTaxes),
    toArray(),
    mergeMap(calculatOrderTotal),
    mergeMap(createOrder),
    catchError(errorHandler)
  );
票数 1
EN

Stack Overflow用户

发布于 2020-07-29 06:22:56

我已经提出了一个可能的解决方案,可以在这一堆闪电战中进行测试。

这里,带有注释的逻辑是内联的

代码语言:javascript
代码运行次数:0
复制
// this is the function that simulates a remote call to get the avalability of the 
// quantity requested for a certain item
function checkAvailability(prodId: string, qty: number) {
  // randomly decide if a product is available or not
  const prodAvailable = Math.random() > 0
  return  of(prodAvailable)
}

// this is the function that simulates a call to get the discound for batches of 10 products
// at the time (the last batch can have less then 10 products)
function discount(prodIds: string[]) {
  console.log('discount batch len', prodIds.length)
  // discount is fixed to be 10%
  const discounts = prodIds.map(p => 0.1)
  return of(discounts)
}

// this function simulates a call to get the tax for arrays of max 50 products
function taxes(prodIds: string[]) {
  console.log('taxes batch len', prodIds.length)
  // taxes are fixed to be 20%
  const taxes = prodIds.map(p => 0.2)
  return of(taxes)
}

// build the initial cart which contains 100 items - each item contains the product id 
// the quantity and the price (quantity and price as set to 1)
let cart: {prodId: string, qty: number, price: number}[] = new Array(100).fill(null)
cart = cart.map((_, i) => ({prodId: 'p' + i, qty: 1, price: 1}))

// this is the function that returns an Observabel which notifies the total of the order
function orderTotal$(cart: {
    prodId: string;
    qty: number;
    price: number;
}[])  { 
  // first we create a stream of items in the cart
  return from(cart).pipe(
    // for each item we call the function which returns the availability of the item
    // since the availability is calculated by an aync service, we use mergeeMap
    mergeMap(({prodId, qty, price}) => checkAvailability(prodId, qty).pipe(
      // we need to return not only the availability but also the rest of the data
      // so we pipe this map operator to enrich the data passed back
      map(available => ({available, prodId, qty, price}))
    )),
    // we filter only the available products
    filter(({available, prodId, qty}) => available),
    // buffer the items available in budders of 10
    bufferCount(10),
    // use mergeMap, as we do above, to retrieve the discount from the remote async service
    mergeMap(ar => {
      // return an object enriched wit the discound info
      return discount(ar.map(item => item.prodId)).pipe(
        map(discs => discs.map((disc, i) => ({...ar[i], disc})))
      )
    }),
    // use this merge mergeMap to tranform the emissions of batches of items with length 10
    // into a stream of 10 items
    mergeMap(ar => ar),
    // now create batched with size 50
    bufferCount(50),
    // use mergeMap to invoKe the remote service which returns the taxes
    mergeMap(ar => {
      const pIds = ar.map(item => item.prodId)
      return taxes(pIds).pipe(
        map(txs => txs.map((tax, i) => ({...ar[i], tax})))
      )
    }),
    // again use mergeMap to generare a strea  of 50 items
    mergeMap(ar => ar),
    // use reduce to calculate the order total
    reduce((orderTotal, item) => {
      const itemPrice = item.price
      orderTotal = orderTotal + (itemPrice - itemPrice * item.disc + itemPrice * item.tax) * item.qty
      return orderTotal
    }, 0)
  )
}

// subscribe to the result of orderTotal$ function to get the total of the orer
orderTotal$(cart).subscribe(d => console.log('order total', d))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63146984

复制
相关文章
thingsboard入门通过mqtt发送数据
thingsboard支持三种传输遥测数据方式:http、mqtt以及coap,本文介绍如何通过mqtt协议推送数据到server端,
johnhuster的分享
2022/03/28
2.6K0
thingsboard入门通过mqtt发送数据
尝试通过MQTT向thingsboard上的设备发送数据
在thingsboard demo网站上注册一个用户 https://demo.thingsboard.io/signup
lilugirl
2020/02/18
4.5K0
尝试通过MQTT向thingsboard上的设备发送数据
通过pyHook来快速发送信息
最近看了一个视频,通过 python 的 pyHook 模块来监听电脑的键盘响应事件,只要按下 ctrl 键就能得到一句随机的祖安话,然后 ctrl+v 快速粘贴发送出去就能够在游戏中跟人对喷,挺有意思的,指的是这个思路,并不是教唆大家去骂人。然后我也尝试了一下,将过程记录下来。
棒棒鸡不棒
2022/09/02
7480
通过pyHook来快速发送信息
PyTorch中的梯度累积
我们在训练神经网络的时候,超参数batch_size的大小会对模型最终效果产生很大的影响,通常的经验是,batch_size越小效果越差;batch_size越大模型越稳定。理想很丰满,现实很骨感,很多时候不是你想增大batch_size就能增大的,受限于显存大小等因素,我们的batch_size往往只能设置为2或4,否则就会出现"CUDA OUT OF MEMORY"(OOM)报错。如何在有限的计算资源下,采用更大的batch_size进行训练,或者达到和大batch_size一样的效果?这就是梯度累加(Gradient Accumulation)技术了
mathor
2021/07/28
1.5K0
如何通过jQuery发送AJAX?
contentType:发生请求时的内容编码类型(application/x-www-form-urlencoded)
切图仔
2022/09/08
1.2K0
网页上收集的信息如何发送?
网页上收集用户信息完成后,都需要发送到服务器上存储起来,存储是后台的事,但是我们需要负责发送,是如何发送消息呢?
呆呆
2021/11/25
8020
网页上收集的信息如何发送?
网页上收集用户信息完成后,都需要发送到服务器上存储起来,存储是后台的事,但是我们需要负责发送,是如何发送消息呢?
呆呆
2021/09/28
9250
轻量通讯协议 --- MQTT
「MQTT(Message Queuing Telemetry Transport)」 是一种轻量级的消息传输协议,通常用于在物联网(IoT)和传感器网络中进行通信。它设计用于在低带宽、不稳定或高延迟的网络环境下传输数据,因此非常适用于连接设备之间的通信,尤其是在资源有限的环境中。
Niuery Diary
2023/10/22
4K0
轻量通讯协议 --- MQTT
专家专栏|使用agent2自定义插件采集通过MQTT协议发送的数据
Zabbix运维工程师,熟悉Zabbix开源监控系统的架构。乐于分享Zabbix运维经验,个人公众号“运维开发故事”。
没有故事的陈师傅
2020/09/16
1.3K0
黑客可以通过发送信息从ATM机获取到现金
来自赛门铁克(Symantec)最新披露的消息,黑客能 够通过发送短信从ATM机获取到现金——这是通过首先将恶意程序加载到ATM设备中实现的。在本周一的报道中,赛门铁克将2013年10月在墨西哥爆发的 Ploutus恶意程序,通过CD-ROM和USB驱动器就非常轻易地将Ploutus上传到ATM设备中,罪犯要接入这些驱动器首先需要解锁,或者直接 在ATM机器外部钻孔。 赛门铁克安全研究人员Daniel Regalado在文章中提到:罪犯需要通过USB数据线将手机与ATM设备相连,并进行一些
安恒信息
2018/04/11
7230
WinCC 通过MQTT连接到云端
自 WinCC V7.5 起,您可使用&ldquo;WinCC Cloud Connector&rdquo;在云端(如&ldquo;Amazon AWS&rdquo;)建立直接通信。
科控物联
2022/03/29
4.7K0
WinCC 通过MQTT连接到云端
PHP中通过json格式定义字面量对象
PHPer 都知道 PHP 是不支持字面量了,至少目前版本都不支持。比如,在 JS 中可以这样定义 object var o = { 'name' : 'qq52o' , 'url' : 'www.qq52o.me' }; alert(o.name); Python 中定义字典,也可以这样定义: o = { 'name' : 'qq52o' , 'url' : 'www.qq52o.me' } print o['name'] 但在 PHP 中这么定义 object: $a = { "name" : "qq5
沈唁
2018/05/24
1.7K0
通过案例理解 MQTT 主题与通配符
MQTT 主题本质上是一个 UTF-8 编码的字符串,是 MQTT 协议进行消息路由的基础。MQTT 主题类似 URL 路径,使用斜杠 / 进行分层:
EMQ映云科技
2022/10/13
2.7K0
轻量MQTT服务器mosquitto搭建笔记
搭建流程 安装 直接yum yum install mosquitto 添加用户 用户信息都是保存在一个文件中的。 添加有两种方式,一种是直接覆写文件,一种是追加文件;前者添加后原用户信息全部丢失,后者不会。 创建用户密码文件并添加用户(如存在则覆写) 格式:sudo mosquitto_passwd -c [文件路径] [用户名] sudo mosquitto_passwd -c /etc/mosquitto/pwdfile [用户名] - 输入用户密码 - 再次输入用户密码 追加用户(不会覆写) 格式:
xinhuo
2022/03/11
3.7K0
如何通过Cloudera Manager的API获取集群告警信息
告警监控对于一个集群来说,其重要性不言而喻。Cloudera Manager的告警功能非常详尽,CDH集群出现的异常、故障信息等都会及时地出现在CM页面上,通过页面可以快速方便地了解到集群运行性状况。
soundhearer
2020/10/16
2.7K0
如何通过Cloudera Manager的API获取集群告警信息
【工控技术】在STEP 7 (TIA Portal) 中,如何实现流量累积功能?
使用库'Totalizer_Lib_TIA_Portal' 中的函数块 'Totalizer' ,可以计算出一个瞬时流量的累积值。
剑指工控
2021/11/09
3.2K0
【工控技术】在STEP 7 (TIA Portal) 中,如何实现流量累积功能?
EasyPlayer-rtsp 如何配置向Server发送心跳信息?
EasyPlayer播放器系列已经支持了H265编码视频的播放,根据用户在不同场景下的不同需求,我们已经有EasyPlayer-RTSP、EasyPlayer- RTMP、EasyPlayerPro 和EasyPlayer.js 等版本。
TSINGSEE青犀视频
2021/07/19
1.1K0
EasyPlayer-rtsp 如何配置向Server发送心跳信息?
[ 物联网篇 ] - MQTT协议是如何工作的 ?
此外,它被设计为轻量级消息传递协议,它使用发布/订阅操作在客户端和服务器之间交换数据。此外,它的小尺寸,低功耗,最小化数据包和易于实现使该协议成为“机器到机器”或“物联网”世界的理想选择。
程序手艺人
2022/05/10
2.7K0
[ 物联网篇 ] -  MQTT协议是如何工作的 ?
在Managed Code通过Google Gmail发送邮件以及如何通过Outlook配置Gmail
在项目开发中,发送邮件时一种非常常见的功能。一般的情况下,大型的公司都有自己的邮件系统,我们可以直接通过公司的Pop/SMTP Server进行邮件的发送和接收。不过,对于一些小公司不具有这样的条件,他们一般通过一些公共的邮件服务通过商提供的邮件服务。比如Sina,163就是很好的、常用的邮件服务。不过相比之下,我还是习惯使用Google Gmail。 接下来,我将介绍两方面来介绍今天的内容,如果通过Managed code通过Gmail进行邮件的发送,以及如何在Outlook中配置Gmail。今天介绍的东
蒋金楠
2018/02/07
1.7K0
在Managed Code通过Google Gmail发送邮件以及如何通过Outlook配置Gmail
如何通过经纬度获取地址信息?
摘要 Google Maps API Web Services,是一个为您的地图应用程序提供地理数据的 Google 服务的 HTTP 接口集合。具体包括:Google Geocoding API、Google Directions API、Google Elevation API、Google Places API。本文将探讨如何通过Google Geocoding API服务来获取地址信息。 ----  目录 什么是网络服务? 区分地址解析与反地址解析 地址查询(反地址解析)请求 地址查询(反地址解析)
刘皓
2018/04/03
7.5K0
如何通过经纬度获取地址信息?

相似问题

累积量mqtt测量

10

通过MQTT发送图像

112

通过API发送MQTT数据

140

逆累积量

13

累积量的使用

25
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档