写了快两个月Structured Streaming的代码,最近刚把数据迁移代码写完。
今晚有点时间,想着给大家分享一点我在基于Hudi实现CDC的一些经验。
每个公司的场景会有些不一样,
尤其是使用一些之前没有深度使用的技术,
每一种场景的切换,
都需要一路做各种尝试,各种优化,
CDC流式程序听起来简单,但其实还是有很多细节值得去考虑。
我先把这些生产上大概率会遇到的问题放在这,大家看看脑海里是否有答案:
暂时想到这么多,
里面有一些是跟Structured Streaming有关的,
不过很多问题,用其他流计算引擎也都会遇见。
所以,纠结用Spark还是Flink没用,还是要去解决问题。
大家看过了上面的问题,
大部分一说同步程序,
都会觉得CDC的流式应用看起来简单,
——就是解析一下CDC JSON日志,然后写入到Hudi就结束了。
但大家看看,
在写流式程序中碰到了这么多的问题,
都是要去解决的。
不然,它们就会不停地恶心你。
时不时跳出来咬你一口。
所以,架构固然重要,
对于开发来说,
细节也很重要。
上面列出的这些个问题都能解释得很清楚,
而且可以原理上跟面试官聊明白,
如果你去面试的刚好是流式应用开发,
应该是能拿到一个不差的offer。
背生硬的面试题别人是能感受到的。
结合着实际生产再去聊原理才有说服力。
篇幅有限,我试着慢慢来给大家介绍这些问题的处理方案。
肯定有更好的方案,我只说我在一个一般规模的生产上的尝试。
如果做Demo就算了,不会有说服力的。
如果大家将来开发的应用数据量非常小,
不用考虑效率问题,
不需要考虑Kafka积压问题。
那其实,
程序容易开发很多。
代码怎么写都不会死。
但这种场景对大家的技术能力不会有提升,
在将来的面试,
即使用了Hudi、ClickHouse、Flink一些较新的技术也不会有亮点,
大多数人已经不吃这套了。
除非去的那个公司的技术人员压根一点都不懂。
现在在公司里面,Kafka应用还是很多的。
如果数据库开启了CDC,
这些CDC日志都放在一个topic中,
还是说一个表对应一个topic?
这个问题其实要看场景,会有多种选择。
还是那句话,如果表数量小,无所谓了,怎么搞都行。
但如何表数量非常多,每个表对应一个topic,会对ZK产生较大影响。
说几种设计方法供大家参考:
CDC日志一定是有序的。我给大家举个例子就知道了。假设执行以下几个DML操作:
大家想一下,如果顺序出现错乱是导致什么问题?例如,因为网络延迟,进入到Kafka中的数据变成了:
看出来了吗?当第三次去UPDATE的时候,数据已经被清理掉了,如何UPDATE?
再说一个更严重的,假设Kafka CDC日志的有多个分区,且生产者端写入策略是轮询方式。会出现问题?
我们发现,CDC日志全都是乱序的。我们将来看到的数据也都是错的。
有几种办法给大家做参考:
大家如果在跑数百张表的数据CDC到Hudi。
你会惊奇地发现,这跟跑几张表的DEMO完全不是一码事。
就是特别的慢。并行度特别高的情况,HDFS的负载也是特别高。
上百张表如果不去优化,想要跑出来一个不过的效果,轻轻松松吃掉集群几个TB的资源。
所以,我需要来跟大家聊聊我在设计流程序过程中给应用做的优化。
我们都知道Spark是基于DAG来进行stage调度,然后在基于TaskSceduler调度一个个任务的。
因为我们需要对CDC日志进行解析、验证以及转换处理。
所以,每一次计算都有可能会导致从源头重新拉取数据。
我们的CDC程序中要刷入上百张Hudi表,兄弟如果你没有做cache,这意味着:
Streaming程序需要从Kafka重复拉取上百次数据
如果有上千张表就更恐怖了。
大家可以自己去测试一下,在落地到表之前,不做cache的后果。
Kafka的topic中的数据是很大的,单个topic几十亿、上百亿的消息是正常水平。
大家可能会说,没事啊。
Kafka的吞吐超级高,
但考虑一下,吞吐再高,也经不住这样重复消费数据。
而且Kafka的吞吐会受到服务器IO的影响。
如果Kafka没有做限流,
一旦Kafka负载过高,导致其他的系统也无法正常生产、消费Kafka的数据。
一首《凉凉》自己唱吧。
开启了Structured Streaming的cache后,
然后我们发现Kafka的负载下降了很多。
高兴坏了。
然后,发现每次刷入数据到Hudi时,光读取数据就要几分钟。
看了一下DAG,
确实不再从Kafka直接拉数据,
而是从cache中拉取数据,
这个cache也不小呢,每次Batch cache几十GB、上百GB。
每次对表做一次计算,都需要从扫描整个cache。
那么有几百表,
这个cache就需要被扫描几百次,
我需要让每个表后续的计算尽量读取少一些数据。
所以,我在基于batch的cache的基础之上。
再次做了一个针对表的二级缓存。
后续针对表的操作,直接拉取到的二级缓存就只拉取自己的数据即可。
总结下:
刷几张Hudi表,
我们会发现,好快哦!好High哦!
image-20210913232847124
但是随着刷入的表越来越多,
发现Structured Streaming写入Hudi越来越慢。
而且你发现,Spark的任务并发没有利用好。
明明有几百个container,
并行的任务却只有几十个。
一个个的表地写。
所以,根据实践,
我们可以判断在foreachBatch中,Spark是单线程调度。
我们有几百张表需要刷入到Hudi中。
一个个表刷显然太不现实了。
刷入的数据太慢,
Kafka进数非常快,这就会导致,当我们正在消费某个数据。
Kafka积压的数据太多了,
所以触发了清理操作。
然后数据还没有被数据就丢掉了。
所以,根据实践,
我们需要自己来实现多线程调度,
你会用到Java的并发包,
然后一次将数据刷入若干个Hudi表中。
至少,一次启用几十个线程来刷Hudi表是没有问题的。
在开发环境,调通了一个表的CDC日志解析后。
看见 Structured Streaming 能够即时将数据正确地刷入到Hudi。
天哪!历经困难重重,终于把数据刷到湖仓里面。
打开Spark SQL的cli,数据也能够正确的查询查询出来,统一hoodie_record_key对应的数据也能正确更新。
所以,我高兴地将Maven Profile切换到prod。
准备到准生产做一个验证。
几分钟地等待,
Maven把所有的shell、python、配置文件打包到了一个tar.gz。
废了九牛二虎之力,
将tar.gz包上传到准生产。
将要刷入LakeHouse的目标表元数据初始化好。
模拟上线大概几百张目标表。
当YARN把Streaming应用拉起来的时候,
我就发现有点不妙。
这些个表,
跑一次batch发现Web UI就展示了几千个Stage。
准生产的HDFS集群负载一下飚满。
您猜怎么招?
Hudi要处理小文件,
就需要检查HDFS上的文件,
并且将小文件合并。
是不是感觉似曾相识?
我肯定你在Kudu、HBase等LSM结构的Compaction中见过。
写放大。
是不是慢点就慢点?
大不了数据就延迟大点。
不!
这样的写放大,
HDFS负载会猛增,
其他的任务还要不要玩?
还有,你确定Kafka会一直保存那些被积压的数据吗?
Log Compaction和Log Deletion会是摆设?
所以,这程序如果这样,
熬不了一天,在半夜业务库刷数的时候,就会直接因为Kafka数据丢失导致应用退出。
神马?
不退?
任何人都无法保证最终的数据是正确的。
耶稣都保不住,我说的。
你说:是不是该去调Spark、Hudi参数了?
大可以去试试,
在资源有限的情况下,
有很大可能会无功而返。
我问个问题:业务库的表中是不是每个表无时无刻都在刷数?
我想,95%的业务系统不会。
业务库中一定会有一些表是缓慢变化的。
而针对缓慢变化的业务表,根本没有必要每个Batch都去检查小文件、合并。
所以,每当在将数据刷入目标表之前。请一定要检查,当前这个Batch中有哪些目标表需要刷数。
这一期先写到这里啦。
下回我们接着聊基于Hudi的LakeHouse生产实践。