前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉

作者头像
全栈程序员站长
发布2022-11-08 09:54:25
3200
发布2022-11-08 09:54:25
举报
文章被收录于专栏:全栈程序员必看

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉
kafka多个分区一个消费_kafka集群节点挂掉
代码语言:javascript
复制
/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

代码语言:javascript
复制
@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/184515.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年10月8日 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档