前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka 自定义序列化器和反序列化器

Kafka 自定义序列化器和反序列化器

作者头像
CoderJed
发布2018-09-13 10:34:42
2.1K0
发布2018-09-13 10:34:42
举报
文章被收录于专栏:Jed的技术阶梯Jed的技术阶梯

1. 案例说明

有一个 Java 实体类 Customer,定义如下:

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.beans;

/**
 * @Title Customer.java
 * @Description JavaBean类,测试 Kafka 自定义序列化器使用
 * @Author YangYunhe
 * @Date 2018-06-21 11:28:15
 */
public class Customer {

    private int cid;
    private String cname;
    
    public Customer() {}
    
    public Customer(int cid, String cname) {
        this.cid = cid;
        this.cname = cname;
    }

    public int getCid() {
        return cid;
    }

    public void setCid(int cid) {
        this.cid = cid;
    }

    public String getCname() {
        return cname;
    }

    public void setCname(String cname) {
        this.cname = cname;
    }

    @Override
    public String toString() {
        return "Customer [cid=" + cid + ", cname=" + cname + "]";
    }
}

现 Kafka Producer 需要把 Customer 类的对象序列化成字节数组发送给 Kafka Broker,同时 Kafka Consumer 需要把字节数组反序列化为一个 Customer 对象

2. 自定义序列化器和反序列化器

(1) 自定义序列化器

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.serializer;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title CustomerSerializer.java 
 * @Description 自定义的序列化器,用于发送 Customer对象
 * @Author YangYunhe
 * @Date 2018-06-21 11:32:30
 */
public class CustomerSerializer implements Serializer<Customer> {
    
    /**
     * 把 Java 对象序列化为字节数组的核心方法
     * Customer对象被序列化成:
     * 表示customerID的4字节整数
     * 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
     * 表示customerName的N个字节
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int serializedSize;
            if(data == null) {
                return null;
            }else {
                if(data.getCname() != null) {
                    // Customer 的 cname 属性
                    serializedName = data.getCname().getBytes("UTF-8");
                    serializedSize = serializedName.length;
                }else {
                    serializedName = new byte[0];
                    serializedSize = 0;
                }
            }
            // 创建一个 ByteBuffer,容量为 4 + 4 + serializedSize
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + serializedSize);
            buffer.putInt(data.getCid());
            buffer.putInt(serializedSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }

    // 不做任何处理
    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {}
    
    // 不做任何处理
    @Override
    public void close() {}

}

(2) 自定义反序列化器

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title CustomerDeserializer.java 
 * @Description 自定义的反序列化器,用于解析接收到的 byte[]
 * @Author YangYunhe
 * @Date 2018-06-22 12:42:33
 */
public class CustomerDeserializer implements Deserializer<Customer>{

    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int cid;
        int nameSize;
        String cname;
        try {
            if(data == null) {
                return null;
            }
            if(data.length < 8) {
                throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected!");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            cid = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            cname = new String(nameBytes, "UTF-8");
            return new Customer(cid, cname);
        }catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Customer: " + e.getMessage());
        }
    }
}

3. 发送和消费消息

(1) Kafka Producer 使用自定义的序列化器发送消息

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title Producer04.java 
 * @Description 使用自定义序列化器发送Customer对象
 * @Author YangYunhe
 * @Date 2018-06-21 11:26:53
 */
public class CustomerProducer {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置序列化类为自定义的序列化类
        props.put("value.serializer", "com.bonc.rdpe.kafka110.serializer.CustomerSerializer");

        Producer<String, Customer> producer = new KafkaProducer<>(props);
        Customer customer = new Customer(1, "Jed");
        ProducerRecord<String, Customer> record = new ProducerRecord<>("dev3-yangyunhe-topic001", customer);
        producer.send(record);
        producer.close();
    }

}

(2) Kafka Consumer 使用自定义的反序列器解析消息

代码语言:javascript
复制
package com.bonc.rdpe.kafka110.consumer;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.bonc.rdpe.kafka110.beans.Customer;

/**
 * @Title CustomerConsumer.java 
 * @Description 使用自定义反序列化器解析Customer对象
 * @Author YangYunhe
 * @Date 2018-06-22 12:53:58
 */
public class CustomerConsumer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group002");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 设置反序列化类为自定义的反序列化类
        props.put("value.deserializer", "com.bonc.rdpe.kafka110.deserializer.CustomerDeserializer");
        KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
        
        try {
            while(true) {
                ConsumerRecords<String, Customer> records = consumer.poll(100);
                for (ConsumerRecord<String, Customer> record : records) {
                    Customer customer = record.value();
                    System.out.println(customer.toString());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 测试结果

先启动 CustomerConsumer 程序,再启动 CustomerProducer 程序发送一个 Customer 对象,CustomerConsumer 消费到消息后在控制台打印:

代码语言:javascript
复制
Customer [cid=1, cname=Jed]

5. 说明

如果发送到 Kafka 的对象不是简单的字符串或整型,那么可以使用序列化框架来创建消息记录,如 Avro、Thrift 或 Protobuf,或者使用自定义序列化器。建议使用通用的序列化框架,因为自定义的序列化器和反序列化器把生产者和消费者紧紧地耦合在一起,很脆弱,并且容易出错。关于 Kafka 如何使用 Avro 序列化框架,可以参考以下三篇文章:

Kafka 中使用 Avro 序列化框架(一):使用传统的 avro API 自定义序列化类和反序列化类

Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 类库实现 avro 的序列化与反序列化

Kafka 中使用 Avro 序列化组件(三):Confluent Schema Registry

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.06.22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 案例说明
  • 2. 自定义序列化器和反序列化器
    • (1) 自定义序列化器
      • (2) 自定义反序列化器
      • 3. 发送和消费消息
        • (1) Kafka Producer 使用自定义的序列化器发送消息
          • (2) Kafka Consumer 使用自定义的反序列器解析消息
          • 4. 测试结果
          • 5. 说明
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档