前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka之Producer生产者分区

Kafka之Producer生产者分区

作者头像
克虏伯
发布2020-07-14 11:34:31
3750
发布2020-07-14 11:34:31
举报

kafka-client的版本:0.10有个很重要的类Partitioner

List-1

代码语言:javascript
复制
public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();
}

    如List-1所示,发送消息时,传入到partition方法中,返回的int值就是分区号,即发送到哪个分区,默认的实现是DefaultPartitioner,如下List-2

List-2

代码语言:javascript
复制
public class DefaultPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //1获取所有的分区信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
       //2如果没有指定key  
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
          //3    
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                //4
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            //5 如果有key,那么根据key进行hash计算得到hash值,之后对分区数取模
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {}

}
  • 从cluster中获取集群的分区信息
  • 如果我们没有指定消息key,那个获取下一个递增int值
  • 3和4中,用递增值对分区数取模,这样达到轮询发送到各个分区
  • 如果指定了指定了key,那么对key取hash值,之后用hash值对分区数取模

    所以如果没有指定key,那么使用轮询发送到各个分区;如果指定了key,那么同key的发送到相同的分区

    如果我们想随机发送到分区,那个我们应该实现这个接口,而后指定使用这个分区算法。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档