腾讯云大数据平台的产品组件介绍及测试方法

一套完整的大数据平台,应该包括如下几个基本的处理过程:数据采集->数据存储->数据处理->数据展现(可视化、报表、监控):

本文将基于这个生命周期,描述一些大数据平台中使用的一些技术,对技术的框架、使用场景进行描述。

一个用户从数据上云到数据展示,可能用到腾讯云大数据的产品的场景大致是这样的:用户首先使用CDP将自己的数据收集起来,对于一些小型用户,他们对于存储和计算能力要求不是非常高的话,他们会选择将从CDP收集来的数据导入到TDF进行存储以及些简单的计算,包括hive查询、Map-Reduce计算等;对于一些大的用户,存储的数据量较大,并且要求有较高的计算性能,这时候用户会选择EMR产品,根据需要我们可以为他部署相应的组件,交付到用户手中一个Hadoop集群,用户可以将CDP收集到的数据直接导入到HDFS,在集群上进行一系列计算,此外我们打通了HDFS与腾讯云存储产品COS,使得用户也可以将存储放在COS上,集群专注于计算,与此同时,EMR集群还具有规模动态可调整这样的优势,用户可以根据其业务的不同阶段调整集群规模,达到节约成本的目的。最后,通过腾讯云大数据可视交互系统RayData,将计算的结果展示给用户。

总结上面的过程,用户会用到的产品就包括CDP、TDF、EMR、COS以及RadData。

一、数据采集:

在实际的应用场景中,用户手上可能会有许多实时的日志数据或者离线的文件、数据表等信息,为了解决用户本地的存储压力,他们会选择将数据上云,利用云计算提供的大规模存储、高性能计算,为他们节约存储成本,并且高效的计算。

在大数据的应用背景下,数据采集存在的难度主要包括数据源多样且复杂,数据量巨大,数据变化快,在采集数据时要求高可靠的性能,数据的去重以及数据准确性等的问题。因此,一套完整的大数据平台所提供的数据采集器,必须能够面对上面提到的种种压力。

腾讯云这边的数据采集的产品主要是Cloud DataPipeline(CDP),这个产品所使用到的主要的开源组件有flume和kafka,cdp整体架构描述如图:

测试思路:前台创建kafka的topic,以及nifi的integrator(创建时,需要指定TDF中的表),将topic名和integrator名写到flume svr的配置中,启动flume svr监听配置文件中指定的端口号,启动flume client向flume svr对应的端口发送数据。

测试方法:

1、前台创建project、topic、integrator:

新建topic的时候需要指定表结构:

新建Integrator的时候,需要指定落地到TDF具体哪个位置:

创建好了,启动Integrator:

2、修改Flume svr的配置文件:

目录/data/apache-flume-1.7.0-bin/conf,这里面有若干个配置文件,可以复制一个然后修改成自己的名字,启动Flume svr的时候,就可以指定以这个配置文件来启动。来看一下配置文件中需要指定哪些东西:

3、启动Flume svr:

bin/flume-ng agent -c conf/ -f conf/f1.conf -Dflume.root.logger=debug,console -n agent-1

4、发送数据:

./bin/flume-ng avro-client -c conf/ -H 10.104.126.127 -p 41415 -F files/file_26000 -Dflume.root.logger=DEBUG,console

5、检查:

到TDF的表中查询是否正确导入

开源组件介绍:

1、Flume:是基于JRuby构建的,运行环境依赖于Java,基本架构:

通过一些Agent,在源和目的之间建立通道。每个Agent包含三个组件:

Flume的客户端主要有:Avro、Log4J、syslog、HTTP Post以及本地进程,Flume也为用户提供SDK,当上面的客户端不能满足用户需求时,用户可以自定义客户端。

2、Kafka:是一个机基于zk的消息中间件,主要的特性包括分布式、分区、多副本、多订阅者。Kafka的总体架构如图:

Broker:消息中间件的处理节点,真正处理的逻辑放在Broker,多个Broker形成一个Kafka集群;

ZK:用来管理集群配置,选主节点,平衡Consumer;

Producer:Push模式将消息推给Broker;

Consumer:Pull模式将消息从Broker中拉回来;

Topic:要传递的消息,有由Kafka集群负责分发;

Partition:topic上的物理分组,一个topic由若干个partition组成,每个partition内部是一个有序的队列;

Segment:partition中的物理分组;

Offset:每个partition由一系列有序的、不可变的消息组成,消息来了之后就放到partition后面,partition中的消息的编号就是offset,它是一串连续的序号。

Kafka通过事务机制保证数据的一致性。

二、数据存储

大数据时代很显著的特征就是数据的来源越来越多样化。大数据时代之前,产生的数据通常是结构化的,使用传统的关系型数据库就可以解决数据存储的问题;而现在,移动互联网的发展,产生大量非结构化的数据,图片、视频、文档、XML等等,这些数据的存储的传统的关系型数据库不能解决的。NoSQL、MongoDB、iBase等非结构化的数据库,包括MySQL5.7版本,也越来越能支持非结构化数据的存储。

分布式存储,使得大量的数据可以存储在不同的网络节点,降低网络压力,在大数据时代也是不得不使用的存储技术。Hadoop系统,则是一套能够高速运算和存储的软件框架。

我们这里就介绍几个hadoop家族的存储。

1、HDFS

Hadoop实现了一个分布式的文件系统,HDFS。HDFS的架构图,这张图可以说是相当经典了:

NameNode:管理数据块映射、处理客户端的读写请求、配置副本策略、管理HDFS的命名空间;

Secondary NameNode:Name Node的冷备(1个小时);

Data Node:负责存储数据块block,执行数据块的读写操作。

HDFS提供了一些可以操作的命令,可以对HDFS进行一些基本操作:

登录到我们的一个集群上,查看hdfs上存储的数据:

2、Hive

Hive是基于hadoop的分布式的数据仓库,可以将SQL语句转化成Map-Reduce的任务,实现快速的查询功能。

hive的操作与操作关系型数据库十分相似,但不同的是,hive使用的文件系统是hdfs,而关系数据库使用的本地文件系统,hive的计算模型是Map-Reduce,当然hive没有办法处理的是那种实时的场景,只能对离线数据进行统计分析、数据挖掘。

集群中安装了hive组建后,可以通过hive命令直接进入hive命令行,然后做一些查询操作:

3、HBase

HBase是一种分布式、面向列的存储系统,是一种key-value型数据库。

数据模型:

Row key:是用来检索记录的主键。

Timestamp:时间戳,记录数据的版本号。

Column Family:水平方向有一个或多个列簇,列簇又有多个列组成

物理模型:

HBase一个很重要的特性是空值不会被保存,也就是说,在应用层设计表结构的时候,可以不用考虑设计的非常紧凑来节省存储空间。

1、表中所有行都按照row key的字典序排列;

2、随着表的不断增加在行的方向上分割为多个Region;

3、Region按大小分割的,每个表开始只有一个region,随着数据增多,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region,之后会有越来越多的region;

4、Region是Hbase中分布式存储和负载均衡的最小单元,不同Region分布到不同RegionServer上。

5、Region虽然是分布式存储的最小单元,但并不是存储的最小单元。Region由一个或者多个Store组成,每个store保存一个columns family;每个Strore又由一个memStore和0至多个StoreFile组成,StoreFile包含HFile;memStore存储在内存中,StoreFile存储在HDFS上。

4、COS

Cos是腾讯云的对象存储产品,既然是对象也就是支持非结构化的存储。腾讯云的大数据产品EMR,通过打通COS与HDFS实现数据与计算相分离,打破传统大数据套件的一些局限。举个栗子,比如计算组件spark需要做版本升级,由于传统大数据套件计算与存储混部,所以必须要重新拉起一个集群,部署新版本的spark,然后再把数据迁过来;打通了cos与HDFS,就可以将数据存在cos,需要的时候载入HDFS参与计算就行。

对cos的访问也非常方便,可以通过url方式访问到存储的对象。比如:cosn://emrtest/data/hive

关于COS的架构,这里不做介绍了。

三、数据计算

Hadoop生态圈中,有许多关于计算的组件,这些组件的数据源往往都是存储在HDFS上的,通过腾讯云的EMR(弹性Map-Reduce)集群提供的存储能力和计算能力,用户根据自己业务的需求,自定义程序实现对数据的计算。

本文将介绍一些用于大数据计算的组件的基本架构,然后看一下测试组件是否正确工作的命令。

1、Map-Reduce:是一种大规模计算的编程模型,基本思想是分而治之:

用户的业务逻辑分别写在Mapper和Reducer中,通过继承Mapper和Reducer类,并重写map()和reduce()方法。

大致描述一下Map-Reduce计算的流程,客户端提交job,JobTrack做一些检查和初始化工作,生成一个调度队列;然后获取输入的分片信息并创建map任务;jobtracker侦测tasktracker的状态和进度,分配map任务的执行者;建map的输出转化为reduce输入,这个过程称为shuffle,将map的输出做内存缓存,缓冲区超过内存一定比例,换出内存;最终执行reduce任务,输出结果文件到HDFS上。

Map-Reduce的测试脚本:

/usr/local/service/auto_test/mr_test.sh (安装我们自己的镜像,就会有这个路径)

执行这个脚本,并查看日志:/usr/local/service/auto_test/log/mr.log

2、Spark

由于Map-Reduce在计算处理的实时性等的一些局限,Spark提出了基于内存的计算模型。Spark的核心是RDD(Resilient Distributed Datasets,弹性分布式数据集),是分布式数据的逻辑抽象,物理数据存储在不同节点,但对用户是透明的。运行的流程如下:

首先构建spark应用运行的环境,启动spark context,由context向资源管理器申请执行器资源并启动StandaloneExecutorBackend,执行器向Context申请Task,Context构建DAG,并将DAG分解成Stage,发送给Task Scheduler,由Task Scheduler将task分配给执行器。Task运行完,释放所有资源。

Spark的测试脚本,通过spark-submit指令提交一个spark任务,分别覆盖jar包和py文件两种类型的源代码,指定几种不同类型的参数:

--master:指定主节点的位置,

--deploy-mode:driver运行的位置,client就是运行在本地,会在本地打印日志;cluster表示运行在集群上,运行信息会打印在spark日志中;

--class:主类类名,含包名。

运行日志:

3、Strom

Storm是一个分布式的实时计算系统,其计算的总体架构也是采用主从的方式,大概长成这个样子:

Nimbus:计算的主节点,用来管理资源分配和任务调度;

Zk:协调器,记录nimbus和supervisor的进程状态;

Supervisor:接收nimbus分配的任务,启动和停止自己管理的worker进程;

Worker:运行具体的计算处理逻辑;

Task:worker中运行的spout/bolt线程称为一个worker。

客户端(可以是Kafka、RockerMq等消息中间件)每次向nimbus提交一个topology(相当于mr中的一个job),一个topology必须同时包含至少一个Spout和一个Bolt,Spout组件通过不停地调用nextTuple()方法,向Bolt中输入数据,源源不断的数据形成了一个Stream。Bolt收到消息后调用处理函数进行计算。拓扑结构中的每一个Tuple都会被确保被完整的送到拓扑结构的最后一个节点,保证消息处理的可靠性。

关于消息怎么分配,Storm提供6中消息的分组方式,通过不同的方式可以指定如何分发处理消息。

另外,Storm有一套容错的的机制。Nimbus可以通过心跳方式获取worker状态,一旦worker失效,nimbus重新分配任务至其他节点;集群中的节点失效时,有一个超时机制,nimbus可以感知,然后进行重新分配;Nimbus和Supervisor都被设计成快速失败(遇到未知错误,自我失败)和无状态(状态信息保存在zk或磁盘上),一旦nimbus或者supervisor失败,可以立刻启动恢复,工作进程worker也不会受到失败的影响继续执行。

我们的测试脚本,使用示例jar包进行wordcount计算:

查看运行的日志:

4、Presto

Presto是一个分布式查询引擎,能够更加高效的处理执行SQL语句,其基本框架长这样:

Client:通过HTTP请求向Coordinator发送要执行的SQL语句;

Discovery:注册中心,Worker向注册中心注册服务;

Coordinator:接收并解析SQL语句,通过Connector Plugin读取存储的元数据,根据元数据生成生成一个查询计划,计划的生成是基于一个有向无环图(DAG),采用流水线的方式将SQL任务转换成多个stage,根据SQL将任务分发给各个Worker;

Worker:接收SQL任务,节点内部是基于内存的流水线计算,各节点并行读取存储的原始数据,读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行。(这里会在本地进行一次优化,会选择与split用一个host的worker进行计算,减少网络成本)

对Presto组件的测试:

presto --server localhost:9000 --catalog hive --schema patrick_hivetest --user hadoop --execute "select * from patrick_hivetest;"

这一条命令做指定了服务端、用户、要操作的表、执行的SQL语句,查询的结果就是表中的数据:

5、Flink

Flink是一个针对流数据和批量数据的分布式处理引擎,它会把任务当做流来处理。基本架构图:

Client将任务提交给Job Manager,由Job Manager将任务俸给Task Manager 去执行,Task Manager会以心跳的方式汇报状态。Task Manager之间的以流的方式交换数据。

Flink集群中,计算资源叫做Task Slot,Job Manager以Slot为单位调度任务,每个Task Manager有一个或多个Slots。

Flink也有多种部署方式,在我们的EMR产品中,flink是部署在yarn集群中的,我们可以通过yarn来启动Job Manager和Task Mananger。

测试脚本:

Step1:-m执行JobManager地址,-yn指定YARN容器分配的数量,-yjm指定JobManager分配的内存大小,-ytm指定TaskManager容器的内存大小,后面给出要运行的jar包文件(测试脚本中的文件是做单词统计的);

Step2:以yarn-session的方式启动flink,-n指定TaskManager的数量,-d选项表示开始执行分发,启动flink的JobManager和TaskManager后,向flink提交任务。

测试结果文件,wordcount的结果:

6、Sqoop

Sqoop组件是把sql和hadoop连接起来的一个桥梁,名字也是这么由来的。Sqoop在导入数据时设置一个split-by参数,根据这个参数切分数据,然后数据分配到不同的map中,每个map再从数据库中一行一行的取数据写到HDFS中。

测试脚本:

①mysql导入hive:

在hive中建表:

/usr/local/service/sqoop/bin/sqoop create-hive-table --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table sql_test --hive-table patrick_hivetest.sqoop_test --fiel

ds-terminated-by ","  --lines-terminated-by "\n"

执行导入:

/usr/local/service/sqoop/bin/sqoop import --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table sql_test --hive-import --hive-table patrick_hivetest.sqoop_test --s

plit-by id --fields-terminated-by ",";

查询目标表中的信息:

hive -e "use patrick_hivetest;select * from sqoop_test;"

②mysql导入hdfs:

执行导入:

/usr/local/service/sqoop/bin/sqoop import --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table sql_test --split-by id --validate;

查看hdfs上的导入文件:

hdfs dfs -cat /user/hadoop/sql_test/*

③hdfs导出到mysql:

/usr/local/service/sqoop/bin/sqoop export --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table export_test --export-dir /user/hadoop/sql_test/ --validate

到mysql中执行自定义查询:

/usr/local/service/sqoop/bin/sqoop eval --connect jdbc:mysql://$host:3306/test --username root --password $pawd --verbose --e "select * from export_test"

④删除mysql中的表,逻辑闭环方便以后测试:

/usr/local/service/sqoop/bin/sqoop eval --connect jdbc:mysql://$host:3306/test --username root --password $pawd --verbose --e "delete from export_test"

四、数据展示

通过统计分析得到一系列运算的结果,最终将这些结果以可见的方式描绘出来,实现数据可视化,为商业分析提供价值。

可视化的工具包括有RayData、BI、DataV,通过一些实时建模、渲染,间数据以酷炫狂拽吊炸天的方式展现出来,让你忍不住说一声“Amazing!”

笔者水平有限,望各位老板多多指教。有什么写错的地方,求帮忙指正。然后有啥不懂的也可以下来交流,我能答上来的一定全力以赴;答不上来的,我会查资料搞清楚。

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

王燚的专栏

1 篇文章1 人订阅

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Hadoop实操

当Impala碰到由Hive生成的timestamp数据

1522
来自专栏JAVA烂猪皮

基于 Docker 的微服务架构实践

基于 Docker 的容器技术是在2015年的时候开始接触的,两年多的时间,作为一名 Docker 的 DevOps,也见证了 Docker 的技术体系的快速发...

692
来自专栏贾老师の博客

Lua 游戏开发学习

1252
来自专栏涤生的博客

服务框架的技术栈

随着业务规模的扩张,为了满足业务对技术的要求,技术架构需要从单体应用架构升级到分布式服务架构,来降低公司的技术成本,更好的适应业务的发展。分布式服务架构的诸多优...

832
来自专栏Java职业技术分享

zookeeper-架构设计与角色分工-《每日五分钟搞定大数据》

zookeeper作为一个分布式协调系统,很多组件都会依赖它,那么此时它的可用性就非常重要了,那么保证可用性的同时作为分布式系统的它是怎么保证扩展性的?问题很多...

730
来自专栏Java架构师历程

6、选择部署策略

本书主要介绍关于如何使用微服务构建应用程序,这是本书的第六章。第一章介绍了微服务架构模式,讨论了使用微服务的优点与缺点。之后的章节讨论了微服务架构的方方面面:使...

1003
来自专栏CSDN技术头条

大数据实时处理实战

随着互联网时代的发展,运营商作为内容传送的管道服务商,在数据领域具有巨大的优势,如何将这些数据转化为价值,越来越被运营商所重视。 运营商的大数据具有体量大,种类...

36910
来自专栏编程

初学Java编程需要知道的几大重点步骤

初学Java编程需要知道的几大重点步骤 ? 一、学习前的准备工作 java语言一般用于大型的服务器程序开发,所有有必要了解如下内容:Unix开发环境Unix系统...

1808
来自专栏java达人

自学Apache Spark博客(节选)

作者:Kumar Chinnakali 译者:java达人 来源:http://dataottam.com/2016/01/10/self-learn-yo...

1979
来自专栏ImportSource

微服务应具备的12个属性

该文翻译自Pivotal公司的 Matt Stine大牛的书籍《Migrating to Cloud Native Application Architectu...

2639

扫码关注云+社区