一、产生背景
MR性能差,资源消耗大,如:Hive作业之间的数据不是直接流动的,而是借助HDFS作为共享数据存储系统,即一个作业将处理好的数据写入HDFS,下一个作业再从HDFS重新读取数据进行处理。很明显更高效的方式是,第一个作业直接将数据传递给下游作业。 MR 默认了map和reduce阶段,map会对中间结果进行分区、排序,reduce会进行合并排序,这一过程并不适用于所有场景。 引擎级别的Runtime优化:MR执行计划在编译时已经确定,无法动态调整(?)。然而在执行ETL和Ad-hoc等任务时,根据实际处理的表大小,动态调整join策略、任务并行度将大大缩短任务执行时间。
二、原理 2.1 DAG https://hortonworks.com/blog/expressing-data-processing-in-apache-tez/
Vertex:定义了用户逻辑(如:map/reduce)与相关的资源与环境 Edge:定义了上下游Vertex之间的连接方式。 Edge相关属性:Data movement:定义了producer与consumer之间数据流动的方式。
One-To-One: 第i个producer产生的数据,发送给第i个consumer。这种上下游关系属于Spark的窄依赖。
Broadcast: producer产生的数据路由都下游所有consumer。这种上下游关系也属于Spark的窄依赖。
Scatter-Gather: producer将产生的数据分块,将第i块数据发送到第i个consumer。这种上下游关系属于Spark的宽依赖。 Scheduling:定义了何时启动consumer Task Sequential: Consumer task 需要producer task结束后启动,如:MR。
Concurrent: Consumer task 与producer task一起启动,如:流计算。
Data source:定义了任务outp的生命周期与可靠性。
Persisted: 当任务退出后,该任务output依然存在,但经过一段时间后,可能会被删除,如:Mapper输出的中间结果。 Persisted-Reliable: 任务output总是存在,比如,MR中reducer的输出结 果,存在HDFS上。
Ephemeral: 任务输出只有当该task在运行的时候,才存在,如:流计算的 中间结果。
一个DAG图中只有两个Vertex,Map Vertex与Reduce Vertex。连接Map Vertex与Reduce Vertex的Edge有以下属性:
Data movement:Scatter-Gather Scheduling:Sequential Data Source:Map Vertex的Data Source为Persisted-Reliable,reduce Vertex 的 Data Source为Persisted 2.2 Runtime API——Input/Processor/Output Task是Tez的最小执行单元,Vertex中task的数量与该vertex的并行度一致。以下是Input、Processor、Output均需要实现的接口:
List<Event> initialize(Tez*Context) -This is where I/P/O receive their corresponding context objects. They can, optionally, return a list of events.
handleEvents(List<Event> events) – Any events generated for the specific I/P/O will be passed in via this interface. Inputs receive DataMovementEvent(s) generated by corresponding Outputs on this interface – and will need to interpret them to retrieve data. At the moment, this can be ignored for Outputs and Processors.
List<Event> close() – Any cleanup or final commits will typically be implemented in the close method. This is generally a good place for Outputs to generate DataMovementEvent(s). More on these events later.
2.3 Runtime优化 任务运行时,程序知晓更多任务相关的信息,通过这些信息,我们可以动态修改修改执行计划,比如:修改mapper或reducer数量,决定何时启动reducer等。在Tez中,不同组件通过不同事件类型,进行通信。
动态修改reducer并行度:MapTask通过VertexManager类型的事件向ShuffleVertextManager发送信息,比如:所处理的partition大小等。ShuffleVertexManager通过所获得的信息,可以估算出所有Task的输出数据大小,最后来调整下游reduce Vertex的并行度,如下图: reducer"慢"启动(预先启动):上游MapTask通过事件不断向ShuffleVertexManager汇报任务完成情况,ShuffleVertexManager通过这些信息,可以判断何时启动下游reduceTask与需要启动的reduceTask数量。 2.4 从逻辑执行计划到物理执行计划 2.5 其他优化措施 Tez Session: 与数据库session相似,在同一个Tez Session中,可串行执行多个Tez Dag。Tez Session避免了AM的多次启动与销毁,在有多个DAG图的Tez作业(HQL任务)中大大减小了任务执行时间。 这也是为什么在Tez-UI中,一个HQL任务,只有一个Application,却有多个DAG(MR中一个HQL任务,有多个Application)。
Tez相关参数:
问题:container的资源兼容?被先后调度到同一个container的多个task所需要的资源,必须与container的资源相互兼容。也就是说,container拥有的资源,如:jar包,Memory,CPU等,需要是task所需资源的“超集”。 怎么调度?进行container复用时,Tez对Task进行调度。Tez会依据:任务本地性、任务所需资源、pending任务的优先级等因素,进行任务调度。 优点:减少作业执行过程中JVM的创建与销毁带来的开销 减小对RM的请求压力 运行在同一container上task之间的数据共享。比如,MapJoin中可以通过共享小表数据的方式,减少资源消耗。 相关参数:
三、优缺点 优点:避免中间数据写回HDFS,减小任务执行时间 vertex management模块使runtime动态修改执行计划变成可能 input/processor/output编程模型,大大提高了任务模型的灵活性 提供container复用机制与Tez Session,减少资源消耗 缺点:出现数据重复问题等数据质量问题 Tez与Hive捆绑,在其他领域应用较少 社区不活跃 四、Hive On Tez: Hive On Tez初始并行度算法
https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works Hive 调节mapper与reducer数量
http://www.openkb.info/2017/05/hive-on-tez-how-to-control-number-of.html Tez Configuration
https://tez.apache.org/releases/0.9.0/tez-api-javadocs/configs/TezConfiguration.html Hive on Tez 配置参数
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-Tez 参考链接:
https://hortonworks.com/blog/apache-tez-a-new-chapter-in-hadoop-data-processing/ http://web.eecs.umich.edu/~mosharaf/Readings/Tez.pdf