1、Producer的拦截器interceptor,和consumer端的拦截器interceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑,生产者拦截器可以用在消息发送前做一些准备工作,使用场景,如下所示:
1)、按照某个规则过滤掉不符合要求的消息。 2)、修改消息的内容。 3)、统计类需求。
1 package com.demo.kafka.listener;
2
3 import java.util.Map;
4
5 import org.apache.kafka.clients.producer.ProducerInterceptor;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.clients.producer.RecordMetadata;
8
9 /**
10 * 生产者拦截器
11 *
12 * @author 生产者拦截器
13 *
14 */
15
16 public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
17
18 // 发送成功计数
19 private volatile long sendSuccess = 0;
20
21 // 发送失败计数
22 private volatile long sendFailure = 0;
23
24 /**
25 *
26 */
27 @Override
28 public void configure(Map<String, ?> configs) {
29
30 }
31
32 /**
33 * 发送消息已经操作消息的方法
34 */
35 @Override
36 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
37 String modifiedValue = "前缀prefix : " + record.value();
38 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
39 record.topic(), // 主题
40 record.partition(), // 分区
41 record.timestamp(), // 时间戳
42 record.key(), // key值
43 modifiedValue, // value值
44 record.headers()); // 消息头
45 return producerRecord;
46 }
47
48 /**
49 * ack确认的方法
50 */
51 @Override
52 public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
53 if(exception == null) {
54 sendSuccess++;
55 }else {
56 sendFailure++;
57 }
58 }
59
60 /**
61 * 关闭的方法,发送成功之后会将拦截器关闭,调用此方法
62 */
63 @Override
64 public void close() {
65 double successRation = (double)sendSuccess / (sendSuccess + sendFailure);
66 System.out.println("【INFO 】 发送成功率: " + String.format("%f", successRation * 100) + "%");
67 }
68
69 }
生产者客户端要配置一下Producer的拦截器interceptor,如下所示:
1 package com.demo.kafka.producer;
2
3 import java.util.Properties;
4 import java.util.concurrent.ExecutionException;
5
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerConfig;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.clients.producer.RecordMetadata;
10 import org.apache.kafka.common.serialization.StringSerializer;
11
12 import com.demo.kafka.listener.ProducerInterceptorPrefix;
13
14 public class KafkaProducerSimple {
15
16 // 设置服务器地址
17 private static final String brokerList = "192.168.110.142:9092";
18
19 // 设置主题
20 private static final String topic = "topic-demo";
21
22 public static void main(String[] args) {
23 Properties properties = new Properties();
24 // 设置key的序列化器
25 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26
27 // 设置重试次数
28 properties.put(ProducerConfig.RETRIES_CONFIG, 10);
29
30 // 设置值的序列化器
31 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
32
33 // 打印输出序列化器的路径信息
34 System.err.println(StringSerializer.class.getName());
35
36 // 设置集群地址
37 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
38
39 // 自定义拦截器使用,可以计算发送成功率或者失败率,进行消息的拼接或者过滤操作
40 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
41
42 // 将参数配置到生产者对象中
43 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
44
45 for (int i = 0; i < 100000; i++) {
46 // 生产者消息记录
47 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "hello world!!!" + i);
48 // 同步获取消息
49 // RecordMetadata recordMetadata = producer.send(record).get();
50 producer.send(record);
51 }
52
53 // 关闭
54 producer.close();
55 }
56
57 }
消费者代码,如下所示:
1 package com.demo.kafka.consumer;
2
3 import java.time.Duration;
4 import java.util.Collections;
5 import java.util.Properties;
6
7 import org.apache.kafka.clients.consumer.ConsumerConfig;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.clients.producer.ProducerConfig;
12 import org.apache.kafka.common.serialization.StringDeserializer;
13
14 public class KafkaConsumerSimple {
15
16 // 设置服务器地址
17 private static final String bootstrapServer = "192.168.110.142:9092";
18
19 // 设置主题
20 private static final String topic = "topic-demo";
21
22 // 设置消费者组
23 private static final String groupId = "group.demo";
24
25 public static void main(String[] args) {
26 Properties properties = new Properties();
27 // 设置反序列化key参数信息
28 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
29 // 设置反序列化value参数信息
30 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
31
32 // 设置服务器列表信息
33 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
34
35 // 设置消费者组信息
36 properties.put("group.id", groupId);
37
38 // 将参数设置到消费者参数中
39 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
40
41 // 消息订阅
42 consumer.subscribe(Collections.singletonList(topic));
43
44 while (true) {
45 // 每隔一秒监听一次
46 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
47 // 获取到消息信息
48 for (ConsumerRecord<String, String> record : records) {
49 System.err.println(record.toString());
50 }
51 }
52
53 }
54
55 }
2、生产者的acks参数,这个参数用来指定分区中必须有多少副本来收到这条消息,之后生产者才会认为这条消息写入成功的。acks是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。
1)、ack等于0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了,不过因为生产者不需要等待服务器响应,所以他可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。 2)、acks等于1,默认值为1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点,比如首领节点崩溃,新的首领节点还没有被选举出来,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没有来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。 3)、acks等于-1,只有当所有参与复制的节点收到消息时候,生产者会收到一个来自服务器额成功响应,这种模式 最安全的,他可以保证不止一个服务器收到消息。
注意,acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常信息。
3、kafka消费者订阅主题和分区,创建完消费者后我们便可以订阅主题了,只需要调用subscribe方法即可,这个方法会接受一个主题列表,如下所示:
另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组立即对其进行消费。正则表达式在连接kafka与其他系统非常有用。比如订阅所有的测试主题。
1 package com.demo.kafka.consumer;
2
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Collections;
6 import java.util.Properties;
7 import java.util.regex.Pattern;
8
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.producer.ProducerConfig;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.serialization.StringDeserializer;
16
17 public class KafkaConsumerSimple {
18
19 // 设置服务器地址
20 private static final String bootstrapServer = "192.168.110.142:9092";
21
22 // 设置主题
23 private static final String topic = "topic-demo";
24
25 // 设置主题
26 private static final String topic2 = "topic-demo2";
27
28 // 设置消费者组
29 private static final String groupId = "group.demo";
30
31 public static void main(String[] args) {
32 Properties properties = new Properties();
33 // 设置反序列化key参数信息
34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35 // 设置反序列化value参数信息
36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37
38 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
39 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40
41 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
42 properties.put("group.id", groupId);
43
44 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
45 properties.put("client.id", "consumer.client.id.demo");
46
47 // 将参数设置到消费者参数中
48 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
49
50 // 消息订阅
51 consumer.subscribe(Collections.singletonList(topic));
52 // 可以订阅多个主题
53 consumer.subscribe(Arrays.asList(topic, topic2));
54 // 可以使用正则表达式进行订阅
55 consumer.subscribe(Pattern.compile("topic-demo*"));
56
57 // 指定订阅的分区
58 consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
59
60 while (true) {
61 // 每隔一秒监听一次
62 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
63 // 获取到消息信息
64 for (ConsumerRecord<String, String> record : records) {
65 System.err.println(record.toString());
66 }
67 }
68
69 }
70
71 }