专栏首页Jed的技术阶梯Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

使用传统的 avro API 自定义序列化类和反序列化类比较麻烦,需要根据 schema 生成实体类,需要调用 avro 的 API 实现 对象到 byte[] 和 byte[] 到对象的转化,而那些方法看上去比较繁琐,幸运的是,Twitter 开源的类库 Bijection 对传统的 Avro API 进行了封装了和优化,让我们可以方便的实现以上操作。

1. 添加 Bijection 类库的依赖,并新建一个 schema 文件

Bijection 类库的依赖如下:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>

在 maven 工程的 resources 目录下新建一个 schema 文件,名称为"user.json",因为我们不用 avro 生成实体类的方式,所以定义一个普通的 json 文件来描述 schema 即可,另外,在 json 文件中,也不需要"namespace": "packageName"这个限定生成实体类的包名的参数,本文使用的 json 文件内容如下:

{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name",  "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

2. KafkaProducer 使用 Bijection 类库发送序列化后的消息

package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionProducer.java 
 * @Description KafkaProducer 使用 Bijection 类库发送序列化后的消息
 * @Author YangYunhe
 * @Date 2018-06-22 10:42:06
 */
public class BijectionProducer {

    public static void main(String[] args) throws Exception {
        
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        
        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < 100; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("id", i);
            avroRecord.put("name", "name" + i);
            avroRecord.put("age", 22);
            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("dev3-yangyunhe-topic001", avroRecordBytes);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}

3. KafkaConsumer 使用 Bijection 类库来反序列化消息

package com.bonc.rdpe.kafka110.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.bonc.rdpe.kafka110.producer.BijectionProducer;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionConsumer.java 
 * @Description KafkaConsumer 使用 Bijection 类库来反序列化消息
 * @Author YangYunhe
 * @Date 2018-06-22 11:10:29
 */
public class BijectionConsumer {
    
    public static void main(String[] args) throws Exception {
        
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        
        try {
            while(true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +
                            "user.name = " + genericRecord.get("name") + ", " +
                            "user.age = " + genericRecord.get("age") + "], " + 
                            "partition = " + record.partition() + ", " + 
                            "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

先运行 KafkaConsumer,没有输出 当运行 KakfaProducer 后,KakfaConsumer 控制台输出:

value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664

......

参考文章: 在Kafka中使用Avro编码消息:Producter篇 在Kafka中使用Avro编码消息:Consumer篇

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • HBase Shell常用Shell命令

    scan的用法很多,参数,过滤条件可以很多,各种组合, 在此不列举过多的例子,参考 help 'scan'

    CoderJed
  • HBase Java API 03:HBase与MapReduce整合

    编写MapReduce程序,把"student"表中"info"列族下的"name"那一列抽取出来,存入新HBase表"student_extract"中,要求...

    CoderJed
  • Hive窗口函数04-LAG、LEAD、FIRST_VALUE、LAST_VALUE

    Hive窗口函数LAG、LEAD、FIRST_VALUE、LAST_VALUE入门

    CoderJed
  • 飞步神速!何晓飞团队完成无人车深度学习芯片流片,算力创国内新高

    何晓飞教授自开启无人车创业以来,一如治学,过程中始终低调,但并不意味着没有大进展。

    量子位
  • springboot&ajax&has been blocked by CORS policy: No 'Access-Control-Allow-Origin

    Access to XMLHttpRequest at 'http://localhost:8080/user/login1' from origin 'htt...

    微风-- 轻许--
  • vue如何在页面上面输出html代码效果

    一般情况下vue中使用双大括号{{这里是变量}} 这种形式输出变量的话,即使变量中是html代码,它输出的结果也会将html代码转为普通文本输出。

    刘金玉编程
  • Python爬虫之三:抓取猫眼电影TOP100

    运行平台: Windows Python版本: Python3.6 IDE: Sublime Text 其他工具:Chrome浏览器

    王强
  • pygrametl的使用--python

    pygrametl是一个python的package用于ETL(Extract-Transform-Load )

    py3study
  • 一文读懂如何在 Kubernetes 上轻松实现自动化部署 Prometheus

    Prometheus 是当下火热的监控解决方案,尤其是容器微服务架构,Kubernetes 的首选监控方案。关于为什么要用 Prometheus,我这里就不多讲...

    kubernetes中文社区
  • Istio 1.2发布:版本迭代加快,流量管理与安全增强

    不同于1.1版本万众期待,发布时间一推再推,最后历时七个半个月,1.2的发布效率确实出乎很多人意外。但是注意到1.2版本前1.1版本足足发满了1.1.1~1.1...

    Jintao Zhang

扫码关注云+社区

领取腾讯云代金券