首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >用RxJS延迟批量观测数据

用RxJS延迟批量观测数据
EN

Stack Overflow用户
提问于 2020-11-23 10:09:59
回答 3查看 617关注 0票数 2

我对我的db执行http请求,并注意到如果我同时发送所有请求,其中一些请求将得到超时错误。我想在调用之间添加一个延迟,这样服务器就不会超载。我试图找到这个问题的RxJS解决方案,并且不想添加一个setTimeout

以下是我目前所做的工作:

代码语言:javascript
复制
let observables = [];
for(let int = 0; int < 10000; int++){
   observables.push(new Observable((observer) => {
      db.add(doc[int], (err, result)=>{
         observer.next();
         observer.complete();
      })
   }))
}

forkJoin(observables).subscribe(
   data => {
   },
   error => {
      console.log(error);
   },
   () => {
      db.close();
   }
);
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-11-23 10:45:53

您确实可以通过Rxjs很好地实现这一点。你需要更高的可观测值,这意味着你将发射一个可观测到的,而更高的可观测到的将为你平平这一点。

这种方法的好处是,您可以轻松地在//中运行X请求,而不必自己管理请求池。

以下是工作代码:

代码语言:javascript
复制
import { Observable, Subject } from "rxjs";
import { mergeAll, take, tap } from "rxjs/operators";

// this is just a mock to demonstrate how it'd behave if the API was
// taking 2s to reply for a call
const mockDbAddHtppCall = (id, cb) =>
  setTimeout(() => {
    cb(null, `some result for call "${id}"`);
  }, 2000);

// I have no idea what your response type looks like so I'm assigning
// any but of course you should have your own type instead of this
type YourRequestType = any;

const NUMBER_OF_ITEMS_TO_FETCH = 10;

const calls$$ = new Subject<Observable<YourRequestType>>();

calls$$
  .pipe(
    mergeAll(3),
    take(NUMBER_OF_ITEMS_TO_FETCH),
    tap({ complete: () => console.log(`All calls are done`) })
  )
  .subscribe(console.log);

for (let id = 0; id < NUMBER_OF_ITEMS_TO_FETCH; id++) {
  calls$$.next(
    new Observable(observer => {
      console.log(`Starting a request for ID "${id}""`);
      mockDbAddHtppCall(id, (err, result) => {
        if (err) {
          observer.error(err);
        } else {
          observer.next(result);
          observer.complete();
        }
      });
    })
  );
}

和Stackblitz的现场演示:

请打开浏览器的控制台,并注意控制台日志显示触发呼叫时会立即启动其中的3个,然后等待1完成后再接另一个调用。

票数 1
EN

Stack Overflow用户

发布于 2020-11-23 10:22:57

看起来您可以使用初始的timer来触发http调用。例如:

timer(delayTime).pipe(combineLatest(()=>sendHttpRequest()));

这只会在可观察到的计时器完成后触发sendHttpRequest()方法。

所以用你的解决方案。你可以做以下..。

代码语言:javascript
复制
   observables.push(
     timer(delay + int).pipe(combineLatest(new Observable((observer) => {
      db.add(doc[int], (err, result)=>{
         observer.next();
         observer.complete();
      }))
   }))

延迟可能从0开始,您可以使用循环的int索引来增加延迟。

计时器文档:https://www.learnrxjs.io/learn-rxjs/operators/creation/timer

合并最新文档:https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest

票数 0
EN

Stack Overflow用户

发布于 2020-11-23 14:41:57

具有并发值的merge

mergeAllmergeMap都允许您定义订阅可观测值的最大数量。mergeAll(1)/mergeMap(LAMBDA, 1)基本上是concatAll()/concatMap(LAMBDA)

merge基本上只是静态的mergeAll

下面是你如何使用它的方法:

代码语言:javascript
复制
let observables = [...Array(10000).keys()].map(intV => 
  new Observable(observer => {
    db.add(doc[intV], (err, result) => {
      observer.next();
      observer.complete();
    });
  })
);

const MAX_CONCURRENT_REQUESTS = 10;

merge(...observables, MAX_CONCURRENT_REQUESTS).subscribe({
   next: data => {},
   error: err => console.log(err),
   complete: () => db.close()
});

注意:这不会对您的呼叫进行批处理,但是它应该可以解决所描述的问题,而且可能也比批处理要快一些。

具有并发值的mergeMap

也许使用rangemergeMap的方式稍微多一点

代码语言:javascript
复制
const MAX_CONCURRENT_REQUESTS = 10;

range(0, 10000).pipe(
  mergeMap(intV => 
    new Observable(observer => {
      db.add(doc[intV], (err, result) => {
        observer.next();
        observer.complete();
      });
    }),
    MAX_CONCURRENT_REQUESTS
  )
).subscribe({
   next: data => {},
   error: err => console.log(err),
   complete: () => db.close()
});
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64966355

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档