首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >每小时发射一次的RxJS无穷大流

每小时发射一次的RxJS无穷大流
EN

Stack Overflow用户
提问于 2020-12-03 10:26:41
回答 5查看 969关注 0票数 1

我需要构建一个流,它在调用后立即发出api请求,如果页面没有刷新,则每小时(下午1:00,下午2:00 )。我用setTimeout()构建它,但是我想用RxJ实现它。你能帮帮我吗?

代码语言:javascript
运行
复制
isUpdated() {
    return new Observable<Object>(function update(obs) {
        this.http.get(`/update`).subscribe(data => {
            obs.next(data);
        });
        const timer = (60 - new Date().getMinutes()) * 60 * 1000;
        setTimeout(() => update.call(this, obs), timer);
    }.bind(this));
}

//call
isUpdated().subscribe(data => console.log(data));
EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2020-12-03 15:01:03

我认为要解决这个问题,你需要把它分解成更小的部分。

首先,我们知道,在某个时候,根据当前时间,我们想知道何时触发下一个调用。如果我们得到一个时间戳,它给出了ms中的当前时间,并且我们希望在下一个小时之前得到ms的数量,那么我们可以这样做:

代码语言:javascript
运行
复制
const timeToNextHourInMs = (currentTimestampMs) => {
  const timestampSeconds = currentTimestampMs / 1000;

  const numberOfSecondsIntoTheCurrentHour = timestampSeconds % 3600;

  const numberOfSecondsToTheNextHour = 3600 - numberOfSecondsIntoTheCurrentHour;

  return numberOfSecondsToTheNextHour * 1000;
};

我希望变量名足够明确,我不需要评论,但让我知道否则。

接下来,我们要处理流问题:

  • 我们想立即触发一个HTTP调用
  • 立即获取发出的值。
  • 每次新的一小时开始(1:00,2:00,3:00等),都要做上述所有的事情。

以下是你如何做到的:

代码语言:javascript
运行
复制
this.http.get(`/update`).pipe(
  timestamp(),
  switchMap(({ timestamp, value }) =>
    concat(
      of(value),
      EMPTY.pipe(delay(timeToNextHourInMs(timestamp)))
    )
  ),
  repeat()
);

让我们来看看上面的逻辑:

  • 首先,我们立即进行HTTP调用
  • 完成HTTP调用后,我们将得到当前的时间戳(稍后根据该时间戳确定我们何时要执行下一次调用)
  • 我们执行一个switchMap,但是由于HTTP调用只返回1值,所以在这个非常特殊的情况下它并不重要。我们也可以用flatMapconcatMap
  • switchMap内部,我们首先使用concat发送我们刚刚从HTTP调用中得到的值,但也让这个可观察到的值一直保持到当前our的末尾(通过使用我们先前创建的函数)。
  • 因此,在当前小时结束时,流将完成。但是,由于我们有了一个retry,一旦流完成,我们将再次订阅它(并且提醒您,流只在新的一个小时开始时才会完成!)

我建议在这里添加一件事情,但这并不是最初问题的一个要求,那就是要进行一些错误处理,以便当您进行调用时出现问题时,它会在几秒钟后自动重试。否则,想象一下,当轮询开始时,如果您的网络在那个时候不工作5s,那么您的流就会立即出错。

为此,您可以引用此精彩的回答,并在可重用的自定义操作符中这样做:

代码语言:javascript
运行
复制
const RETRY_DELAY = 2000;
const MAX_RETRY_FOR_ONE_HTTP_CALL = 3;

const automaticRetry = () => (obs$) =>
  obs$.pipe(
    retryWhen((error$) =>
      error$.pipe(
        concatMap((error, index) =>
          iif(
            () => index >= MAX_RETRY_FOR_ONE_HTTP_CALL,
            throwError(error),
            of(error).pipe(delay(RETRY_DELAY))
          )
        )
      )
    )
  );

这将在每次重试之间延迟三次可观察到的重试。3次之后,流将抛出最后发出的错误,从而出错。

现在,我们只需将这个自定义操作符添加到流中:

代码语言:javascript
运行
复制
this.http.get(`/update`).pipe(
  automaticRetry(),
  timestamp(),
  switchMap(({ timestamp, value }) =>
    concat(
      of(value),
      EMPTY.pipe(delay(timeToNextHourInMs(timestamp)))
    )
  ),
  repeat()
);

我还没有对上面的代码进行实际测试,所以请在您这边这样做,并让我知道它是如何进行的。但如果我的逻辑是正确的,下面是事情的发展方向:

  • 假设你在2:40启动你的应用程序
  • 立即发出HTTP调用
  • 你很快就会得到回应
  • 这条溪流将保持20 be的开放状态。
  • 3:00,流完成,重试启动:我们执行另一个HTTP调用
  • 这一次,服务器重新部署,几秒钟内无法使用。
  • 在内部,流错误,但是由于我们的自定义操作符automaticRetry,它等待了3秒,然后重新尝试了一次,仍然什么也没有。它再等待3秒,这一次很好,结果会被传递到下游
  • 无限期重复:)

让我知道是怎么回事

票数 1
EN

Stack Overflow用户

发布于 2020-12-03 10:44:16

也许您可以这样做(我没有测试这段代码):

代码语言:javascript
运行
复制
import { merge, concat, of, timer, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

...

isUpdated() {
  return concat(
    merge(
      of(null), // make call inmediately
      timer((60 - new Date().getMinutes()) * 60 * 1000), // the next full hour eg. 2:00
    ),
    interval(60 * 60 * 1000), // every hour
  ).pipe(
    switchMap(() => this.http.get(...)),
  )
})

concat只有在第一个merge完成后才会订阅interval,这是在下一个完整的小时(例如: 2:00 ),然后每小时发出一次。

@MrkSef有一个很好的想法,它可以简化成这样的东西:

代码语言:javascript
运行
复制
isUpdated() {
  return timer(
    (60 - new Date().getMinutes()) * 60 * 1000, // the next full hour eg. 2:00
    60 * 60 * 1000 // every hour
  ).pipe(
    startWith(null), // initial request
    switchMap(() => this.http.get(...)),
  )
})
票数 1
EN

Stack Overflow用户

发布于 2020-12-03 16:28:36

我觉得很简单。

计时器接受两个参数,第一个参数是第一个呼叫的延迟,然后是调用的间隔。你的下一个小时的偏移是正确的,我用你的代码。

使用startWith(null)立即启动。

代码语言:javascript
运行
复制
const startDelayMs = (60 - new Date().getMinutes()) * 60 * 1000;
const hourMs = 60 * 60 * 1000;

timer(startDelayMs, hourMs).pipe(
   startWith(null),
   switchMap(()) // do your call here
)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65124122

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档