首页
学习
活动
专区
工具
TVP
发布

Kafka之文件消费者应用案例

【导读:数据是二十一世纪的石油,蕴含巨大价值,这是·情报通·大数据技术系列第[61]篇文章,欢迎阅读和收藏】

1 基本概念

消息系统的组成是生产者、存储系统和消费者,消费者会从存储系统读取生产者写入的消息。Kafka 作为消息系统在主题当中保存消息的信息。生产者向主题写入数据,消费者从主题读取数据。由于 Kafka 的特性是支持分布式,同时也是基于分布式的,所以主题也是可以在多个节点上被分区和覆盖的。Kafka 集成了 Producer/Consumer 链接的 Broker 的客户端工具,但是在消息处理方便,这两者主要用于服务端( Broker )的简单操作,如:创建 Topic 、罗列出已存在的 Topic 、对已有的 Topic 的 Producer/Consumer 测试。跟其他消息系统一样, Kafka 提供了多种不同语言实现的客户端 API ,如 Java 、 Python 、 Ruby 、 Go 等。这些 API 极大的方便用户使用 Kafka 集群。

2 术语解释

Kafka 包含四种核心的 API

1、 Producer API :支持应用将数据流发送到 Kafka 集群的主题。

2、 Consumer API :支持应用从 Kafka 集群的主题中读取数据流。

3、 Streams API :支持数据流从输入主题转化到输出主题

4、 Connect API :支持实现持续的从一些源系统或应用划入 Kafka 或者从 Kafka 推入一些原系统或应用的接口。

3 基于 Application 的生产者消费者实例

1、 创建 Kafka 生产者

创建 Kafka 生产者有三个基本属性:

n Bootstrap.servers: 指定了生产者建立初始连接的 broke 列表

n Key.serializer: 指定了用来序列化键值的类

n Value.serializer: 指定了用来序列化消息记录的类,与 key.serializer 差不多

创建完生产者后,就可以发送消息, Kafka 中有三种发送消息的方式:

n 只发不管结果( fire-and-forget ) : 只调用接口发送消息到 Kafka 服务器,但不管成功写入与否。由于 Kafka 是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息。

n 同步发送( Synchronous send ):调用 send ()方法返回一个 Future 对象,我们可以使用它的 get ()方法来判断消息发送是否成功。

n 异步发送( Asynchronous send ):调用 send ()时提供一个回调方法,当接受到 broker 结果后回调此方法 .

下面是单线程发送的简单代码:

public static void produce() throws Exception {

//topic

String topic = "mytopic";

// 配置

Properties properties = new Properties();

properties.put("bootstrap.servers", "127.0.0.1:9092");

// 序列化类型 properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

// 创建生产者

KafkaProducer pro = new KafkaProducer(properties);

for (int i=0;i

pro.send(new ProducerRecord(topic, "message: " + i++));

//sleep

Thread.sleep(1000);

}

2、 创建消费者:

创建 Kafka 消费者有两个基本属性:

n Zookeeper.connect: 声明 zookeeper 服务器和端口

n Group.id :指定消费者分组

示例代码如下:

private String topic;

public MyConsumer(String topic){

super();

this.topic = topic;

}

public void run() {

ConsumerConnector consumer = createConsumer();

Map topicCountMap = new HashMap();

topicCountMap.put(topic, 1); // 一次从主题中获取一个数据

Map>> messageStreams = consumer.createMessageStreams(topicCountMap);

KafkaStream stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据

ConsumerIterator iterator = stream.iterator();

while(iterator.hasNext()){

String message = new String(iterator.next().message());

}

}

private ConsumerConnector createConsumer() {

Properties properties = new Properties();

properties.put("zookeeper.connect", "127.0.0.1:2181");// 声明 zk

properties.put("group.id", "group5");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的 topic 数据

return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

}

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200216A0063800?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券