前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

作者头像
CoderJed
发布2018-09-13 10:35:59
1.2K0
发布2018-09-13 10:35:59
举报
文章被收录于专栏:Jed的技术阶梯Jed的技术阶梯

使用传统的 avro API 自定义序列化类和反序列化类比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[] 和 byte[] 到对象的转化,而那些方法看上去比较繁琐,幸运的是,Twitter 开源的类库 Bijection 对传统的 Avro API 进行了封装了和优化,让我们可以方便的实现以上操作。

1. 添加 Bijection 类库的依赖,并新建一个 schema 文件

Bijection 类库的依赖如下:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>

在 maven 工程的 resources 目录下新建一个 schema 文件,名称为"user.json",因为我们不用 avro 生成实体类的方式,所以定义一个普通的 json 文件来描述 schema 即可,另外,在 json 文件中,也不需要"namespace": "packageName"这个限定生成实体类的包名的参数,本文使用的 json 文件内容如下:

{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name",  "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

2. KafkaProducer 使用 Bijection 类库发送序列化后的消息

package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionProducer.java 
 * @Description KafkaProducer 使用 Bijection 类库发送序列化后的消息
 * @Author YangYunhe
 * @Date 2018-06-22 10:42:06
 */
public class BijectionProducer {

    public static void main(String[] args) throws Exception {
        
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        
        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < 100; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("id", i);
            avroRecord.put("name", "name" + i);
            avroRecord.put("age", 22);
            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("dev3-yangyunhe-topic001", avroRecordBytes);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}

3. KafkaConsumer 使用 Bijection 类库来反序列化消息

package com.bonc.rdpe.kafka110.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.bonc.rdpe.kafka110.producer.BijectionProducer;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionConsumer.java 
 * @Description KafkaConsumer 使用 Bijection 类库来反序列化消息
 * @Author YangYunhe
 * @Date 2018-06-22 11:10:29
 */
public class BijectionConsumer {
    
    public static void main(String[] args) throws Exception {
        
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        
        try {
            while(true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +
                            "user.name = " + genericRecord.get("name") + ", " +
                            "user.age = " + genericRecord.get("age") + "], " + 
                            "partition = " + record.partition() + ", " + 
                            "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

先运行 KafkaConsumer,没有输出

当运行 KakfaProducer 后,KakfaConsumer 控制台输出:

value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664

......

参考文章:

在Kafka中使用Avro编码消息:Producter篇

在Kafka中使用Avro编码消息:Consumer篇

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.06.22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 添加 Bijection 类库的依赖,并新建一个 schema 文件
  • 2. KafkaProducer 使用 Bijection 类库发送序列化后的消息
  • 3. KafkaConsumer 使用 Bijection 类库来反序列化消息
  • 4. 测试结果
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档