Kafka 自定义序列化器和反序列化器

1. 案例说明

有一个 Java 实体类 Customer,定义如下:

package com.bonc.rdpe.kafka110.beans;

/**
 * @Title Customer.java
 * @Description JavaBean类,测试 Kafka 自定义序列化器使用
 * @Author YangYunhe
 * @Date 2018-06-21 11:28:15
 */
public class Customer {

    private int cid;
    private String cname;
    
    public Customer() {}
    
    public Customer(int cid, String cname) {
        this.cid = cid;
        this.cname = cname;
    }

    public int getCid() {
        return cid;
    }

    public void setCid(int cid) {
        this.cid = cid;
    }

    public String getCname() {
        return cname;
    }

    public void setCname(String cname) {
        this.cname = cname;
    }

    @Override
    public String toString() {
        return "Customer [cid=" + cid + ", cname=" + cname + "]";
    }
}

现 Kafka Producer 需要把 Customer 类的对象序列化成字节数组发送给 Kafka Broker,同时 Kafka Consumer 需要把字节数组反序列化为一个 Customer 对象

2. 自定义序列化器和反序列化器

(1) 自定义序列化器

package com.bonc.rdpe.kafka110.serializer;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title CustomerSerializer.java 
 * @Description 自定义的序列化器,用于发送 Customer对象
 * @Author YangYunhe
 * @Date 2018-06-21 11:32:30
 */
public class CustomerSerializer implements Serializer<Customer> {
    
    /**
     * 把 Java 对象序列化为字节数组的核心方法
     * Customer对象被序列化成:
     * 表示customerID的4字节整数
     * 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
     * 表示customerName的N个字节
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int serializedSize;
            if(data == null) {
                return null;
            }else {
                if(data.getCname() != null) {
                    // Customer 的 cname 属性
                    serializedName = data.getCname().getBytes("UTF-8");
                    serializedSize = serializedName.length;
                }else {
                    serializedName = new byte[0];
                    serializedSize = 0;
                }
            }
            // 创建一个 ByteBuffer,容量为 4 + 4 + serializedSize
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + serializedSize);
            buffer.putInt(data.getCid());
            buffer.putInt(serializedSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }

    // 不做任何处理
    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {}
    
    // 不做任何处理
    @Override
    public void close() {}

}

(2) 自定义反序列化器

package com.bonc.rdpe.kafka110.deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title CustomerDeserializer.java 
 * @Description 自定义的反序列化器,用于解析接收到的 byte[]
 * @Author YangYunhe
 * @Date 2018-06-22 12:42:33
 */
public class CustomerDeserializer implements Deserializer<Customer>{

    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int cid;
        int nameSize;
        String cname;
        try {
            if(data == null) {
                return null;
            }
            if(data.length < 8) {
                throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected!");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            cid = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            cname = new String(nameBytes, "UTF-8");
            return new Customer(cid, cname);
        }catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Customer: " + e.getMessage());
        }
    }
}

3. 发送和消费消息

(1) Kafka Producer 使用自定义的序列化器发送消息

package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title Producer04.java 
 * @Description 使用自定义序列化器发送Customer对象
 * @Author YangYunhe
 * @Date 2018-06-21 11:26:53
 */
public class CustomerProducer {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置序列化类为自定义的序列化类
        props.put("value.serializer", "com.bonc.rdpe.kafka110.serializer.CustomerSerializer");

        Producer<String, Customer> producer = new KafkaProducer<>(props);
        Customer customer = new Customer(1, "Jed");
        ProducerRecord<String, Customer> record = new ProducerRecord<>("dev3-yangyunhe-topic001", customer);
        producer.send(record);
        producer.close();
    }

}

(2) Kafka Consumer 使用自定义的反序列器解析消息

package com.bonc.rdpe.kafka110.consumer;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title CustomerConsumer.java 
 * @Description 使用自定义反序列化器解析Customer对象
 * @Author YangYunhe
 * @Date 2018-06-22 12:53:58
 */
public class CustomerConsumer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group002");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 设置反序列化类为自定义的反序列化类
        props.put("value.deserializer", "com.bonc.rdpe.kafka110.deserializer.CustomerDeserializer");
        KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
        
        try {
            while(true) {
                ConsumerRecords<String, Customer> records = consumer.poll(100);
                for (ConsumerRecord<String, Customer> record : records) {
                    Customer customer = record.value();
                    System.out.println(customer.toString());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

先启动 CustomerConsumer 程序,再启动 CustomerProducer 程序发送一个 Customer 对象,CustomerConsumer 消费到消息后在控制台打印:

Customer [cid=1, cname=Jed]

5. 说明

如果发送到 Kafka 的对象不是简单的字符串或整型,那么可以使用序列化框架来创建消息记录,如 Avro、Thrift 或 Protobuf,或者使用自定义序列化器。建议使用通用的序列化框架,因为自定义的序列化器和反序列化器把生产者和消费者紧紧地耦合在一起,很脆弱,并且容易出错。关于 Kafka 如何使用 Avro 序列化框架,可以参考以下三篇文章: Kafka 中使用 Avro 序列化框架(一):使用传统的 avro API 自定义序列化类和反序列化类 Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化 Kafka 中使用 Avro 序列化组件(三):Confluent Schema Registry

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Web项目聚集地

手写一个Mybatis框架

在手写自己的Mybatis框架之前,我们先来了解一下Mybatis,它的源码中使用了大量的设计模式,阅读源码并观察设计模式在其中的应用,才能够更深入的理解源...

11820
来自专栏大内老A

Enterprise Library深入解析与灵活应用(9):个人觉得比较严重的关于CachingCallHandler的Bug

微软EnterLib的Policy Injection Application Block(PIAB)是一个比较好用的轻量级的AOP框架,你可以通过创建自定义的...

21990
来自专栏小樱的经验随笔

Codeforces 706B Interesting drink

B. Interesting drink time limit per test:2 seconds memory limit per test:256 meg...

30880
来自专栏菩提树下的杨过

linq to sql中慎用Where<T>(Func<TSource, bool> predicate),小心被Linq给"骗"了!

近日在一个大型Web项目中,采用Linq to Sql替换原来的sqlcommand/sqldatareader方式来获取数据,上线后刚开始一切正常,但是随着访...

20150
来自专栏张善友的专栏

Dynamite动态排序库

易于使用和高性能动态排序库支持类似 SQL 语法和嵌套/复杂的表达式,使用 System.Linq.Expression 动态生成快速比较器。 使用此库就可以使...

231100
来自专栏DOTNET

C#要点补充

1字符串与时间的互转 DateTime.TryParse将空字符串、为null或格式不正确,则转换为的DateTime所代表的值为:0001/1/1 0:00...

30150
来自专栏me的随笔

使用AutoMapper进行对象间映射

在开发过程中,难免遇到下面这种情况:两个(或多个)对象所拥有的大多数属性是重复的,我们需要在对象间进行映射(即将一个对象的属性值赋给另一个对象。通常我们可以进行...

67820
来自专栏我叫刘半仙

原自己手写一个Mybatis框架(简化)

       继上一篇手写SpringMVC之后,我最近趁热打铁,研究了一下Mybatis。MyBatis框架的核心功能其实不难,无非就是动态代理和jdbc的操...

2K60
来自专栏算法修养

PAT 甲级 1021 Deepest Root (并查集,树的遍历)

1021. Deepest Root (25) 时间限制 1500 ms 内存限制 65536 kB 代码长度限制 16000 B ...

42370
来自专栏木宛城主

浪客剑心:位图法Bitmap算法分析

看了一篇文章《一道腾讯前端试题,谁来试试身手》,正好以前了解过位图法,确实不错。位图法适用于大规模数据,但数据状态又不是很多的情况。通常是用来判断某个数据存不...

40060

扫码关注云+社区

领取腾讯云代金券