实时计算实践:快速分析实时数据的解决方案

在分布式系统中,根据应用的场景选择对应的数据存储方式是非常重要的一件事。这篇文章讨论的是在实时数据不断进入的情况下,如何结合历史数据进行快速分析。

背景

对于实时数据的存储,为了避免在HDFS(最经典的分布式存储系统)下生成大量的小文件(存储和计算都会产生大量的开销),通常的选择是Hbase。但是Hbase本身的设计更适合写入数据,对于读的优势仅仅限于对Rowkey的查询,不适合类似于Group、聚合等分析型查询。为了避免这种情况,整体架构变成了实时数据通过流计算框架(Storm或者是Spark Streaming)导入Hbase,T+1天/小时后将数据导入到HDFS供业务或者运营人员使用。显然这不是我们想要的实时数据快速分析,业务和运营人员需要等待一段时间才能看到数据。

随着技术的发展,TiDBCockroachDB等分布式数据库逐渐成熟,高可用、无限扩展且同时支持分析型和事务型操作场景。使用分布式数据库应该是最理想的存储实时数据,并提供快速分析的方式,但是TiDBCockroachDB等分布式数据库对服务器和开发人员都有不低的要求(例如TiDB就需要安装机器使用SSD盘),所以更适合非常重要的业务数据,而不是类似于埋点日志等实时数据,。

因此我们需要一个存储方案可以在相对廉价的商用机器上支持实时数据的快速分析。Cloudera公司开发的Kudu就是一个很好的选择。Kudu支持单行数据的插入、更新和删除操作,但是使用Kudu用来存储不再发生改变的历史数据,也显得有些昂贵了。对于历史数据的存储,廉价而稳定的HDFS是一个更好的选择。受实时计算中滑动窗口的启发,基于Impala计算引擎兼容KuduHDFS两个存储引擎的特性,参考Kudu最新的Blog设计了后文的方案,扬HDFSKudu之长,避HDFSKudu之短,从而取得一个相对平衡的效果,实现实时数据的快速分析。

滑动窗口模式

滑动窗口类似于下图,随着时间的前进,Kudu会不断建立新的时间分区,不再变化的历史数据导入HDFS存储,再删除Kudu的历史分区。

我们在Impala中,分别建立一张Kudu表和Paruqet格式的HDFS表,两张表使用同样的时间分区格式,时间分区可以选择天、月、年。对于数据的查询,可以选择建立View视图方便两张表的查询,也可以使用Where+Union All语句对两张表合并查询。Impala可以根据表不同的数据存储方式选择最适合的优化方式。Kudu表和HDFS表的时间分隔界限可以根据实时数据的迟到情况决定。

此时的滑动窗口模式实现了:

  • 业务和运营人员可以立即查询到实时数据的变化。
  • 在一定时间段内可以对错误数据进行修正和补充。
  • 避免了HDFS小文件产生。
  • 减少了Kudu的系统开销。

流程的具体分析

  1. Kudu表、Parquet格式的HDFS表,可以选择建立相应的View视图或者不建立。
  2. Kudu预添加时间分区,防止新来的数据会因为找不到新的分区而报Non-Range的错,丢失数据。
  3. Kudu复制历史的不可变数据到HDFS层。此时的数据有可能会在HDFSKudu层出现冗余,因此需要使用where语句保证数据不会出现冗余。如果有使用View视图的话,需要执行Alter语句更新View
  4. 运行COMPUTE STATS更新Impala的元数据信息,并删除Kudu旧的分区。

实例

  1. 建表
CREATE TABLE db.kudu_table 
(  
    kafka_offset string,
    data         string,
    event_time   timestamp,
   PRIMARY KEY (kafka_offset, event_time)
) 
PARTITION BY 
   HASH (kafka_offset) PARTITIONS 4,
   RANGE (event_time)
     PARTITION '2019-03-28' <= VALUES < '2019-03-29',
     PARTITION '2019-03-30' <= VALUES < '2019-03-31'
   )
COMMENT 'xxx' 
STORED AS KUDU;
CREATE TABLE db.hdfs_table 
(  
    kafka_offset string,
    data         string,
    event_time   timestamp
) 
PARTITION BY (insert_time string COMMENT '历史时间' ) 
COMMENT 'xxx' 
STORED AS PARQUET;

假设我们的数据是从Kafka中实时读取的,为了保证消息的精确一次,可以选择KafkaOffset+Partition进行组合作为Kudu的主键保证消息在数据库存储时有且仅有一次记录。主键HashBucket的个数取决于你的数据更新频率。

注意:下面的任务均由定时调度工具执行,定时调度工具可以选择Airflow、Crontab等工具。

  1. 提前添加Kudu分区。
ALTER TABLE db.kudu_table 
ADD IF NOT EXISTS 
RANGE PARTITION "2019-04-01" <= VALUES < "2019-04-02";
  1. 将两天前的数据更新进HDFS表。这里使用的是Airflow的语法。
INSERT OVERWRITE TABLE db.hdfs_table partition(history_time='{{ macros.ds_add(next_ds, -2}}')
SELECT kafka_offset, data, event_time 
FROM db.kudu_table
where "{{ macros.ds_add(next_ds, -2) }}" <= event_time
and event_time < "{{ macros.ds_add(next_ds, -1) }}";
  1. 确定数据是否更新进去HDFS表。
select count(*) 
from db.hdfs_table 
where history_time='{{ macros.ds_add(next_ds, -2}}'
  1. 为了防止意外会选择删除三天前的分区,而不是当天更新进HDFS层的数据。
ALTER TABLE db.kudu_table 
DROP IF EXISTS 
RANGE PARTITION "{{ macros.ds_add(next_ds, -3}}" <= VALUES < "{{ macros.ds_add(next_ds, -2}}";
  1. 为了Impala获得最好的运行性能,使用COMPUTE STATS语法,预计算和更新Impala的元数据。
COMPUTE STATS db.kudu_table;
COMPUTE INCREMENTAL STATS db.hdfs_table;
  1. 可以建立View视图对数据进行历史数据和实时数据的联合查询,也可只针对kudu_table查询实时数据的更新情况。
CREATE VIEW unified_view AS
SELECT kafka_offset, data, event_time
FROM db.kudu_table
WHERE event_time >= date_add(now(), -2)
UNION ALL
SELECT kafka_offset, data, event_time
FROM db.hdfs_table
WHERE history_time < date_add(now(), -2)

总结

其实我们换个视角,使用The Beam Model去看待整个流程,我们会发现:

  • 计算的结果是什么?数据从Kafka流入Kudu,再转换成Paruqet格式文件。transformations
  • 在事件时间中的哪个位置计算结果?选择滑动窗口,在两天的时间间隔处计算数据,将Kudu数据转换成Parquet格式。windowing
  • 在处理时间中的哪个时刻触发计算结果?使用固定的处理时间(一天间隔)触发数据的转换,迟到两天的数据便不再处理(水印)。triggers + watermarks
  • 如何修正结果?数据只会不断增加,不会修改。accumulation

原文发布于微信公众号 - 鸿的笔记(goodreadman)

原文发表时间:2019-04-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券