在企业级大数据流处理项目中,往往在项目数据源处需要面临实时海量数据的采集。采集数据的性能一般与网络带宽、机器硬件、数据量等因素有直接关系;当其他因素是固定的,这里我们只考虑数据量的话,那么数据量的传输和存储性能是我们首先需要面对和解决的。 由此我们引入了Avro数据序列化框架,来解决数据的传输性能问题。
Avro特点: 1.丰富的数据结构 2.一个紧凑的,快速的,二进制的数据格式 3.一个容器文件,来存储持久化数据 4.远程过程调用(RPC) 5.简单的动态语言集成。 6.Avro模式是使用JSON定义的 。这有助于以已经具有JSON库的语言实现。 JSON是一种轻量级的数据传输格式,对于大数据集,JSON数据会显示力不从心,因为JSON的格式是key:value型,每条记录都要附上key的名字,有的时候,光key消耗的空间甚至会超过value所占空间,这对空间的浪费十分严重,尤其是对大型数据集来说,因为它不仅不够紧凑,还要重复地加上key信息,不仅会造成存储空间上的浪费,更会增加了数据传输的压力,从而给集群增加负担,进而影响整个集群的吞吐量。而采用Avro数据序列化系统可以比较好的解决此问题,因为用Avro序列化后的文件由schema和真实内容组成,schema只是数据的元数据,相当于JSON数据的key信息,schema单独存放在一个JSON文件中,这样一来,数据的元数据只存了一次,相比JSON数据格式的文件,大大缩小了存储容量。从而使得Avro文件可以更加紧凑地组织数据。
官网地址:http://avro.apache.org/docs/current/gettingstartedjava.html
原生类型如下所示: null: 表示没有值 boolean: 表示一个二进制布尔值 int: 表示32位有符号整数 long: 表示64位有符号整数 float: 表示32位的单精度浮点数 double: 表示64位双精度浮点数 bytes: 表示8位的无符号字节序列 string: Unicode 编码的字符序列 总共就这8种原生数据类型,这些原生数据类型均没有明确的属性。
AVRO支持6种复杂类型,分别是:records, enums, arrays, maps, unions,fixed,这里我门着重讲解Recoeds类型。 1.Records Records使用类型名称“record”,并且支持三个必选属性。 type: 必有属性。 name: 必有属性,是一个JSON string,提供了记录的名字。 namespace,也是一个JSON string,用来限定和修饰name属性。 doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。 aliases: 可选属性,是JSON的一个string数组,为这条记录提供别名。 fields: 必选属性,是一个JSON数组,数组中列举了所有的field。 每一个field都是一个JSON对象,并且具有如下属性: (1)name: 必选属性,field的名字,是一个JSON string。例如:
“fields”: [
{“name”: “name”, “type”: “string”},
{“name”: “age”, “type”: [“int”, “null”]},
{“name”: “address”, “type”: [“string”, “null”]}
]
(2)doc: 可选属性,为使用此Schema的用户提供了描述此field的文档。 (3)type: 必选属性,定义Schema的一个JSON对象,或者是命名一条记录定义的JSON string。 (4)default: 可选属性,即field的默认值,当读到缺少这个field的实例时用到。默认值的允许的范围由这个field的Schama的类型决定。 order: 可选属性,指定这个field如何影响record的排序。有效的可选值为“ascending”(默认),“descending"和"ignore” alias: JSON的string数组,为这个field提供别名。
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--maven编译插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--Avro编译插件-->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<!--Avro源文件-->
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<!--Avro编译生成文件-->
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
使用JSON为Avro定义schema。schema由基本类型(null,boolean, int, long, float, double, bytes 和string)和复杂类型(record, enum, array, map, union, 和fixed)组成。
定义一个user的schema,开发步骤: 1.新建文件夹目录src/main/avro和/src/main/java 2.在avro目录下新建文件 user.avsc :
{"namespace": "cn.itcast.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["int", "null"]},
{"name": "address", "type": ["string", "null"]}
]
}
第一种方式:
编译之后,会在工程目录下生成users.avro文件 第二种方式: 在资料目录“第1章\4.资料\avro\jars”,打开cmd,输入下面命令,也可以获取编译的avro文件。 java -jar avro-tools-1.8.1.jar compile schema user.avsc ./ 注意:需要avro编译工具包:avro-tools-1.8.1.jar
avro-tools-1.8.1.jar包下载地址:http://archive.apache.org/dist/avro/
新建源码目录:cn.itcast.demo,测试类命:AvroDemo 开发步骤: 1.新建对象 2.设置数据 3.序列化 4.反序列化
package cn.itcast.demo;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.File;
import java.io.IOException;
public class AvroDemo{
public static void main(String[] args) throws IOException {
//1.新建对象
User user1 = new User();
//2.设置数据
user1.setName("小明");
user1.setAddress("上海市");
user1.setAge(20);
/**
* 构造方法添加参数
*/
User user2 = new User("小红", 7, "red");
/**
* Builder方法构建对象
*/
User user3 = User.newBuilder()
.setName("小李")
.setAddress("北京市")
.setAge(20)
.build();
/**
* 3.序列化操作
*/
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
try {
dataFileWriter.create(user1.getSchema(), new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
package cn.itcast.demo;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import java.io.File;
import java.io.IOException;
/**
* 4.反序列化
*/
public class DeserialUserTest {
public static void main(String[] args) {
try {
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader;
dataFileReader = new DataFileReader<User>(new File("users.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}