我正在使用星火流和数据被发送给卡夫卡。我要给卡夫卡发地图。假设我有一个20的Map (在流批持续时间中它可能增长到1000 )元素,如下所示:
HashMap<Integer,String> input = new HashMap<Integer,String>();
input.put(11,"One");
input.put(312,"two");
input.put(33,"One");
input.put(24,"One");
input.put(35,"One");
input.put(612,"One");
input.put(7,"One");
input.put(128,"One");
input.put(9,"One");
input.put(10,"One");
input.put(11,"One1");
input.put(12,"two1");
input.put(13,"One1");
input.put(14,"One1");
input.put(15,"One1");
input.put(136,"One1");
input.put(137,"One1");
input.put(158,"One1");
input.put(159,"One1");
input.put(120,"One1");
Set<Integer> inputKeys = input.keySet();
Iterator<Integer> inputKeysIterator = inputKeys.iterator();
while (inputKeysIterator.hasNext()) {
Integer key = inputKeysIterator.next();
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic,
key%10, input.get(key));
KafkaProducer.send(record);
}我的Kafka主题是有10个分区。这里我调用了kafkaProducer.send() 20次,因此打了20次Kafka电话。如何在批处理中(即在一个Kafka调用中)发送整个数据,但我还是要确保每个记录都由公式key%10驱动的特定分区,如
ProducerRecord ProducerRecord=新ProducerRecord( key%10__,input.get(key));
我看到的选项:linger.ms=1可以确保这一点,但延迟时间为1ms。如何避免这种延迟,避免20个网络( Kafka )调用或最小化Kafka呼叫?
发布于 2018-10-02 23:46:56
Kafka生产者API已经分批发送消息,即使您一个接一个地调用。
请参阅文档中的batch.size,它是按字节,而不是按消息,但您可以通过调用生产者上的刷新来强制实际的网络事件。
关于分区,您需要创建代码分区程序。简单地将mod值作为键传递并不能保证在默认分区程序中不会发生哈希冲突
https://stackoverflow.com/questions/52611397
复制相似问题