前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊kafka0.8的topic的partition以及topicCountMap

聊聊kafka0.8的topic的partition以及topicCountMap

作者头像
code4it
发布2018-09-17 15:03:55
4070
发布2018-09-17 15:03:55
举报
文章被收录于专栏:码匠的流水账

本文主要研究下kafka0.8版本api的topicCountMap与topic的partition的关系。

partition

物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储 这个partition的所有消息和索引文件。

partition与consumer

  • 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
  • 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀.最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

kafka producer发送消息的时候,如果有key的话,根据key进行hash,然后分发到指定的partition;如果没有key则按counter进行partition。

rebalance

如果增减consumer,broker,partition会导致rebalance,rebalance后consumer对应的partition会发生变化。

比如减少一个consumer,然后rebalance之后,consumer对应的partition会进行重新调整映射。

topicCountMap

告诉Kafka我们在Consumer中将用多少个线程来消费该topic。topicCountMap的key是topic name,value针对该topic是线程的数量。

假设有个topic,有6个partiton,然后启动了两个consumer,每个consumer的topicCount为3,则观察会发现,每个consumer的消费线程都在运行; 如果每个consumer的topicCount变为4,则会发现,先启动的consmer中4个线程都在运行,而后启动的consumer中只有2个线程在运行,其他2个被阻塞住了。

也就是说,对于consumer来说,实际的消费个数=consumer实例个数每个consumer的topicCount个数,如果这个值>partition,则会造成某些消费线程多余,阻塞住。 如果这个值<=partition,则所有消费线程都在消费。 因此实际分布式部署consumer的时候,其consumer实例个数每个consumer的topicCount个数<=topic的partition值。

代码实例

  • 创建topic sh kafka-topics.sh --create --topic topic20170921 --replication-factor 1 --partitions 6 --zookeeper localhost:2181
  • 查看consumer group sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-group-0921
  • consumer public class NativeConsumer { ExecutorService pool = Executors.newFixedThreadPool(10); public void exec(String topic,String zk,int consumerCount,String group) throws UnsupportedEncodingException { Properties props = new Properties(); props.put("zookeeper.connect", zk); // props.put("auto.offset.reset","smallest"); props.put("group.id",group); props.put("zookeeper.session.timeout.ms", "10000"); props.put("zookeeper.sync.time.ms", "2000"); props.put("auto.commit.interval.ms", "10000"); props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range"); ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props); ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, consumerCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector .createMessageStreams(topicCountMap); consumerMap.get(topic).stream().forEach(stream -> { pool.submit(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message())); } } }); }); } }
  • producer public class NativeProducer { public void produce(String topic,String brokerAddr) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerAddr); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) { int totalCountOfSendedMessages = 0; long totalSendTime = 0; long timeOfLastUpdate = 0; int countOfMessagesInSec = 0; for(int i=0;i<1000000;i++){ //todo key不能相同,否则都发送到同一个partition了,消费者无法scale out byte[] dataKey = SerializationUtils.serialize(UUID.randomUUID().toString()); byte[] dataValue = SerializationUtils.serialize(UUID.randomUUID().toString()); ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>( topic, dataKey, dataValue ); long sendingStartTime = System.currentTimeMillis(); // Sync send producer.send(producerRecord).get(); Thread.sleep(100); long currentTime = System.currentTimeMillis(); long sendTime = currentTime - sendingStartTime; totalSendTime += sendTime; totalCountOfSendedMessages++; countOfMessagesInSec++; if (currentTime - timeOfLastUpdate > TimeUnit.SECONDS.toMillis(1)) { System.out.println("Average send time: " + (double) (totalSendTime / totalCountOfSendedMessages) + " ms."); System.out.println("Count of messages in second: " + countOfMessagesInSec); timeOfLastUpdate = currentTime; countOfMessagesInSec = 0; } } } } }
  • test String zkAddr = "localhost:2181"; String topic = "topic20170921"; //partition 6 String brokerAddr = "localhost:9092"; String group = "test-group-0921"; @Test public void testConsumer1() throws InterruptedException { NativeConsumer nativeConsumer = new NativeConsumer(); try { nativeConsumer.exec(topic,zkAddr,4,group); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } Thread.sleep(100000); } @Test public void testConsumer2() throws InterruptedException { NativeConsumer nativeConsumer = new NativeConsumer(); try { nativeConsumer.exec(topic,zkAddr,4,group); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } Thread.sleep(100000); } @Test public void testProducer() throws UnsupportedEncodingException, InterruptedException { NativeProducer producer = new NativeProducer(); try { producer.produce(topic,brokerAddr); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }

doc

  • Kafka Consumer接口
  • Kafka分区机制介绍与示例
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-09-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • partition
  • partition与consumer
  • rebalance
  • topicCountMap
    • 代码实例
    • doc
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档