一套完整的大数据平台,应该包括如下几个基本的处理过程:数据采集->数据存储->数据处理->数据展现(可视化、报表、监控):
本文将基于这个生命周期,描述一些大数据平台中使用的一些技术,对技术的框架、使用场景进行描述。
一个用户从数据上云到数据展示,可能用到腾讯云大数据的产品的场景大致是这样的:用户首先使用CDP将自己的数据收集起来,对于一些小型用户,他们对于存储和计算能力要求不是非常高的话,他们会选择将从CDP收集来的数据导入到TDF进行存储以及些简单的计算,包括hive查询、Map-Reduce计算等;对于一些大的用户,存储的数据量较大,并且要求有较高的计算性能,这时候用户会选择EMR产品,根据需要我们可以为他部署相应的组件,交付到用户手中一个Hadoop集群,用户可以将CDP收集到的数据直接导入到HDFS,在集群上进行一系列计算,此外我们打通了HDFS与腾讯云存储产品COS,使得用户也可以将存储放在COS上,集群专注于计算,与此同时,EMR集群还具有规模动态可调整这样的优势,用户可以根据其业务的不同阶段调整集群规模,达到节约成本的目的。最后,通过腾讯云大数据可视交互系统RayData,将计算的结果展示给用户。
总结上面的过程,用户会用到的产品就包括CDP、TDF、EMR、COS以及RadData。
在实际的应用场景中,用户手上可能会有许多实时的日志数据或者离线的文件、数据表等信息,为了解决用户本地的存储压力,他们会选择将数据上云,利用云计算提供的大规模存储、高性能计算,为他们节约存储成本,并且高效的计算。
在大数据的应用背景下,数据采集存在的难度主要包括数据源多样且复杂,数据量巨大,数据变化快,在采集数据时要求高可靠的性能,数据的去重以及数据准确性等的问题。因此,一套完整的大数据平台所提供的数据采集器,必须能够面对上面提到的种种压力。
腾讯云这边的数据采集的产品主要是Cloud DataPipeline(CDP),这个产品所使用到的主要的开源组件有flume和kafka,cdp整体架构描述如图:
测试思路:前台创建kafka的topic,以及nifi的integrator(创建时,需要指定TDF中的表),将topic名和integrator名写到flume svr的配置中,启动flume svr监听配置文件中指定的端口号,启动flume client向flume svr对应的端口发送数据。
1、前台创建project、topic、integrator:
新建topic的时候需要指定表结构:
新建Integrator的时候,需要指定落地到TDF具体哪个位置:
创建好了,启动Integrator:
2、修改Flume svr的配置文件:
目录/data/apache-flume-1.7.0-bin/conf,这里面有若干个配置文件,可以复制一个然后修改成自己的名字,启动Flume svr的时候,就可以指定以这个配置文件来启动。来看一下配置文件中需要指定哪些东西:
3、启动Flume svr:
bin/flume-ng agent -c conf/ -f conf/f1.conf -Dflume.root.logger=debug,console -n agent-1
4、发送数据:
./bin/flume-ng avro-client -c conf/ -H 10.104.126.127 -p 41415 -F files/file_26000 -Dflume.root.logger=DEBUG,console
5、检查:
到TDF的表中查询是否正确导入
1、Flume:是基于JRuby构建的,运行环境依赖于Java,基本架构:
通过一些Agent,在源和目的之间建立通道。每个Agent包含三个组件:
Flume的客户端主要有:Avro、Log4J、syslog、HTTP Post以及本地进程,Flume也为用户提供SDK,当上面的客户端不能满足用户需求时,用户可以自定义客户端。
2、Kafka:是一个机基于zk的消息中间件,主要的特性包括分布式、分区、多副本、多订阅者。Kafka的总体架构如图:
Broker:消息中间件的处理节点,真正处理的逻辑放在Broker,多个Broker形成一个Kafka集群;
ZK:用来管理集群配置,选主节点,平衡Consumer;
Producer:Push模式将消息推给Broker;
Consumer:Pull模式将消息从Broker中拉回来;
Topic:要传递的消息,有由Kafka集群负责分发;
Partition:topic上的物理分组,一个topic由若干个partition组成,每个partition内部是一个有序的队列;
Segment:partition中的物理分组;
Offset:每个partition由一系列有序的、不可变的消息组成,消息来了之后就放到partition后面,partition中的消息的编号就是offset,它是一串连续的序号。
Kafka通过事务机制保证数据的一致性。
大数据时代很显著的特征就是数据的来源越来越多样化。大数据时代之前,产生的数据通常是结构化的,使用传统的关系型数据库就可以解决数据存储的问题;而现在,移动互联网的发展,产生大量非结构化的数据,图片、视频、文档、XML等等,这些数据的存储的传统的关系型数据库不能解决的。NoSQL、MongoDB、iBase等非结构化的数据库,包括MySQL5.7版本,也越来越能支持非结构化数据的存储。
分布式存储,使得大量的数据可以存储在不同的网络节点,降低网络压力,在大数据时代也是不得不使用的存储技术。Hadoop系统,则是一套能够高速运算和存储的软件框架。
我们这里就介绍几个hadoop家族的存储。
Hadoop实现了一个分布式的文件系统,HDFS。HDFS的架构图,这张图可以说是相当经典了:
NameNode:管理数据块映射、处理客户端的读写请求、配置副本策略、管理HDFS的命名空间;
Secondary NameNode:Name Node的冷备(1个小时);
Data Node:负责存储数据块block,执行数据块的读写操作。
HDFS提供了一些可以操作的命令,可以对HDFS进行一些基本操作:
登录到我们的一个集群上,查看hdfs上存储的数据:
Hive是基于hadoop的分布式的数据仓库,可以将SQL语句转化成Map-Reduce的任务,实现快速的查询功能。
hive的操作与操作关系型数据库十分相似,但不同的是,hive使用的文件系统是hdfs,而关系数据库使用的本地文件系统,hive的计算模型是Map-Reduce,当然hive没有办法处理的是那种实时的场景,只能对离线数据进行统计分析、数据挖掘。
集群中安装了hive组建后,可以通过hive命令直接进入hive命令行,然后做一些查询操作:
HBase是一种分布式、面向列的存储系统,是一种key-value型数据库。
数据模型:
Row key:是用来检索记录的主键。
Timestamp:时间戳,记录数据的版本号。
Column Family:水平方向有一个或多个列簇,列簇又有多个列组成
HBase一个很重要的特性是空值不会被保存,也就是说,在应用层设计表结构的时候,可以不用考虑设计的非常紧凑来节省存储空间。
1、表中所有行都按照row key的字典序排列;
2、随着表的不断增加在行的方向上分割为多个Region;
3、Region按大小分割的,每个表开始只有一个region,随着数据增多,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region,之后会有越来越多的region;
4、Region是Hbase中分布式存储和负载均衡的最小单元,不同Region分布到不同RegionServer上。
5、Region虽然是分布式存储的最小单元,但并不是存储的最小单元。Region由一个或者多个Store组成,每个store保存一个columns family;每个Strore又由一个memStore和0至多个StoreFile组成,StoreFile包含HFile;memStore存储在内存中,StoreFile存储在HDFS上。
Cos是腾讯云的对象存储产品,既然是对象也就是支持非结构化的存储。腾讯云的大数据产品EMR,通过打通COS与HDFS实现数据与计算相分离,打破传统大数据套件的一些局限。举个栗子,比如计算组件spark需要做版本升级,由于传统大数据套件计算与存储混部,所以必须要重新拉起一个集群,部署新版本的spark,然后再把数据迁过来;打通了cos与HDFS,就可以将数据存在cos,需要的时候载入HDFS参与计算就行。
对cos的访问也非常方便,可以通过url方式访问到存储的对象。比如:cosn://emrtest/data/hive
关于COS的架构,这里不做介绍了。
Hadoop生态圈中,有许多关于计算的组件,这些组件的数据源往往都是存储在HDFS上的,通过腾讯云的EMR(弹性Map-Reduce)集群提供的存储能力和计算能力,用户根据自己业务的需求,自定义程序实现对数据的计算。
本文将介绍一些用于大数据计算的组件的基本架构,然后看一下测试组件是否正确工作的命令。
1、Map-Reduce:是一种大规模计算的编程模型,基本思想是分而治之:
用户的业务逻辑分别写在Mapper和Reducer中,通过继承Mapper和Reducer类,并重写map()和reduce()方法。
大致描述一下Map-Reduce计算的流程,客户端提交job,JobTrack做一些检查和初始化工作,生成一个调度队列;然后获取输入的分片信息并创建map任务;jobtracker侦测tasktracker的状态和进度,分配map任务的执行者;建map的输出转化为reduce输入,这个过程称为shuffle,将map的输出做内存缓存,缓冲区超过内存一定比例,换出内存;最终执行reduce任务,输出结果文件到HDFS上。
Map-Reduce的测试脚本:
/usr/local/service/auto_test/mr_test.sh (安装我们自己的镜像,就会有这个路径)
执行这个脚本,并查看日志:/usr/local/service/auto_test/log/mr.log
2、Spark
由于Map-Reduce在计算处理的实时性等的一些局限,Spark提出了基于内存的计算模型。Spark的核心是RDD(Resilient Distributed Datasets,弹性分布式数据集),是分布式数据的逻辑抽象,物理数据存储在不同节点,但对用户是透明的。运行的流程如下:
首先构建spark应用运行的环境,启动spark context,由context向资源管理器申请执行器资源并启动StandaloneExecutorBackend,执行器向Context申请Task,Context构建DAG,并将DAG分解成Stage,发送给Task Scheduler,由Task Scheduler将task分配给执行器。Task运行完,释放所有资源。
Spark的测试脚本,通过spark-submit指令提交一个spark任务,分别覆盖jar包和py文件两种类型的源代码,指定几种不同类型的参数:
--master:指定主节点的位置,
--deploy-mode:driver运行的位置,client就是运行在本地,会在本地打印日志;cluster表示运行在集群上,运行信息会打印在spark日志中;
--class:主类类名,含包名。
运行日志:
3、Strom
Storm是一个分布式的实时计算系统,其计算的总体架构也是采用主从的方式,大概长成这个样子:
Nimbus:计算的主节点,用来管理资源分配和任务调度;
Zk:协调器,记录nimbus和supervisor的进程状态;
Supervisor:接收nimbus分配的任务,启动和停止自己管理的worker进程;
Worker:运行具体的计算处理逻辑;
Task:worker中运行的spout/bolt线程称为一个worker。
客户端(可以是Kafka、RockerMq等消息中间件)每次向nimbus提交一个topology(相当于mr中的一个job),一个topology必须同时包含至少一个Spout和一个Bolt,Spout组件通过不停地调用nextTuple()方法,向Bolt中输入数据,源源不断的数据形成了一个Stream。Bolt收到消息后调用处理函数进行计算。拓扑结构中的每一个Tuple都会被确保被完整的送到拓扑结构的最后一个节点,保证消息处理的可靠性。
关于消息怎么分配,Storm提供6中消息的分组方式,通过不同的方式可以指定如何分发处理消息。
另外,Storm有一套容错的的机制。Nimbus可以通过心跳方式获取worker状态,一旦worker失效,nimbus重新分配任务至其他节点;集群中的节点失效时,有一个超时机制,nimbus可以感知,然后进行重新分配;Nimbus和Supervisor都被设计成快速失败(遇到未知错误,自我失败)和无状态(状态信息保存在zk或磁盘上),一旦nimbus或者supervisor失败,可以立刻启动恢复,工作进程worker也不会受到失败的影响继续执行。
我们的测试脚本,使用示例jar包进行wordcount计算:
查看运行的日志:
4、Presto
Presto是一个分布式查询引擎,能够更加高效的处理执行SQL语句,其基本框架长这样:
Client:通过HTTP请求向Coordinator发送要执行的SQL语句;
Discovery:注册中心,Worker向注册中心注册服务;
Coordinator:接收并解析SQL语句,通过Connector Plugin读取存储的元数据,根据元数据生成生成一个查询计划,计划的生成是基于一个有向无环图(DAG),采用流水线的方式将SQL任务转换成多个stage,根据SQL将任务分发给各个Worker;
Worker:接收SQL任务,节点内部是基于内存的流水线计算,各节点并行读取存储的原始数据,读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行。(这里会在本地进行一次优化,会选择与split用一个host的worker进行计算,减少网络成本)
对Presto组件的测试:
presto --server localhost:9000 --catalog hive --schema patrick_hivetest --user hadoop --execute "select * from patrick_hivetest;"
这一条命令做指定了服务端、用户、要操作的表、执行的SQL语句,查询的结果就是表中的数据:
5、Flink
Flink是一个针对流数据和批量数据的分布式处理引擎,它会把任务当做流来处理。基本架构图:
Client将任务提交给Job Manager,由Job Manager将任务俸给Task Manager 去执行,Task Manager会以心跳的方式汇报状态。Task Manager之间的以流的方式交换数据。
Flink集群中,计算资源叫做Task Slot,Job Manager以Slot为单位调度任务,每个Task Manager有一个或多个Slots。
Flink也有多种部署方式,在我们的EMR产品中,flink是部署在yarn集群中的,我们可以通过yarn来启动Job Manager和Task Mananger。
测试脚本:
Step1:-m执行JobManager地址,-yn指定YARN容器分配的数量,-yjm指定JobManager分配的内存大小,-ytm指定TaskManager容器的内存大小,后面给出要运行的jar包文件(测试脚本中的文件是做单词统计的);
Step2:以yarn-session的方式启动flink,-n指定TaskManager的数量,-d选项表示开始执行分发,启动flink的JobManager和TaskManager后,向flink提交任务。
测试结果文件,wordcount的结果:
6、Sqoop
Sqoop组件是把sql和hadoop连接起来的一个桥梁,名字也是这么由来的。Sqoop在导入数据时设置一个split-by参数,根据这个参数切分数据,然后数据分配到不同的map中,每个map再从数据库中一行一行的取数据写到HDFS中。
测试脚本:
①mysql导入hive:
在hive中建表:
/usr/local/service/sqoop/bin/sqoop create-hive-table --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table sql_test --hive-table patrick_hivetest.sqoop_test --fiel
ds-terminated-by "," --lines-terminated-by "\n"
执行导入:
/usr/local/service/sqoop/bin/sqoop import --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table sql_test --hive-import --hive-table patrick_hivetest.sqoop_test --s
plit-by id --fields-terminated-by ",";
查询目标表中的信息:
hive -e "use patrick_hivetest;select * from sqoop_test;"
②mysql导入hdfs:
执行导入:
/usr/local/service/sqoop/bin/sqoop import --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table sql_test --split-by id --validate;
查看hdfs上的导入文件:
hdfs dfs -cat /user/hadoop/sql_test/*
③hdfs导出到mysql:
/usr/local/service/sqoop/bin/sqoop export --connect jdbc:mysql://$host:3306/test --username root --password $pawd --table export_test --export-dir /user/hadoop/sql_test/ --validate
到mysql中执行自定义查询:
/usr/local/service/sqoop/bin/sqoop eval --connect jdbc:mysql://$host:3306/test --username root --password $pawd --verbose --e "select * from export_test"
④删除mysql中的表,逻辑闭环方便以后测试:
/usr/local/service/sqoop/bin/sqoop eval --connect jdbc:mysql://$host:3306/test --username root --password $pawd --verbose --e "delete from export_test"
通过统计分析得到一系列运算的结果,最终将这些结果以可见的方式描绘出来,实现数据可视化,为商业分析提供价值。
可视化的工具包括有RayData、BI、DataV,通过一些实时建模、渲染,间数据以酷炫狂拽吊炸天的方式展现出来,让你忍不住说一声“Amazing!”
笔者水平有限,望各位老板多多指教。有什么写错的地方,求帮忙指正。然后有啥不懂的也可以下来交流,我能答上来的一定全力以赴;答不上来的,我会查资料搞清楚。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。