前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >你不知道的冷知识 | 指数退避思想及其在Flume/Hadoop中的应用

你不知道的冷知识 | 指数退避思想及其在Flume/Hadoop中的应用

作者头像
王知无-import_bigdata
修改2019-08-17 23:20:57
1K0
修改2019-08-17 23:20:57
举报

5万人关注的大数据成神之路,不来了解一下吗?

5万人关注的大数据成神之路,真的不来了解一下吗?

5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

前言

前段时间爆改Codis的Java客户端Jodis,它的测试类中用到了指数退避算法。这是大学计算机网络课程会讲到的知识,本文权当复习,并且看看它的思想是如何应用在大数据组件中的。

计算机网络中的指数退避

所谓指数退避(exponential backoff),是一种根据系统反馈来成倍地削减操作的速率(比如数据流的速率)的算法,直到系统可以稳定地进行处理为止。在计算机网络的世界里,它一般用来控制数据帧/包的重传,避免密集的冲突与网络拥塞。

以以太网中使用的数据链路层协议CSMA/CD(载波监听多路访问/冲突检测)为例,其处理冲突的方式就是截断二进制指数退避(truncated binary exponential backoff),具体逻辑如下:

  • 确定退避时间的初始值。一般是用端到端的往返时间2τ,该时间也称为冲突窗口(collision window)或争用期,以太网习惯取值51.2μs。
  • 冲突发生时,设冲突次数为c,定K=min(c, 10)。从集合[0, 1, 2, 3, ..., 2K - 1]中随机取一个整数k,等待冲突窗口时长的k倍,然后再尝试重新发送帧。
  • 当c > 16时,认定此帧发送失败,向高层报告错误。

可见,该方法名为“二进制”是因为冲突窗口倍数的可取值有2K个,名为“截断”是因为最多重试16次就失败,不会无限重试下去。随着重试次数增多,退避时间的期望值也就越大,从而在竞争激烈时减少碰撞发生的概率。

下图是CSMA/CD的流程图,蓝框中就是指数退避流程。

指数退避的思想非常简单而有效,在除网络之外的其他方面也有应用。作为大数据工程师,挑两个大数据组件稍微讲解一下吧。

Flume中的指数退避

Flume是一个高效的日志数据采集与聚合框架,它由数据源Source、数据通道Channel、数据汇集Sink三大部分组成。其中,数据源有一个经典且常用的实现SpoolDirectorySource,它负责读取特定目录下的日志文件,其中用到了指数退避算法。它的主要逻辑在SpoolDirectoryRunnable这个线程中,下面来看其run()方法。(Flume版本为我们在用的1.7.0)

代码语言:javascript
复制
    
代码语言:javascript
复制
@Override
  public void run() {
    int backoffInterval = 250;
    try {
      while (!Thread.interrupted()) {
        List<Event> events = reader.readEvents(batchSize);
        if (events.isEmpty()) {
          break;
        }
        sourceCounter.addToEventReceivedCount(events.size());
        sourceCounter.incrementAppendBatchReceivedCount();

        try {
          getChannelProcessor().processEventBatch(events);
          reader.commit();
        } catch (ChannelFullException ex) {
          logger.warn("The channel is full, and cannot write data now. The " +
              "source will try again after " + backoffInterval +
              " milliseconds");
          hitChannelFullException = true;
          backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
          continue;
        } catch (ChannelException ex) {
          logger.warn("The channel threw an exception, and cannot write data now. The " +
              "source will try again after " + backoffInterval +
              " milliseconds");
          hitChannelException = true;
          backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
          continue;
        }
        backoffInterval = 250;
        sourceCounter.addToEventAcceptedCount(events.size());
        sourceCounter.incrementAppendBatchAcceptedCount();
      }
    } catch (Throwable t) {
      logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
          "Uncaught exception in SpoolDirectorySource thread. " +
          "Restart or reconfigure Flume to continue processing.", t);
      hasFatalError = true;
      Throwables.propagate(t);
    }
  }

  private int waitAndGetNewBackoffInterval(int backoffInterval) throws InterruptedException {
    if (backoff) {
      TimeUnit.MILLISECONDS.sleep(backoffInterval);
      backoffInterval = backoffInterval << 1;
      backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
          backoffInterval;
    }
    return backoffInterval;
  }
代码语言:javascript
复制

该方法先通过ReliableSpoolingFileEventReader.readEvents()方法获取事件,再调用ChannelProcessor.processEventBatch()方法将事件批次放入对应的Channel中并提交。如果Channel已满或者写入发生异常,就以250ms为起始值进行退避,每次退避后等待时长都会翻倍,直到变量maxBackoff设定的最大值(默认为4000ms)。一旦提交成功,等待时长会重设回250ms,多次提交不成功的话也不会截断。

可见,Flume的指数退避方法比CSMA/CD的方法来得更加简单直接。

Hadoop中的指数退避

本来想用ZK客户端Curator举例子的,但是它比较默默无闻,还是用Hadoop吧。

hadoop-common项目里的RetryPolicies类中提供了非常多种重试策略,其中就有指数退避。

代码语言:javascript
复制
  public static final RetryPolicy exponentialBackoffRetry(
      int maxRetries, long sleepTime, TimeUnit timeUnit){
    return new ExponentialBackoffRetry(maxRetries, sleepTime, timeUnit);
  }

  static class ExponentialBackoffRetry extends RetryLimited {
    public ExponentialBackoffRetry(
        int maxRetries, long sleepTime, TimeUnit timeUnit){
      super(maxRetries, sleepTime, timeUnit);

      if (maxRetries < 0) {
        throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0");
      } else if (maxRetries >= Long.SIZE - 1) {
        throw new IllegalArgumentException("maxRetries = " + maxRetries
            + " >= " + (Long.SIZE - 1));
      }
    }
    
    @Override
    protected long calculateSleepTime(int retries) {
      return calculateExponentialTime(sleepTime, retries + 1);
    }
  }

可见,ExponentialBackoffRetry类强制规定了最大重试次数maxRetries,初始等待时间为sleepTime,实际等待时间则由calculateExponentialTime()方法来计算。

代码语言:javascript
复制
private static long calculateExponentialTime(long time, int retries,
      long cap){
    long baseTime = Math.min(time * (1L << retries), cap);
    return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5));
  }

  private static long calculateExponentialTime(long time, int retries) {
    return calculateExponentialTime(time, retries, Long.MAX_VALUE);
  }

该方法使用cap参数来限制等待时间的最大值,默认是不限制的。除了在初始时间的基础上乘2的重试次数次幂之外,还会用0.5~1.5区间内的随机数加权,比较“聪明”一点。

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 计算机网络中的指数退避
  • Flume中的指数退避
  • Hadoop中的指数退避
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档