前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >5.Java Kafka Demo

5.Java Kafka Demo

作者头像
ParkJun
发布2020-07-14 10:26:47
2.4K0
发布2020-07-14 10:26:47
举报
文章被收录于专栏:ParkJun随笔

直接开始吧

首先maven引入jar包

代码语言:javascript
复制
<!-- kafka -->
<dependency>
 	<groupId>org.apache.kafka</groupId>
	 <artifactId>kafka_2.12</artifactId>
	 <version>1.0.0</version>
		 <exclusions>
			<exclusion>
				<groupId>org.apache.zookeeper</groupId>
				<artifactId>zookeeper</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>log4j</artifactId>
			</exclusion>
		</exclusions>
		<scope>provided</scope> 
	</dependency>


	<dependency>
   		 <groupId>org.apache.kafka</groupId>
   		 <artifactId>kafka-clients</artifactId>
  		  <version>1.0.0</version>
	</dependency>
	
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-streams</artifactId>
	    <version>1.0.0</version>
	</dependency>

新建Producer的Class

代码语言:javascript
复制
public class KafkaProducerTest implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;


public KafkaProducerTest(String topicName) {
	Properties props = new Properties();
	props.put("bootstrap.servers", "x.x.x.x:9092");
	//acks=0:如果设置为0,生产者不会等待kafka的响应。
	//acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
	//acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这		是最强的可用性保证。
	props.put("acks", "all");
	//配置为大于0的值的话,客户端会在消息发送失败时重新发送。
	props.put("retries", 0);
	//当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。
	props.put("batch.size", 16384);
	props.put("key.serializer", StringSerializer.class.getName());
	props.put("value.serializer", StringSerializer.class.getName());
	this.producer = new KafkaProducer<String, String>(props);
	this.topic = topicName;
}

public void run() {
	int messageNo = 1;
	try {
		for(;;) {
        
			String messageStr="你好,这是第"+messageNo+"条数据";
			producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));			
			messageNo++;
            Thread.sleep(1000);
			}
		}
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		producer.close();
	}
}

public static void main(String args[]) {
	KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
	Thread thread = new Thread(test);
	thread.start();
}

新建Consumer的Class

代码语言:javascript
复制
    public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private  String topic;
    private static final String GROUPID = "groupA";



    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        //kafka消费的的地址
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        //组名 不同组名可以重复消费
        props.put("group.id", GROUPID);
        //是否自动提交
        props.put("enable.auto.commit", "true");
        //从poll(拉)的回话处理时长
        props.put("auto.commit.interval.ms", "1000");
        //超时时间
        props.put("session.timeout.ms", "30000");
        //一次最多拉取的条数
        props.put("max.poll.records", 1000);
//		earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
//		latest 
//		当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
//		none 
//		topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        props.put("auto.offset.reset", "earliest");
        //序列化
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        //订阅主题列表topic
        this.consumer.subscribe(Arrays.asList(topic));

    }

    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (;;) {
                    msgList = consumer.poll(10);//一次拉取10条
                    if(null!=msgList&&msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList) {                                             
                            System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                            messageNo++;
                    }
                }else{	
                    Thread.sleep(1000);
                }
            }		
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}

到此一个最简单的demo 就可以运行起来了,当然,看起来简单,内部还有很多深层次的东西,我们会在后续谈到!

这些配置可以在

org.apache.kafka.clients.consumer.ConsumerConfig 以及 org.apache.kafka.clients.producer.ProducerConfig 上

org.apache.kafka.clients.producer.ProducerConfig 下 中找到

本文归作者所有,未经作者允许,不得转载

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 直接开始吧
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档