Strom序列化机制

  Storm 中的 tuple可以包含任何类型的对象。由于Storm 是一个分布式系统,所以在不同的任务之间传递消息时Storm必须知道怎样序列化、反序列化消息对象。

  Storm 使用 Kryo库对对象进行序列化。Kryo 是一个灵活、快速的序列化库。Storm 默认支持基础类型、string、byte arrays、ArrayList、HashMap、HashSet 以及 Clojure 的集合类型的序列化。如果需要在tuple中使用其他的对象类型,就需要注册一个自定义的序列化器。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/7044042.html

自定义序列化

  TORM使用Kryo来序列化。要实现自定义序列化器,我们需要使用Kryo注册新的序列化器。添加自定义序列化器是通过拓扑配置的topology.kryo.register属性完成的。它需要一个注册的列表,每个注册项可以采取两种形式:

1:类名注册,在这种情况下,Storm将使用Kryo的FieldsSerializer来序列化该类。这可能是也可能不是该类最好的选择,更多的细节可以查看Kryo文档。

2:实现了com.esotericsoftware.kryo.Serializer接口的类名注册的映射。

Storm为拓扑配置里的注册序列化提供了帮助。Config类中有一个名为registerSerialization的方法,可以把注册添加到配置中。

public void registerSerialization(Class klass);
public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass);

java序列化

  一个拓扑中不同的任务传递消息时Storm发现了一个没有注册序列化器的类型,它会使用 Java 序列化器来代替,如果这个对象无法被Java序列化器序列化,Storm 就会抛出异常。

  注意,Java 自身的序列化机制非常耗费资源,而且不管在 CPU 的性能上还是在序列化对象的大小上都没有优势。强烈建议读者在生产环境中运行topology 的时候注册一个自定义的序列化器。

  可以通过将 Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION 配置为 false 的方式来将禁止序列化器回退到Java的序列化机制。

Config.setFallBackOnJavaSerialization(conf,false);这个时候如果storm使用java序列化就会抛出异常告诉开发人员去注册一个kryo序列化。

实现storm序列化

创建传输的对象。

package cn.intsmaze.serializable.bean;
public class Person {
  private int age;
  private Studnet studnet;
  private ArrayList arrayList=new ArrayList();
  private LinkedList linkedList=new LinkedList();
  public Person() {
  }
  public Person(int age,Studnet s) {
    this.age = age;
    this.studnet=s;
    arrayList.add("ArrayList中的"+s.getName());
    linkedList.add("linkedList中的"+s.getName());
  }
  @Override
  public String toString() {
    return "Person [age=" + age + ", studnet=" + studnet + ", arrayList="
    + arrayList + ", linkedList=" + linkedList + "]";
  }
    get(),set()......
}

package cn.intsmaze.serializable.bean;
public class Studnet {
  private String name;
  public Studnet() {
  }
  public Studnet(String name) {
    this.name = name;
  }
  @Override
  public String toString() {
    return "Studnet [name=" + name + "]";
  } 
  get(),set()......
}

spout和bolt的实现,spout每次会创建一个person对象将该对象发送到bolt,bolt类接收到该对象将该对象打印出来。

package cn.intsmaze.serializable;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.Studnet;
public class SpoutBean extends BaseRichSpout {
  SpoutOutputCollector collector;
  public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    this.collector = collector;
  }
  public void nextTuple() {
    Studnet s=new Studnet("xiaoxi");
    collector.emit(new Values(new Person(100,s)));
    Utils.sleep(500);
  }
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("person"));
  }
} 
 
package cn.intsmaze.serializable;
import cn.intsmaze.serializable.bean.Person;
public class BoltBean extends BaseBasicBolt {
  public void prepare(Map stormConf, TopologyContext context) {
    super.prepare(stormConf, context);
  }
  public void execute(Tuple input, BasicOutputCollector collector) {
    Person person = (Person)input.getValueByField("person");
    System.out.println("接收到spout节点传来的数据:"+person);
  }
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  }
}

场景一:

使用public void registerSerialization(Class klass);

package cn.intsmaze.serializable;
import java.util.LinkedList;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.Studnet;
public class TopologyBean {
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new SpoutBean(), 1);
    builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.registerSerialization(Person.class);
    conf.registerSerialization(Studnet.class);
    //注释掉后,但Studnet没实现java序列化,则会报错。有两种方法,一种注册该类,一种实现java序列化。
    conf.registerSerialization(LinkedList.class);
    //这里如果注释掉,则会使用java序列化方式,如果我们取消掉禁止使用java序列化方法,则会提示注册LinkedList类报错。
    conf.setNumWorkers(2);
     // Config.setFallBackOnJavaSerialization(conf, false);//禁止使用java语言自己的序列化
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
   }
}

  第11行,我们注册person类使用Kryo序列化,person对象除了有基本类型int字段外,还有arraylist,linkedlist类型以及自定义的student类型。arraylist类型是storm默认已经提供了支持。

  这里如果我们不对linkedlist类型和自定义类型student进行注册则该拓扑在运行时则会报无法序列化student类型异常。

这个时候有两种办法解决:

  一种就是使student实现java的public class Studnet implements Serializable接口,则该拓扑会成功运行。因为storm如果发现传输的对象如果没有注册为Kryo,则就会使用java的序列化对象,而linkedlist默认已经实现了该接口,所以才会出现前面报student对象无法序列化,然后使得student实现java的序列化接口即可。

  第二种方案就是,我们对student类进行注册conf.registerSerialization(Studnet.class);。

虽然linkedlist不注册,会默认使用java的序列化,但是出于效率的考虑,我们将其注册为Kryo。

  提示:因为有些集合类型,storm没有提供序列化支持,但是实现了java序列化接口,所以如果我们不加以控制,会使用java序列化而拖累整个系统。所以推荐使用

Config.setFallBackOnJavaSerialization(conf, false);禁止使用java语言自己的序列化来可以在本地模式时及时发现报错信息,将问题尽早解决。

场景二:

  我们使用kryo序列化,但是有时候我们并不希望传输对象的所有字段,而只是传输对象的某些字段,从而进一步提高消息的传递速率,这个时候我们可以使用kryo的自定义序列化机制来指定传输的值。

package cn.intsmaze.serializable.bean;
import java.util.ArrayList;
import java.util.LinkedList;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class PersonSerializable extends Serializer<Person>{
  @Override
  public Person read(Kryo kryo, Input input, Class<Person> arg2) {
    System.out.println("序列化");
    Person person=new Person();
    person.setAge(input.readInt());
    person.setArrayList(kryo.readObject(input, ArrayList.class));
    person.setLinkedList(kryo.readObject(input, LinkedList.class));//该类型,storm默认不支持,所以要在topology中注册该类型,如果不注册,则会使用java序列化。 person.setStudnet(kryo.readObject(input, Studnet.class));//该类型,storm默认不支持,所以要在topology中注册该类型,如果不注册,且java序列化没有实现,则会报错。
    return person;
  }
  @Override
  public void write(Kryo kryo, Output output, Person person) {
    System.out.println("反序列化");
    output.writeInt(person.getAge());
    kryo.writeObject(output, person.getArrayList());
    kryo.writeObject(output, person.getLinkedList());
    kryo.writeObject(output, person.getStudnet());
  }
}
package cn.intsmaze.serializable;
import java.util.LinkedList;
import cn.intsmaze.serializable.bean.Person;
import cn.intsmaze.serializable.bean.PersonSerializable;
import cn.intsmaze.serializable.bean.Studnet;
public class TopologyBean {
  public static void main(String[] args) throws Exception {
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("spout", new SpoutBean(), 1);
      builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout");
      Config conf = new Config();
      conf.registerSerialization(Studnet.class);
      conf.registerSerialization(LinkedList.class);
      conf.registerSerialization(Person.class, PersonSerializable.class);
      conf.setNumWorkers(2);     
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
      builder.createTopology());
    }
}

  因为PersonSerializable类中指定了要传输person对象的int,studne,ArrayList,LinkedList 类型。

如果我们注释掉第12行 conf.registerSerialization(Studnet.class);且Studnet类没有实现java的序列化,则拓扑的任务间传递消息进行序列化时就会报无法序列化该类的错误,感兴趣的同学可以试试注释掉该行,看看storm会报什么异常。

第13行,我们必须注册对LinkedList序列化,storm默认支持了对ArrayList类的序列化,但没有提供对LinkedList序列化,需要我们手动注册,如果不注册,因为LinkedList实现了java的序列化接口,所以会使用java序列化,则不会报错。

  强烈建议,在开发中就算注册了kyro序列化方式,也要设置该conf.setFallBackOnJavaSerialization(false)方法来禁止使用java序列化方式,因为实际开发中,核心架构搭建好了,会让团队成员直接在现成架构上编写,他们不需要了解storm的一些机制,但是这也带来问题,一种场景就是,开发人员对传输对象增加了一个LinkedList字段,但是他没有注册序列化类,storm就会对LinkedList使用java序列化,就会拖累系统的性能,所以在架构的时候,通过设置禁止java序列化方法,就可以在测试中及时发现问题所在。

补充:上面的所有一切,在本地运行以及部署到集群时,work数量设置为1时,都不会生效的。因为同一个对象公有一个内存,不会涉及网络传输的,也就不需要序列化和反序列化。

生产场景回顾:

  本人intsmaze生产上遇见的问题:storm工程中对传输对象使用了conf.registerSerialization(Person.class, PersonSerializable.class);方式来指定序列化该对象的某些字段。初级程序员在storm工程上开发时,因为业务需要对传输对象增加了一个字段,但是没有在PersonSerializable中序列化和反序列化该对象。恰巧的时,初级工程师本地模式和准生产测试时,topology的work的数量都为1,导致对象在bolt和bolt节点传输时并没有走序列化方式,结果测试一切正常,但是上生产后,因为work数量是10个,立马在后一个bolt中报大量的空指针异常,造成很严重的生产问题。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏IT笔记

JAVA实现一个简单的RPC+项目源码

论坛中说到聊一聊RPC远程过程调用协议 http://www.52itstyle.com/thread-22564-1-1.html RPC(Remote Pr...

4769
来自专栏nnngu

百度搜索 “Java面试题” 前200页(面试必看)

本文中的题目来源于网上的一篇文章《百度搜索 “Java面试题” 前200页》,但该文章里面只有题目,没有答案。因此,我整理了一些答案发布于本文。本文整理答案的原...

80511
来自专栏屈定‘s Blog

设计模式--动态代理的思考

在一些第三方框架中经常能看到动态代理的案例,尤其是RPC框架,ORM框架等,该篇将分析这些实现的原理,另外延伸在业务中的使用示例.

1323
来自专栏王小雷

MapReduce的过程(2)

MapReduce的编程思想(1) MapReduce的过程(2) 1. MapReduce从输入到输出 一个MapReduce的作业经过了input、map、...

2705
来自专栏Albert陈凯

4.4 共享变量

4.4 共享变量 一般来说,当一个被传递给Spark操作(例如,Map和Reduce)的函数在一个远程集群上运行时,该函数实际上操作的是它用到的所有变量的独立...

33212
来自专栏Create Sun

基础拾遗------特性详解【含常用过滤器实例】

前言  本来7月份想着是用一个月把基础拾遗写完的,结果断断续续写了4个月了,才写了这几篇,这两天又规划着多写几篇,希望能坚持吧。前两天一次和同事聊天提到了特性...

35317
来自专栏JackieZheng

Hadoop阅读笔记(七)——代理模式

  关于Hadoop已经小记了六篇,《Hadoop实战》也已经翻完7章。仔细想想,这么好的一个框架,不能只是流于应用层面,跑跑数据排序、单表链接等,想得其精髓,...

21510
来自专栏Golang语言社区

GO语言标准库概览

在Go语言五周系列教程的最后一部分中,我们将带领大家一起来浏览一下Go语言丰富的标准库。 Go标准库包含了大量包,提供了丰富广泛的功能特性。这里提供了概览仅仅是...

38410
来自专栏编码小白

activiti学习笔记(五) 流程部署

activiti流程部署 activit部署方法api     activiti共有六种部署方式,最终实现都是DeploymentEntityImpl的addR...

6897
来自专栏Hadoop实操

使用JDBC向Kudu表插入中文字符-双引号的秘密

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 1.问题描述 使用Impala JDBC向Kudu表中插入中文字符,插入的中文字符串乱码,中文字...

3817

扫码关注云+社区

领取腾讯云代金券