Netty-整合kryo高性能数据传输

前言

本篇文章是Netty专题的第三篇,前面2篇文章如下:

Netty 是 开源的基于java的网络通信框架,在上篇文章高性能NIO框架Netty-对象传输中对象的传输用的是自定义的编解码器,基于JDK的序列化来实现的,其实Netty自带的Object编解码器就可以实现对象的传输,并且也是基于JDK的序列化,而Kryo是性能更好的java序列化框架,本篇文章我们将用Kryo来替换JDK的序列化实现高性能的数据传输。

Kryo可能大家用的还不是特别多,我第一次见Kryo是在当当扩展的dubbox中,其中有一条主要功能是这么介绍的:

  • 支持基于Kryo和FST的Java高效序列化实现:基于当今比较知名的Kryo和FST高性能序列化库,为Dubbo默认的RPC协议添加新的序列化实现,并优化调整了其序列化体系,比较显著的提高了Dubbo RPC的性能,详见文档中的基准测试报告。

为了提高RPC的性能,增加了Kryo和FST两种高性能的序列化方式,基准测试报告地址:https://dangdangdotcom.github.io/dubbox/serialization.html

Kryo介绍

Kryo是一种快速高效的Java对象序列化框架。 该项目的目标是速度、效率和易于使用的API。 当对象需要持久化时,无论是用于文件、数据库还是通过网络,该项目都很有用。

Kryo还可以执行自动深层浅层的复制/克隆。这是从对象直接复制到对象,而不是object-> bytes-> object。

除了前面介绍的dubbox使用了Kryo,还有很多的开源框架都用到了Kryo,请看下面的列表:

  • KryoNet (NIO networking)
  • Twitter's Scalding (Scala API for Cascading)
  • Twitter's Chill (Kryo serializers for Scala)
  • Apache Fluo (Kryo is default serialization for Fluo Recipes)
  • Apache Hive (query plan serialization)
  • Apache Spark (shuffled/cached data serialization)
  • DataNucleus (JDO/JPA persistence framework)
  • CloudPelican
  • Yahoo's S4 (distributed stream computing)
  • Storm (distributed realtime computation system, in turn used by many others)
  • Cascalog (Clojure/Java data processing and querying details)
  • memcached-session-manager (Tomcat high-availability sessions)
  • Mobility-RPC (RPC enabling distributed applications)
  • akka-kryo-serialization (Kryo serializers for Akka)
  • Groupon
  • Jive
  • DestroyAllHumans (controls a robot!)
  • kryo-serializers (additional serializers)

Kryo简单使用

添加Kryo的Maven依赖,我这边用的是比较老的版本,跟dubbox中的版本一致,当然大家也可以用最新的4.0版本

<!-- kryo -->
<dependency>
    <groupId>com.esotericsoftware.kryo</groupId>
    <artifactId>kryo</artifactId>
    <version>2.24.0</version>
</dependency>


<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.26</version>
</dependency>

创建一个测试类来演示下序列化和反序列化的功能

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoTest {

    public static void main(String[] args) throws FileNotFoundException {
        // 序列化
        Kryo kryo = new Kryo();
        Output output = new Output(new FileOutputStream("file.bin"));
        Message someObject = new Message();
        someObject.setContent("测试序列化");
        kryo.writeObject(output, someObject);
        output.close();
        // 反序列化
        Input input = new Input(new FileInputStream("file.bin"));
        Message message = kryo.readObject(input, Message.class);
        System.out.println(message.getContent());
        input.close();
    }
    
}

更多使用方式和细节请查看文档:https://github.com/EsotericSoftware/kryo

Netty整合Kryo进行序列化

  1. 创建一个工厂类KryoFactory,用于创建Kryo对象
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import com.netty.im.core.message.Message;
import de.javakaffee.kryoserializers.*;
import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

public abstract class KryoFactory {


    private final static KryoFactory threadFactory = new ThreadLocalKryoFactory();
    
    protected KryoFactory() {
    }
    
    public static KryoFactory getDefaultFactory() {
        return threadFactory;
    }
    
    protected Kryo createKryo() {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);
        kryo.register(Message.class);
        kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
        kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
        kryo.register(InvocationHandler.class, new JdkProxySerializer());
        kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
        kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
        kryo.register(Pattern.class, new RegexSerializer());
        kryo.register(BitSet.class, new BitSetSerializer());
        kryo.register(URI.class, new URISerializer());
        kryo.register(UUID.class, new UUIDSerializer());
        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        SynchronizedCollectionsSerializer.registerSerializers(kryo);
        kryo.register(HashMap.class);
        kryo.register(ArrayList.class);
        kryo.register(LinkedList.class);
        kryo.register(HashSet.class);
        kryo.register(TreeSet.class);
        kryo.register(Hashtable.class);
        kryo.register(Date.class);
        kryo.register(Calendar.class);
        kryo.register(ConcurrentHashMap.class);
        kryo.register(SimpleDateFormat.class);
        kryo.register(GregorianCalendar.class);
        kryo.register(Vector.class);
        kryo.register(BitSet.class);
        kryo.register(StringBuffer.class);
        kryo.register(StringBuilder.class);
        kryo.register(Object.class);
        kryo.register(Object[].class);
        kryo.register(String[].class);
        kryo.register(byte[].class);
        kryo.register(char[].class);
        kryo.register(int[].class);
        kryo.register(float[].class);
        kryo.register(double[].class);
        return kryo;
    }
    
}

kryo在序列化对象时,首先会序列化其类的全限定名,由于我们通常序列化的对象都是有限范围内的类的实例,这样重复序列化同样的类的全限定名是低效的。通过注册kryo可以将类的全限定名抽象为一个数字,即用一个数字代表全限定名,这样就要高效一些。kryo.register()方法就是将需要序列化的类提前进行注册。

2.创建一个ThreadLocalKryoFactory继承KryoFactory,用来为每个线程创建一个Kryo对象,原因是由于Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。此外, bytes[] Input 可能被修改,然后在反序列化期间回到初始状态,因此不应该在多线程中并发使用相同的 bytes[]。

Kryo 实例的创建/初始化是相当昂贵的,所以在多线程的情况下,您应该线程池化 Kryo 实例。简单的解决方案是使用 ThreadLocal 将 Kryo实例绑定到 Threads。

import com.esotericsoftware.kryo.Kryo;

public class ThreadLocalKryoFactory extends KryoFactory {

    private final ThreadLocal<Kryo> holder  = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            return createKryo();
        }
    };
    
    public Kryo getKryo() {
        return holder.get();
    }
    
}

3.创建一个序列化的工具类KryoSerializer

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
/**
 * Kryo序列化
 * @author yinjihuan
 *
 */
public class KryoSerializer {

    private static final ThreadLocalKryoFactory factory = new ThreadLocalKryoFactory();
    
    public static void serialize(Object object, ByteBuf out) {
        Kryo kryo = factory.getKryo();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Output output = new Output(baos);
        kryo.writeClassAndObject(output, object);
        output.flush();
        output.close();
        byte[] b = baos.toByteArray();
        try {
            baos.flush();
            baos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        out.writeBytes(b);
    }
    
    public static Object deserialize(ByteBuf out) {
        if (out == null) {
            return null;
        }
        Input input = new Input(new ByteBufInputStream(out));
        Kryo kryo = factory.getKryo();
        return kryo.readClassAndObject(input);
    }
    
}

4.创建Netty编码器KryoEncoder对数据进行Kryo序列化

import com.netty.im.core.serialize.kryo.KryoSerializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class KryoEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
        KryoSerializer.serialize(message, out);
        ctx.flush();
    }
    
}

5.创建Netty解码器KryoDecoder对数据进行Kryo反序列化

import java.util.List;
import com.netty.im.core.serialize.kryo.KryoSerializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class KryoDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object obj = KryoSerializer.deserialize(in);
        out.add(obj);
    }
    
}

6.将Netty服务端和客户端的编解码器都改成Kryo的编解码器即可

ch.pipeline().addLast("decoder", new KryoDecoder());
ch.pipeline().addLast("encoder", new KryoEncoder());

通过上面的步骤我们就在Netty中集成Kryo进行数据的编码传输,替换了上篇文章实现的JDK序列化方式,提高了数据传输的性能。

源码参考:https://github.com/yinjihuan/netty-im

原文发布于微信公众号 - 猿天地(cxytiandi)

原文发表时间:2018-03-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏岑玉海

Spark编程指南

1、在maven里面添加引用,spark和hdfs的客户端的。 groupId = org.apache.spark artifactId = spark-co...

38190
来自专栏王小雷

Spark学习之编程进阶——累加器与广播(5)

Spark学习之编程进阶——累加器与广播(5) 1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast varia...

22890
来自专栏一名叫大蕉的程序员

Spark你一定学得会(一)No.7

我是小蕉。 上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。 首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄...

21550
来自专栏Spark生态圈

[spark] Checkpoint 源码解析

在spark应用程序中,常常会遇到运算量很大经过很复杂的 Transformation才能得到的RDD即Lineage链较长、宽依赖的RDD,此时我们可以考虑将...

24020
来自专栏祝威廉

Spark Streaming 数据清理机制

为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常。

29930
来自专栏牛肉圆粉不加葱

Spark Task 的执行流程③ - 执行 task

创建、分发 Task一文中我们提到 TaskRunner(继承于 Runnable) 对象最终会被提交到 Executor 的线程池中去执行,本文就将对该执行过...

9010
来自专栏Spark学习技巧

必读|spark的重分区及排序

前几天,有人在星球里,问了一个有趣的算子,也即是RepartitionAndSortWithinPartitions。当时浪尖也在星球里讲了一下,整个关于分区排...

16620
来自专栏个人分享

最最简单的~WordCount¬

步骤1:textFile先生成HadoopRDD,然后再通过map操作生成MappedRDD.

11910
来自专栏浪淘沙

SparkStreaming_Kafka_Redis整合

37730
来自专栏行者悟空

Spark RDD的Transformation

11040

扫码关注云+社区

领取腾讯云代金券