
A Brief Look at Dynamic Kafka Topic/Partition Discovery in Spark Streaming and Flink

Preface

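Kafka is one of the most common data sources for day-to-day stream-processing jobs. As data types multiply and data volumes grow, we inevitably end up creating new Kafka topics or adding partitions to existing ones. So how does a real-time processing engine consuming from Kafka notice that topics and partitions have changed? This article takes a quick look, using Spark Streaming and Flink as examples.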

The case of Spark Streaming

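According to the official documentation (see the figure in the original article), only spark-streaming-kafka-0-10 supports dynamic detection of Kafka changes (Dynamic Topic Subscription). Digging into the source leads to the class o.a.s.streaming.kafka010.DirectKafkaInputDStream, specifically the first line of its compute() method, which runs once per micro-batch: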

val untilOffsets = clamp(latestOffsets())

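As the name suggests, clamp() limits how much data each batch reads (rate limiting), so it is not discussed here. latestOffsets() returns the current latest offset of every partition; its implementation, together with the paranoidPoll() method it calls, is shown below.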

/**
 * Returns the latest (highest) available offsets, taking new partitions into account.
 */
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala
  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // Check if there's any partition been revoked because of consumer rebalance.
  val revokedPartitions = currentOffsets.keySet.diff(parts)
  if (revokedPartitions.nonEmpty) {
    throw new IllegalStateException(s"Previously tracked partitions " +
      s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
      s"rebalance. This is mostly due to another stream with same group id joined, " +
      s"please check if there're different streaming application misconfigure to use same " +
      s"group id. Fundamentally different stream should use different group id")
  }
  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}

/**
 * The concern here is that poll might consume messages despite being paused,
 * which would throw off consumer position.  Fix position if this happens.
 */
private def paranoidPoll(c: Consumer[K, V]): Unit = {
  // don't actually want to consume any messages, so pause all partitions
  c.pause(c.assignment())
  val msgs = c.poll(0)
  if (!msgs.isEmpty) {
    // position should be minimum offset per topicpartition
    msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic, m.partition)
      val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
        logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
        c.seek(tp, off)
    }
  }
}
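To make the compensation step in paranoidPoll() concrete, here is a small Java model of its foldLeft: for each topic-partition that appears in the unexpectedly returned records, keep the smallest offset, which is where that partition should be seeked back to. This is a sketch of the logic only and does not talk to Kafka; PolledRecord and PollCompensation are hypothetical names, and partitions are keyed by a plain "topic-partition" string instead of Kafka's TopicPartition class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka ConsumerRecord: only the fields the fold needs.
final class PolledRecord {
    final String topic;
    final int partition;
    final long offset;

    PolledRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class PollCompensation {
    /**
     * Returns, for every topic-partition present in the records, the smallest
     * offset seen. Seeking each partition back to that offset undoes a poll
     * that returned data even though all partitions were paused.
     */
    static Map<String, Long> seekTargets(List<PolledRecord> records) {
        Map<String, Long> targets = new HashMap<>();
        for (PolledRecord r : records) {
            String tp = r.topic + "-" + r.partition;
            // Keep the minimum offset per partition, like Math.min in the Scala fold.
            targets.merge(tp, r.offset, Math::min);
        }
        return targets;
    }
}
```

For example, records at offsets 12 and 10 on logs-0 and at offset 5 on logs-1 yield seek targets of 10 for logs-0 and 5 for logs-1.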

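So every time compute() runs, latestOffsets() first calls paranoidPoll(), which pauses all assigned partitions and calls poll(0). This gives the consumer a chance to refresh its metadata and partition assignment without actually consuming anything; if the poll returns records anyway, paranoidPoll() seeks each affected partition back to the smallest offset it saw. latestOffsets() then compares the consumer's current assignment with currentOffsets to find newly added partitions, records their starting positions (the committed offset, or auto.offset.reset if nothing was committed), and finally seeks to the end of every partition to obtain the latest offsets. This is how dynamic detection is achieved.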
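The bookkeeping in latestOffsets() boils down to two set differences: assigned-but-untracked partitions are new, tracked-but-unassigned partitions were revoked. The Java sketch below mirrors that under simplifying assumptions: OffsetTracker and refresh() are hypothetical names, partitions are plain strings, and the starting position of a new partition comes from a callback standing in for c.position(tp).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

final class OffsetTracker {
    // Plays the role of currentOffsets: partition -> starting offset for the next batch.
    private final Map<String, Long> currentOffsets = new HashMap<>();

    /**
     * @param assignment partitions the consumer owns right now (c.assignment() in the Scala code)
     * @param positionOf starting position for a partition seen for the first time
     *                   (c.position(tp) in the Scala code)
     * @return the partitions that were added in this call
     */
    Set<String> refresh(Set<String> assignment, ToLongFunction<String> positionOf) {
        // Revoked: tracked before but no longer assigned, treated as a fatal rebalance.
        Set<String> revoked = new HashSet<>(currentOffsets.keySet());
        revoked.removeAll(assignment);
        if (!revoked.isEmpty()) {
            throw new IllegalStateException("Previously tracked partitions " + revoked
                    + " were revoked by a consumer rebalance");
        }
        // New: assigned now but not tracked yet, so start tracking at their current position.
        Set<String> added = new HashSet<>(assignment);
        added.removeAll(currentOffsets.keySet());
        for (String tp : added) {
            currentOffsets.put(tp, positionOf.applyAsLong(tp));
        }
        return added;
    }

    Map<String, Long> offsets() {
        return Map.copyOf(currentOffsets);
    }
}
```

Throwing on revoked partitions instead of silently dropping them mirrors the IllegalStateException in the Scala code.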

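The code above also shows that Spark Streaming cannot cope with a Kafka consumer rebalance: if a previously tracked partition is revoked, latestOffsets() throws an IllegalStateException. Therefore, every streaming application must be given its own group.id.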

The case of Flink

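According to the official documentation (see the figure in the original article), Flink supports topic and partition discovery, but it is disabled by default. To enable it, set the flink.partition-discovery.interval-millis property, i.e. the interval in milliseconds at which new topics and partitions are discovered.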
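As a sketch of the configuration side, the Java snippet below builds consumer properties with the discovery interval set, and resolves the interval so that a missing key means discovery is off. The sentinel Long.MIN_VALUE is, as far as I recall, the value FlinkKafkaConsumerBase uses for PARTITION_DISCOVERY_DISABLED; treat it as an assumption and check it against your Flink version. DiscoveryConfig and its methods are hypothetical helpers.

```java
import java.util.Properties;

final class DiscoveryConfig {
    // Property key named in the Flink documentation.
    static final String DISCOVERY_INTERVAL_KEY = "flink.partition-discovery.interval-millis";
    // Assumed sentinel for "discovery disabled" (mirrors PARTITION_DISCOVERY_DISABLED; verify).
    static final long DISABLED = Long.MIN_VALUE;

    static Properties consumerProps(String bootstrapServers, String groupId, long intervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty(DISCOVERY_INTERVAL_KEY, Long.toString(intervalMillis));
        return props;
    }

    // An absent key resolves to DISABLED, matching the "off by default" behaviour.
    static long resolveInterval(Properties props) {
        String raw = props.getProperty(DISCOVERY_INTERVAL_KEY);
        return raw == null ? DISABLED : Long.parseLong(raw.trim());
    }
}
```

In a real job such properties would be passed to the consumer constructor, for example new FlinkKafkaConsumer<>(Pattern.compile("logs-.*"), new SimpleStringSchema(), props); the topic pattern here is purely illustrative.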

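The base class of the Flink Kafka source is the abstract class o.a.f.streaming.connectors.kafka.FlinkKafkaConsumerBase. Its run() method first creates the KafkaFetcher that pulls the data, and then checks whether dynamic discovery is enabled: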

this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);

if (!running) {
    return;
}

// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
//  1) New state - partition discovery loop executed as separate thread, with this
//                 thread running the main fetcher loop
//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
    kafkaFetcher.runFetchLoop();
} else {
    runWithPartitionDiscovery();
}

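If discovery is enabled, run() eventually calls createAndStartDiscoveryLoop(), which starts a separate thread that discovers new topics/partitions every discoveryIntervalMillis milliseconds and hands them to the KafkaFetcher.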

private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
    discoveryLoopThread = new Thread(() -> {
        try {
            // --------------------- partition discovery loop ---------------------
            // throughout the loop, we always eagerly check if we are still running before
            // performing the next operation, so that we can escape the loop as soon as possible
            while (running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
                }
                final List<KafkaTopicPartition> discoveredPartitions;
                try {
                    discoveredPartitions = partitionDiscoverer.discoverPartitions();
                } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                    // the partition discoverer may have been closed or woken up before or during the discovery;
                    // this would only happen if the consumer was canceled; simply escape the loop
                    break;
                }
                // no need to add the discovered partitions if we were closed during the meantime
                if (running && !discoveredPartitions.isEmpty()) {
                    kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                }
                // do not waste any time sleeping if we're not running anymore
                if (running && discoveryIntervalMillis != 0) {
                    try {
                        Thread.sleep(discoveryIntervalMillis);
                    } catch (InterruptedException iex) {
                        // may be interrupted if the consumer was canceled midway; simply escape the loop
                        break;
                    }
                }
            }
        } catch (Exception e) {
            discoveryLoopErrorRef.set(e);
        } finally {
            // calling cancel will also let the fetcher loop escape
            // (if not running, cancel() was already called)
            if (running) {
                cancel();
            }
        }
    }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
    discoveryLoopThread.start();
}
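Stripped of the Flink specifics, the thread above is a poll-and-sleep loop. The runnable Java model below keeps only that structure: a volatile running flag, a discovery step, a hand-off that is skipped once the source is stopping, and a sleep that cancellation can interrupt. DiscoveryLoop is a hypothetical class; the Supplier and Consumer stand in for partitionDiscoverer and kafkaFetcher, and the error reporting through discoveryLoopErrorRef is left out.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class DiscoveryLoop {
    private volatile boolean running = true;
    private final Thread thread;

    DiscoveryLoop(Supplier<List<String>> discoverer, Consumer<List<String>> fetcher, long intervalMillis) {
        thread = new Thread(() -> {
            while (running) {
                List<String> found = discoverer.get();
                // No need to hand off partitions if we were stopped in the meantime.
                if (running && !found.isEmpty()) {
                    fetcher.accept(found);
                }
                // Do not sleep if we are no longer running.
                if (running && intervalMillis != 0) {
                    try {
                        Thread.sleep(intervalMillis);
                    } catch (InterruptedException e) {
                        break; // stop() interrupts the sleep; leave the loop
                    }
                }
            }
        }, "partition-discovery-sketch");
        thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    void stop() throws InterruptedException {
        running = false;
        thread.interrupt();
        thread.join();
    }
}
```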

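As you can see, Flink implements dynamic discovery through a component called PartitionDiscoverer (the partitionDiscoverer field above). The code above calls its discoverPartitions() method, whose source is as follows: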

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;
            // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
            if (topicsDescriptor.isFixedTopics()) {
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                List<String> matchedTopics = getAllTopics();
                // retain topics that match the pattern
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }
                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    newDiscoveredPartitions = null;
                }
            }
            // (2) eliminate partition that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }
            return newDiscoveredPartitions;
        } catch (WakeupException e) {
            // the actual topic / partition metadata fetching methods
            // may be woken up midway; reset the wakeup flag and rethrow
            wakeup = false;
            throw e;
        }
    } else if (!closed && wakeup) {
        // may have been woken up before the method call
        wakeup = false;
        throw new WakeupException();
    } else {
        throw new ClosedException();
    }
}

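The method first handles the two subscription modes separately: a fixed list of topics, or topics matched by a regular expression (only the pattern mode can pick up newly created topics). Both paths end up calling getAllPartitionsForTopics() to fetch all partitions of those topics; this method is implemented by the subclasses of the abstract class AbstractPartitionDiscoverer and is straightforward. It then iterates over the partitions and calls setAndCheckDiscoveredPartition() on each one, removing partitions that have been discovered before as well as partitions that should not be subscribed by this subtask (step (2) in the code). As a result, the method returns only newly added partitions that the current subtask is responsible for.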
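The filtering in discoverPartitions() can be modelled in a few lines of Java. In this sketch the broker metadata is a plain map from topic to partition count, topics are filtered with a regular expression, every partition is remembered once, and it is reported only by the subtask that owns it; the "no partitions at all" RuntimeException is left out. The assign() formula follows Flink's KafkaTopicPartitionAssigner as I remember it (a topic-dependent start index, then round-robin over partition ids); treat it as an assumption. PartitionDiscoverySketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class PartitionDiscoverySketch {
    private final Pattern topicPattern;
    private final int numSubtasks;
    private final int subtaskIndex;
    // Every partition this subtask has already seen, whether it owns it or not.
    private final Set<String> discovered = new HashSet<>();

    PartitionDiscoverySketch(Pattern topicPattern, int numSubtasks, int subtaskIndex) {
        this.topicPattern = topicPattern;
        this.numSubtasks = numSubtasks;
        this.subtaskIndex = subtaskIndex;
    }

    // Assumed to match KafkaTopicPartitionAssigner.assign(): a topic-dependent
    // start index, then partitions spread round-robin from there.
    static int assign(String topic, int partition, int numSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    /**
     * @param metadata topic -> partition count, standing in for the broker metadata lookups
     * @return partitions that are new and owned by this subtask
     */
    List<String> discover(Map<String, Integer> metadata) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : metadata.entrySet()) {
            String topic = entry.getKey();
            // (1) keep only topics that match the subscription pattern
            if (!topicPattern.matcher(topic).matches()) {
                continue;
            }
            for (int p = 0; p < entry.getValue(); p++) {
                String tp = topic + "-" + p;
                // (2) drop partitions seen before, and those owned by another subtask
                if (discovered.add(tp) && assign(topic, p, numSubtasks) == subtaskIndex) {
                    result.add(tp);
                }
            }
        }
        return result;
    }
}
```

Because every subtask runs the same deterministic assignment, each matching partition ends up with exactly one subtask, and a later call reports only partitions that did not exist before.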

This article was shared from the WeChat official account 暴走大数据 (zhouqiantanxi).


Originally published: 2020-08-08
