来源:知乎(https://zhuanlan.zhihu.com/p/77677075)
作者:阿莱克西斯
声明:本文版权归原作者阿莱克西斯所有,未经原作者同意不得转载。
By 暴走大数据
场景描述:本文是我在浏览技术文章时候发现的,写的非常棒,特意跟原作者联系申请的转载,这篇文章从最根本的原因上讨论了流式计算系统中的【端到端一致性】这个问题,建议大家收藏,然后抽时间认真读一读。
关键词:流式计算 端到端一致性
本文是我在浏览技术文章时候发现的,写的非常棒,特意跟原作者联系申请的转载,这篇文章从最根本的原因上讨论了流式计算系统中的【端到端一致性】这个问题,建议大家收藏,然后抽时间认真读一读。
长文预警, 全文两万五千多字, 37页word文档的长度。讨论了分布式最难的2个问题 1. Exactly Once Message processing 2. 保证消息处理顺序。
这篇文章可以说是作者压箱底儿的知识总结(之一,毕竟作者学的东西很杂 ╮( ̄▽ ̄"")╭ )了. 断断续续写了将近三个月, 耗费了大量的精力, 本来的目的本来只是想对比一下各个state of art的流系统有什么不同, 但是写出来之后只是乱糟糟的罗列数据和资料,像这样列数据一样,“介绍下这个framework这样实现的,所以有这样的特性”,“那个是那样的”.... blablabla... 使得文章只停留于表层,这样写并不是一个好的笔记,我想记录的是更深入更本质和更精髓的一些东西:我想更深入的探讨一个分布式系统的设计是被什么现实和本质问题所逼迫的结果, 2个不同的设计到底是在哪个本质问题上分道扬镳才造成了系统设计如此的不同,追寻问题和解法的前因后果,让未来的自己一眼扫过去之后能够自然而然的回忆和理解出: "嗯,对,在这种现实限制下,要达到这种效果就必须这么做", 我想从乱糟糟的观察到的混乱表象中,抽象出流系统所面对的问题和解决方案的本质; 这就是本文的目的,和文章中除开引用文献之外,作者所贡献的一些自己的思考;
就作者学习流系统的感受来看, 流系统有2个难点, 第一是end to end consistency,或者说exactly once msg processing; 第二则是event time based window操作; 本来作者想用一篇文章同时概括和比较这2点; 无奈第一点写完, 文章已经长度爆炸; 于是分开2篇, 此为上篇, 着重于从分布式系统的本质问题出发, 从最底层的各种"不可能", 和它们的解(比如:consensus协议)开始, 一层一层的递进到高层的流系统中, 如何实现容错场景下的end to end consistency,或者说exactly once msg processing。
正文
一些术语
一致性其实就是业务正确性, 在不同的业务场景有不同的意思, 在"流系统中间件"这个业务领域, 端到端的一致性就代表Exact once msg processing, 一个消息只被处理一次,造成一次效果; 注意: 这里的"一个消息"代表"逻辑上的一个", 即application对中间件的期待就是把此消息作为一个来处理, 而不是指消息本身的值相等, 比如要求计数+1的一个消息, 消息本身的内容可能一模一样, 但是application发来2次相同消息的"本意"就是要计数两次, 那么中间件就应该处理两次, 如果application由于超时重发了本意只想让中间件处理一次的+1操作, 那么中间件就应该处理一次; 中间件怎么能区分application的"本意"来决策到底处理一次还是多次, 是end to end consistency的关键.
EOMP
由于Exactly once msg processing太经常出现, 我们用EOMP来代替简写一下;
为了方便讨论,后边谈到failure, 我们指的都是crash failure, 你可以想象是任何可以造成“把机器砸了然后任何本地状态丢失(比如硬盘损坏)一样效果的情况出现"; 在今天的虚拟云时代,这其实很常见,比如container或者虚拟机被resource manager突然kill掉回收了, 那么即使物理机其实没有问题, 你的application的逻辑节点也是被完全销毁的样子;
容错在end to end Consistency的语义下,是指在机器挂了,网络链接断开...等情况下,系统的运算结果和没有任何failure发生时是一摸一样的.
后边我们可以看到, 保证字面上的Exact once msg processing(即整个系统在物理意义上真的只对消息处理一次), 这在需要考虑容错的情况下是不可能做到的; Effective once msg processing是一个更恰当的形容,而所有号称可以做到EOMP的系统,其实都只是能做到Effective once msg processing; 即:中间件, 或者说流处理framework可能在failure发生的情况下处理了多次同一个消息,但是最终的系统计算结果和没有任何failure时, 一个消息真的只处理了一次时计算的结果相等; 这和幂等息息相关;
一个相同的操作, 无论重复多少次, 造成的效果都和只操作一次相等; 比如更新一个keyValue, 无论你update多少次, 只要key和value不变,那么效果是一样的; 再比如更新计数器处理一次消息就计数器+1, 这个操作本身不幂等, 同一个消息被中间件重"发+收"两次就会造成计数器统计两次; 而如果我们的消息有id, 那么更新计数器的逻辑修改为, 把处理过的消息的id全记录起来, 接到消息先查重, 然后才更新计数器, 那么这个"更新计数器的逻辑"就变成幂等操作了;
把本不幂等的操作转化为幂等操作是end to end consistency的关键之一.
和幂等有些类似, 不过是针对一个计算; 相同的input必得到相同的output, 则是一个确定性(deterministic); 比如从一个msg里计算出一个key和一个value, 如果对同一个消息运算无数次得到的key和value都相同, 那么这个计算就是deterministic的, 而如果key里加上一个当前的时钟的字符串表示, 那么这个计算就不是确定性的, 因为如果重新计算一次这个msg, 得到的是完全不同的key;
注意1: 非确定性计算一般会导致不幂等的操作, 比如我们如果要把上边例子里的keyvalue存在数据库里, 重复处理多少次同一个msg, 我们就会重复的插入多少条数据(因为key里的时间戳字符串不同);
注意2: 非确定性计算并非必然导致不幂等的操作,比如这个时间戳没有添加在key里而是添加在value里, 且key总是相同的, 那么这个计算还是"非确定性"计算; 但是当我们存数据的时候先查重才存keyvalue, 那么无论我们重复处理多少次同一个msg, 我们也只会成功存入第一个keyValue, 之后的keyValue都会被过滤掉;
支持非确定业务计算的同时, 还能在容错的情况下达成端到端一致性, 是流系统的大难题, 甚至我们今天会提到的几个state of art的流系统都未必完全支持; (好吧Spark说的就是你)
分布式系统最tricky的问题就是, 问题看起来很普通很简单; 一些问题总是看起来有简单直接的解法,而一个"简单解"被人查出问题时,也总是看起来可以很简单的就可以把这个挑出的edge case很简单的解决掉; 然而我们会立刻发现解决这个edge case而引入的新步骤会引发新的问题... 如此循环, 直到"简单"叠加到"无法解决的复杂".
由于人们对这些问题的"预期是简单的", 所以很多书, online doc, 都大大简化了对问题的描述和对问题的分析; 最普遍的是对failure recovery的介绍, 一般只会简单的写"failure发生时, 系统会怎么recovery", 但是完全不提怎么检测failure和“根本不可能完美检测到failure”这个分布式系统的基本事实, 从而给了读者“failure可以完美检测”的错觉;
这是因为一来说清楚各种edge case会大大增加文档的复杂性, 另外一点是写了读者可能也看不明白, 还有就是广告效应, 比如真正字面意义的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系统都说自己可以支持exactly once, 那自己也得打这个广告不是; 还有就是语焉不详, 比如某stream系统说自己可以实现exactly once msg delivery, 别看delivery和processing好像差不多, 这里边的用词艺术就有意思了,delivery是指消息只在stream里出现一次, 但是在stream里只出现一次的消息却无法保证只被consume一次确根本不提; 再比如某serverless产品处理某stream的消息, 描述是保证旧的消息没有处理之前不会处理新消息, 你会想, 简单描述成保证消息按顺序处理不是一样么? 其实差大了去了, 前者并没有屏蔽掉旧消息突然replay, 覆盖掉新消息的处理结果的edge case, 而这个事实甚至颠覆了很多使用这个服务的Sr. SDE的对其的认知;
没有理解分布式系统的几个简单的本质问题之前, 你读文档的理解很有可能和文档真正精准定义的事实不符; 且读者对“系统保证”的理解, 往往会由于文档"艺术"定义的误导, 而过多的假设系统保证的"强", 直到被坑了去寻根问底, 才会收到"你误读了文档的哪里的详细解释";这是分布式系统"最难的地方在最普通的地方"的直接结果之一;
个人认为最好的办法就是去理解分布式系统软件算法所能达到的上限=>关于各种impossibility的结论的论文,然后去学习克服他们的方法的论文; 这样, 我们才能从各种简化了的 tutorials里, 从API中, 从各种云服务, 框架的广告词背后, 发现“圣光不会告诉你的事", 和"这个世界的真相";(从广告和online doc天花乱坠的描述中看到分布式系统设计真正的取舍, 这是区分API调包侠和分布式系统专家的分水岭之一); 而不是“简单的信了它们的邪”; 而下边,就是学习分布式系统,你所需要了解的最重要事实中, 和end to end consistency相关的几个;
不存在完美的failure detector
很多关于分布式系统的书上都会说,当failure发生时系统应该怎么做来容错, 就好像可以准确的检测到failure一样; 然而事实是, 在目前互联网的物理实现上(share nothing architecture, 只靠网络互联,不直接共享其他比如内存物理硬盘等),我们无法准确的检测到failure;
简单来说,就是当我们发现一个node无反应的时候,比如ping它,给它发消息,request,查询,都没有反应,我们无法知道,这到底是对方已经停止工作了,还是只是处理的很慢而已; 无法制造完美的failure detector, 即使在今天也是分布式系统的基础事实; 本文无意在基础事实上多费唇舌, 无法接受此事实者可以去翻相关论文; ╮( ̄▽ ̄"")╭
Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is only “very slow”. [30] The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from one that is merely very slow. In other words, Consensus is unsolvable because accurate failure detection is impossible. [30] A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time, and there always exists a temporary failure that lasts longer than the time limit for the decision… A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or long garbage collection pauses. This is also indistinguishable from network partitions and crashes. The only signal we have for decision is “no reply in given time for heartbeats” and this means that phenomena causing delays or lost heartbeats are indistinguishable from each other and must be handled in the same way.[29]
不存在完美的failure detector, 所导致的几个颠覆你认知的问题:
严重注意,在分布式系统里,我们需要单一责任的一个节点/processor/role来做决策或者处理信息时,我们要么不保护系统的高可用性(机器挂了就停止服务),要么解决zombie process会带来的问题;高可用性的系统中, zombie无法消除;这关系到分布式系统设计里的一个核心问题:liveness和safety的取舍;
在缺乏完美的failure detector的情况下, 对方迟迟不回信息(ping它也不回), 不发heartbeat, 那么本机只有2个选择: 1. 认为对方还没有crash, 持续等待; 2. 认为其crash掉了, 进行failover处理;
选择1伤害系统的liveness, 因为如果对方真的挂了,我们会无限等待下去, 系统或者计算就无法进行下去; 选择2伤害系统的safety, 因为如果对方其实没有crash, 那我们就需要处理可能出现的重发去重, 或者zombie问题, 即系统的逻辑节点的“角色唯一性“就会被破坏掉了;
越好的liveness要求越快的响应速度, 而“100%的safety“的意义, 则因系统的具体功能的不同而不同, 但一般都要求系统做决定要小心谨慎, 不能放过一个edge case, 穷尽所有必要的检查来保证"系统不允许出现的行为绝对不会发生"; 在consensus的语义下来说, safety就是绝对不能向外发出不一致的决定(比如向A说决定是X, 后来向B说决定是Y);
可以看到, 系统的edge case越多, safety越难保证, 而edge cases的全集只是可能发生的情况的集合, 而某一次运行只会发生一种情况(且大概率是正常情况); 如果系统不检查最难分辨最耗时的几种小概率发生的edge case, 那么系统大概率(甚至极大概率)也可以完美运转毫无问题几个月, 运气好甚至几年; 这样降低了系统的safety(不再是100%), 但是提高了系统的响应速度(由于是概率上会出问题, 所以即使降低了safety保证, 也不是说就一定会出问题, 只是你把系统的正确性交给了运气和命运); 而如果系统保证检查所有的edge case, 但是系统99.9999%的概率都不会进入一些edge cases, 那么这些检查就会阻碍正常情况的运算速度; Liveness和Safety, 这是分布式系统设计的最基本取舍之一;
而FLP则干脆说: 在分布式consensus这个问题里, 如果你想要获得100%的系统safety, 那么你绝对无法保证系统liveness, 即:系统总是存在活锁的可能性, 算法设计只能减小这个可能性, 而无法绝对消除它;
Kubernetes StatefulSet, 简单说是可以给容器(pod/container)指定一个名字的, 且保证全cluster总是只有一个容器可以有这个名字, 这样application就可以通过这个保证来指定机群中的逻辑角色, 且用这个逻辑容器中保存一些状态; (一般的replicaSet会load balance连接或请求到背后不同的节点, 你的一个请求要求在server本地存一些状态, 下一个请求未必还会到同一个server)
When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]
Kubernetes StatefulSet在liveness和safety里选择了safety, 当statefulSet所在的的物理节点"挂了"之后, kubernetes默认不会重启这个pod到其他节点去, 因为它无法确定这个物理节点到底死没死, 为了保证safety它选择放弃了liveness, 即系统无法自愈, StatefulSet提供所提供的服务不可用, 直到靠人干预来解决问题;
([38] P305: 10.5. Understanding how StatefulSets deal with node failures) Node fail cause daemon of Kubelet could not tell state of pod on the node….StatefulSet guarantees that there will never be two pods running with the same identity and storage...
Akka Cluster也做了相同的选择, 在cluster membership管理中,有一个auto-downing的配置, 如果你打开它, 那么cluster就会完全相信Akka的failure detection而自动把unreachable的机器从cluster中删去, 这意味着一些在这个unreachable节点上的Actor会自动在其他节点重启; Akka Cluster的文档中, auto-downing是强烈不推荐使用的[38], 这是由于Akka Cluster提供的很多feature要求角色的绝对单一性, 比如singleton role这个功能, 在保证“cluster里只有这一个节点扮演这个actor"(safety), 和保证"cluster里总要有一个节点扮演这个actor"(liveness) 中, 选择了safety, 即保证at most one actor存在于cluster中, 一旦次actor的节点变成unreachable(比如机器真的挂了), 那么Akka也无能为力, 只能傻等这个节点回来或者人来干预决策:
The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]
一个商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能点的解决方案(基于quorum), 有兴趣的同学可以看引用文档[29];
This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]
这是因为这两个作为比较底层的平台, 他们需要对上层提供非常大的自由性, 而不能限制上层的活动; 比如kubernetes没有规定用户不能在pod上跑某种程序, Akka也没有规定用户不能写某种actor的code; 这样, 在不限制自己处理能力的同时要保证任何行为都看起来exactly happen once(因为语义上singleton节点只有一个, 那么就不能让用户写的任意单线程程序出现多节点平行执行的外部效果), 而这对中间件来说是不可能的, 这就引出了另外一篇论文: end to end argument[27], 作者已经写过一篇文章详细介绍end to end argument(阿莱克西斯:End to End Argument(可能是最重要的系统设计论文)), 这里不在赘述; 后边我们可以看到Flink, Spark等流系统为了保证exactly once msg processing需要怎样和end to end argument 搏斗;
可以同时保证safety和liveness么
取决于具体情况下对safety和liveness的具体要求, 在流处理的情况下, 至少本文提到的4种流系统都给出了自己的解; 请耐心往下阅读;
由于异步环境下, 钉死了我们不可能有一个完美不犯错的failure detector; 这篇著名的论文Unreliable Failure Detectors for Reliable Distributed Systems[30] 详细描述了即使我们用一个不准确的failure detector, 也可以解决consensus的方法; 但是它并没有推翻FLP impossibility的结论:Consensus还是并非绝对可解; 但是, 如果我们对需要consensus的计算加一个限制,则Consensus可解;
这个限制是: 计算和通讯只需要在"安全时间"内完成即可, 对[30]提供的算法来讲, 提供consensus的系统需要在这段时间内"正确识别crash"即可,也就是说(1)识别出真正挂掉的node, 和(2)不要怀疑正确的node;
怎么理解呢, 这两个看似对立的概念: (1)consensus的有解(比如paxos协议)是对的, (2)consensus的无解证明:FLP impossibility也是对的; 要准确且简单的解释为什么它们都是对的有点难, 推荐还是看论文; 但是用比喻来解释的话, 根据[30], Consensus算法可以看作这样一个东西, 当系统出现crash, failure detector判断错误,或者网络突然延迟...等时候, 算法会进入某种循环而不会轻易作出决定;
for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]
而只要满足必要的条件时(计算和通讯只需要在"安全时间"内完成), 系统则可以跳出循环让机群达成一致[30,31];
(1) There is a time after which every process that crashes is always suspected by some correct process. (2) There is a time after which some correct process is never suspected by any correct process. The two properties of <>W0 state that eventually something must hold forever; this may appear too strong a requirement to implement in practice. However, when solving a problem that “terminates,” such as Consensus, it is not really required that the properties hold forever, but merely that they hold for a sufficiently long time, that is, long enough for the algorithm that uses the failure detector to achieve its goal.
而FLP impossibility则可以理解为挑刺儿的说, 那这个条件永远无法出现呢? 你的算法就活锁了呀(丢失liveness);
幸运的是, 在现实世界, 我们总可以对消息传递和处理来估计一个上限, 你可以理解为,只要消息处理总是在这个上限之内完成,那么consensus总是可以实现, 而消息处理的时间即使偶尔超过了这个上限, 我们的consensus协议也会进入安全循环自我保护, 从而不会破坏系统的safety, 而系统总是可以再次回归平稳(处理时间回归上限之内); 而FLP则是像说: 你无法证明系统总是可以回归平稳 (确实无法证明, 因为FLP的前提是异步模型, 而我们的真实世界更像是介于异步和同步模型之间的半同步模型, 我们只能说极大概率系统可以"回归平稳", 而无法证明它的绝对保证; =>可以绝对保证"上限"的模型一般称为同步模型);
其实用paxos来模拟出FLP的活锁的例子也很简单, 你把节点间对leader的heartbeat timeout时间设为0.001ms, 那么所有的节点都会忙着说服别的其他节点自己才是leader(因为太短的保活时间, 除了自己, 节点总是会认为其他的任意节点是leader时, leader死了), 那么系统就会进入活锁, 永远无法前进达成cluster内的consensus, 系统丧失liveness;
即使consensus问题解决了, zombie节点也还是大问题, kubernetes和Akka可以选择避开zombie, 损失liveness; 然而对于绝大多数分布式系统来说, 是必须直面zombie节点这个问题的,比如各种分布式系统的master节点, 如果master挂了整个系统不在另外的机器重启master,整个系统就可能变为不可用; 再比如kafka和Kinesis的单一partition只能有一个consumer, 如果这个msg consumer挂了不自动重启, 对消息的处理就会完全停止;
zombie是最容易被忽视的问题, 比如, 即使我们有了paxos, raft, zookeeper这种consensus工具可以帮我们做leader election, 也不要以为你的系统中不会同时有2个leader做决策了; 这是因为先一代的leader可能突然失去任何对外通信,或者cpu资源被其他进程吃光, 或者各种edge case影响, 使得其他节点无法和其通信, 新的leader被选出, 而老的leader其实还没死, 如果老的leader在失去cpu之前的最后一件事是去写只有leader才能写的数据库, 那么当它突然获得cpu时间且网络恢复正常, 那么这个以为自己还是leader的zombie leader就会出乎意料的去写数据库;
这曾经是HBase的一个重大bug[39, Leader Election and External Resources P105],
Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper... The region server is written in Java and has a large memory footprint. When available memory starts getting low, Java starts periodically running garbage collection to find memory no longer in use and free it for future allocations. Unfortunately, when collecting lots of memory, a long stop-the-world garbage collection cycle will occasionally run, pausing the process for extended periods of time. The HBase community found that sometimes this stop-the-world time period would be tens of seconds, which would cause ZooKeeper to consider the region server as dead. When the garbage collection finished and the region server continued processing, sometimes the first thing it would do would be to make changes to the distributed file system. This would end up corrupting data being managed by the new region server that had replaced the region server that had been given up for dead.
(解释不动了, 大家看英文吧...)
其实对付zombie已经是分布式系统的共识了,也有很多标准的解法,以至于各个论文都不会太仔细的去描述, 这里简单介绍几种方法:
zombie fencing设计的关键点在于如何阻止已经“成为zombie的自己”搞乱正常的“下一代的自己”的状态;毕竟无论是zombie还是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代码逻辑,也就是说这同一段代码, zombie来跑就"不能过:"(比如不能对系统的状态造成影响), 但是"下一代"来跑, 就可以正常工作;这一般需要满足以下几点:
如果被影响的系统是自己的一个microservice,那么可以随意设计协议来验证一个请求所携带的epoch number是不是最新的;而当这个被影响的系统是一个外部系统, 比如是业务系统需要用到的一个数据库,由于你没法改数据库的代码和数据库client与server之间的协议, 那么就要利数据库所提供的功能或者说它的协议来设计application层级的zombie fencing协议;比如对提供test and set,compare and swap的kv数据库来说,application设计自己的业务表时,要求所有的表都必须有一个epoch字段,而所有的写入都必须用test and set操作来检测当前epoch字段是否比要写入的请求的epoch字段大或相等, 否则拒绝写入; 这样, 只有"下一代"可以更改zombie写入的数据, 而zombie永远无法更改"下一代"插入或者更新过的数据;
另一方面,很多时候"下一代"需要读取上一代的信息,继承上一代的数据,然后继续上一代的工作;那么如果刚读取完数据,zombie就改变了数据,那么"下一代"对于当前系统状态的判断就会出差错;一个general的解决的方法也很简单,要读先写,“下一代”开始工作前, 如果要先读入数据了解“系统的当前状态”,必须先改变数据的epoch number为自己的epoch number(当然要遵从只增更改原则test and set,如果发现当前数据的epoch number已经比自己的epoch number还大了,则说明自己也已经是zombie了,更新的"下下一代"已经开始工作), 更改数据的epoch number成功之后,再读入数据,就可以保证比自己老的zombie绝对不可能再更改这个数据,而现在读入的数据可以体现系统的最新状态,从而完成对"老一代"数据的继承;而在增加epoch number之前所有被写入的数据; 这里即使是"新一代"启动之后, 读取系统状态之前被zombie写入的数据, 都可以看做老一代的合法数据,只要被新一代开始工作前继承读入即可; 我们所要避免的是"新一代" 所读取的事实被zombie所更改; 而不是在物理时间的意义上在"新一代"启动时就立刻阻止zombie的所有系统改动;
zombie fencing的设计取决于分布式系统的具体情况,比如业务逻辑可能更改的数据范围可能是几百万几千万的数据记录,那么这也意味着zombie可能会修改的数据范围非常大,那么要求"下一代"在开始工作前更改所有数据的epoch number就很不现实;
对于zombie的影响的耐受性也会影响zombie fencing的设计,比如如果"下一代"只需要自己所接触的有限数据在特定时刻之后不被zombie影响就能正确工作, 那么只要在"下一代"需要接触特定数据时才更改此数据的epoch number来屏蔽zombie即可,那么即使业务可能修改的数据范围很大,简单的更改数据的epoch number也还是可以接受的解决方案;
最糟糕的情况,如果"zombie"可能会插入新的数据, 而"下一代"的正常工作 需要不能有非法的新数据插入(比如下一代开始工作前先统计所有资源的个数,然后开始基于这个事实和"只有自己才能更改资源"的假设,作出各种决策, 而此时zombie突然插入了一条新资源记录或者资源使用记录...),如果"新一代"完全无法预测zombie会插入什么记录,要阻止zombie随意插入数据,“新一代”就只能在利用predicate lock来防止新纪录插入,且不说很多数据库根本不支持“锁住不存在的数据”的predicate lock,就算支持此功能的数据库也很有可能是使用表级锁来锁住整张表;如果数据表设计成了需要共享给多个节点使用(比如一张资源表,不同的singleton worker负责维护不同的资源范围),那么表级锁就会妨碍其他worker的工作;
zombie fencing的设计在于如何引入简单的fencing点, 对"新一代"畅通无阻,但是却可以阻止zombie的异常活动, 如果协议设计使得"新一代"可以很容易制造这个fencing点, 则"新一代"在启动或者需要的时候加入fencing点即可, 比如前边说的任何数据都要附带一个epoch值,任何数据写入都要用test and set来对比数据的当前epoch值和请求的epoch值; 对于上文的随机插入的业务需求, 可以要求业务逻辑插入任何数据之前,先在一个注册表的属于自己epoch的一行里记录自己要写的数据的id, 且在记录的时候用test and set来检测自己这一行数据的active值是否被更改为disable了;这样就相当于引入了一个更简单的fencing点,因为"下一代"只要在注册表里把所有上一代的记录写为disable, 就可以阻止zombie的未来任何活动,但是此时无法阻止zombie的最后一个注册的数据插入, 但是"下一代"可以简单的读注册表得知这个数据的id, 从而对这个"最后的zombie写入"采取相应的策略(继承,删除, 甚至fencing, 比如这个id并不存在,那么无法得知是zombie真的在写之前死了所以永远不会插入这个记录了,还是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注册的id先插入一条记录来占位,这样无论zombie是真的死了还是卡了,都无法再写入这个数据了);这样,我们就引入了一个连数据插入都可以fencing的fencing点;
Zombie fencing一般都是以上这些套路, 用consensus协议确定epoch number区分"下一代"还是zombie,这个epoch number一般也可以称为fencing token, 通过把fencing token分发给需要拒绝zombie的service,把fencing token和需要保护的数据(防止被zombie修改)存在一起;所以一般论文[7, 26]里只会简单的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token
三点为 (上游/input提供端)=> (当前计算节点/计算结果发送端) =>(下游/计算接收端)
如果我们考虑必须保证系统的高可用性,即检测到任意process的failure,都会由一个(绝对不死)的高可用的JobManager或者MasterNode,来重启(可能在另外的node)这个process, 所以我们定义这种即使所在的host挂掉, 也会不断重新在其他host上重启的process为逻辑process; 这时我们要面临几种可能造成inconsistent的情况:
"计算接受端"没有成功ack"计算结果发送端"的消息,一般表现为发送端的等待ack 超时;根据之前的讨论,接收端有可能把消息处理完毕了(ack的消息丢失,或者刚处理完消息还没发ack就挂了…等情况),也可能没有处理完毕(没接到或刚接到消息就挂了…等情况);
这种情况发送端可以重发信息, 而发送端是需要“上游input提供端”提供某种数据然后进行某种计算后产生的这个消息/计算结果(设为outputA), 那么"计算结果发送端"有两个策略:
策略1: 利用存储计算结果来尽量避免重算
要实现上下游exact once processing,需要实现4个条件
(a. 结果高可用, b.下游去重, c. 上游可以replay, d. 记录上游进度)
a. 要求结果高可用, 应对timeout时, “下游计算接收端”其实并没有成功处理"计算结果发送端"的计算结果的情况(比如下游挂了), 这时"计算结果发送端"可以把计算的结果存储在高可用的DataStore里(比如DynamoDB,Spanner…或者自己维护的多备份数据库); 这样超时只要重发这个计算结果即可, 自己甚至可以开始去做别的事情, 比如处理和计算下一个来自“上游/input提供端“的event, 而已经被“下游计算接收端”ack的"计算结果"则可以清理,一般由异步的garbage collection清理掉. 注意, 由于存在存储失败的可能性, 或者刚计算完结果还没来得及存储就挂掉重启的可能,我们无法真的保证避免重算;详见:无法避免的重算 的例子
b.下游去重,应对timeout时下游其实已经处理完毕消息的情况
c. 要求触发本次计算的“上游input提供端”可以replay input event,否则刚接到event还没计算就挂掉重启, 则event丢失;
无法避免的重算:任何时候计算没完成,或者计算完成后但是成功储存前(a.结果高可用的需求), 计算节点fail掉重启, 我们都需要replay上次计算过的input event,所以由于计算结果都还没存成功,所以从物理上讲, 此时我们还是重算了的; 所以即使我们采用把计算结果记录下来的策略, 我们无法从物理意义上真正避免重算, 我们避免的是有多个"重复的"成功计算结果提交给下游;而当计算不是deterministic的, 这多个“重复的”计算结果可能是不同的值发送给不同的下游((比如按照计算结果的key发送给下游不同的partition); 那么下游就会处理同一个event所产生的本应只有一个的计算结果两次,且由于非确定性计算的原因,这两个计算结果不一样; 这就会造成event不是EOMP的问题; (不仅在物理上计算了2次, 在效果上也影响了2次下游的计算, 打破的effective process once的要求)
d: 要求记录event处理的进度, 并保证储存计算结果不出现重复; 记录event处理的进度, 使得trigger本次计算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event处理完毕, 可以发下一个了), 来避免计算的re-trigger; 这要求以下策略2选一
否则要么计算结果还没存, input event就被屏蔽掉了(先记录event处理进度, 再储存计算结果的策略下, 刚记录完event处理进度, 发生failure, 此时由于event处理进度已经前进了, 此event不会再replay, 所以此event引发的计算结果永久丢失), 要么多次计算结果存储在DataStore里造成下游的重复信息(先记录结果, 再储存计算结果的策略下, 刚记录完结果, 发生failure, 那么replay上游计算时会生成一个新的计算结果); 注意, 此时下游是无法分辨这是重复信息的, 因为这是datastore里的"2条的记录", 将会获得不同的message id;
幂等和end2end argument: 所以实现原子操作就不需要幂等了么? 是也不是, 在业务层是的, 比如要实现业务层的幂等,我们可以在存计算结果到datastore里的时候把一个与触发本次计算的event的唯一id记录在一起,这样我们每次存的时候就可以使用乐观锁的方式test-and-set, 来保证如果这个id在数据库里没有才插入(取决于业务,我们也可以用这个id当主key来,那么即使多次写入同样的内容也没关系=>要求计算是deterministic的;) 如果我们保证触发计算的event的"屏蔽"和计算结果的储存是一个原子操作,那么我们就不需要上边这种复杂的存储策略,因为一旦计算结果存储成功,触发计算的event必定被"屏蔽"掉了, 而如果没存储成功, 则event一定会replay来重试;
然而在传输层却不是的,比如储存数据库的tcp有可能丢包重发,依靠tcp的传输层id自动去重,实现tcp的幂等;
策略2: 完全依赖重算:
高可靠重发的问题是,所有信息都必须先记录在高可用性的DataStore里, 相对于重新计算,重发需要的网络IO, 存储,状态管理的cost是很高的;而如果触发计算的event可以replay的话(其实不管重算还是不重算,为了防止“刚接到event, 计算节点就挂掉的情况”, 我们都要求event可以replay), 我们就可以选择重算然后重发来代替存储计算结果的重发;重算需要2个条件:
(在多节点流计算里,要求上游可以重发,意味着上游把计算结果存下来了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重发,那么上游的上游可以用储存的结果重发或者重算。。。以此类推)
(2种策略其实都有可能造成重算,也都对event replay有需求;为什么还要浪费资源去存储计算结果呢?这里边的重要区别是,当储存结束时,对触发本次计算的上游event的依赖结束了,而不稳定的下游不会造成额外的重算, 和对上游, 上游的上游....计算的"链式反应", 详见流的EOMP中的讨论)
带状态的计算, 比如流计算的某中间节点需要统计总共都收到多少信息了, 每次从上游收到新信息, 都把自己统计的当前历史信息总数更新并发往下游节点, 那么这个"系统的历史信息"就是这个"统计消息总数"的逻辑节点的状态; 由于状态也需要高保活,所以它也一定需要储存在远端dataStore里, 这样储存状态的远端datastore就相当于一个特殊的下游; 不同点在于, 当采用策略2:重算, 而不存储中间计算结果的话, 重算时则需要datastore可以把它所记录的状态rollback到最初刚开始处理此event的那个点; 这里我们只能rollback, 而不能只是依靠幂等来保证“状态的更新是exactly once”的原因是, 节点在处理任意消息时的状态也和当前信息的数据一样是本次计算的input, 而更新后的状态则是本次消息处理的output, 如果重算时不rollback节点的状态, 那么我们就会用一个被本消息"影响过"的状态来进行计算, 而这是会违反exactly once msg processing语义的; 比如节点的本地状态是上次收到的信息的数据上记录的时间戳, 节点的运算是计算2个event数据之间的时间戳差距; 假设eventA发生在时刻0, eventB发生在时刻10, 那么eventB引发的计算应往下游发送10, 并把节点的本地状态更新为10, 此时如果eventB的这个计算需要重算, 但是我们不rollback状态10回到0的话, eventB重算所得的结果就会变成0;
注意: 由于state更新也是处理event的"下游", 那么计算过程中的所有状态更新都可以算作“计算结果”的一部分, 所以当我们需要储存计算结果时,则需要把
这3个操作作为一个原子操作(以后我们称之为"原子完成"来省略篇幅); 而任何时候需要重算的话, 状态必须恢复到处理event之前的样子。
加入state,我们需要把(d. 要求记录event处理的进度, 并保证储存计算结果不出现重复, 更改为 (d+. 要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复,
并加入要求(e. state需要在replay 上游event的时候rollback到处理event之前时的状态
这些要求稍有抽象,让我们看一下流系统一般怎么达成这些要求;
考虑一个多节点的流系统,如果我们把上游所发来的计算结果当成前边所说的“触发计算的event”,而自己的发给下游的计算结果msg作为触发下游计算的event;那么我们就可以用上边的模型保证两两节点之间的exact once msg processing,从而最终实现端到端的exact once msg processing; 这就是Google MillWheel(DataFlow) 和Kafka Stream的解决方案; 他们都选择把每个节点的计算结果储存起来,并保证即使non-deterministic的计算, 也只有一次的计算会起作用, 而不会出现(策略2-1中提到的non-deterministic造成的不一致);他们的区别是
Google MillWheel(DataFlow)
Kafka Stream
Kafka Stream是建立在kafka分布式队列功能上个一个library, 所以在介绍kafka Stream之前, 我们先来讲一下Kafka
简单介绍Kafka Topic
Kafka的topic可以看作一个partition的queue, 通过发给topic时指定partition(或者用一个partitioner 比如按key做hash来指定使用那个partition), 不同的key的消息可以发送到不同的partition, 相同key的message则可以保证发送到同一个partition去, topic里的信息可以靠一个确定的index来访问, 就好像一个数据库一样,所以只要在data retention到期之前,consumer都可以用同一个index来访问之前已经访问过的数据;
Kafka Transactional Messaging
前边说过, Kafka Stream是建立在kafka分布式队列功能上个一个library, 主要依靠kafka的Transactional Messaging来实现end2end exactly once msg processing;
Transactional Messaging是指用户可以通过类似以下code来定义哪些对kafka topic的写属于一个transaction, 并进一步保证tx的atomic和Isolation
producer.initTransactions();
try {
// called right before sending any records
producer.beginTransaction();
//sending some messages...
// when done sending, commit the transaction
producer.commitTransaction();
} catch (Exception e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
Kafka transaction保证了, beginTransactions之后的, 所有往不同Kafka topic里发送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作废, 从而不为read-committed的consumer所见;
而kafka stream通过用kafka本身的分布式queue的功能来实现了state和记录处理event进度的功能; 因为
这3点, Kafka Stream都是用写消息到kafka topic里实现的, (1)自不必说,本来就是往topic里写数据,(2)其实是写当前consume position的topic; (注意Kafka Stream的上下游消息传递考的是一个中间隐藏的Kafka topic, 上游往这个topic写, 下游从这个topic读, 上游不需要下游的ack,只要往topic里写成功即可, 也不需要管下游已经处理到那里了;而下游则需要维护自己"处理到那里了"这个信息,储存在consume position的topic, 这样比如机器挂掉需要在另外的host上重启计算节点,则计算节点可以从记录consume position的topic里读出自己处理到那里然后从失败的地方重洗开始) (3)其实是写一个内部隐藏的state的change log的topic,和一个本地key value表(也就是本计算节点的state); failover的时候, 之前的"本地"表丢失没关系, 可以冲change log里恢复出失败前确定commit的所有state;
(1)(2)(3)的topic都只是普通的Kafka topic; 只不过(2)(3)由Kafka Stream自动创建和维护(一部分用来支持高层API的(1)也是自动创建)
幂等producer主要解决这么一个问题: Kafka的消息producer, 也就是往Kafka发消息的client 如果不幂等, 那么因为Kafka的接受消息的broker和producer之间在什么是“重复信息”上没有共识的话,则broker无法分辨两个前后一模一样的消息, 到底是producer的本意就是要发两次,还是由于producer的重发(比如:producer在收到broker的"接受成功"的ack之前就挂了,所以不知道之前的消息有没有成功被broker接收, 因此重启后重发了此信息)。此时broker只能选择接受消息,这就造成了同一个消息的多次接受;
同时我们也要解决zombie producer的问题: 如果我们保证producer高可用, 重启我们认为fail掉的producer, 那么其实没死的zombie producer的信息则会造成,重复且乱序的发布消息; (由于zombie的存在, 会有2个producer同时发布我们以为只有一个producer会按顺序发布的消息,这样就无法保证顺序: 比如zombie在发送A, B, C...的时候, 新启动的producer也开始发送A, B, C... )
Kafka的解法:
(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])
在保证两两节点之间的EOMP来实现整个流的EOMP的模型里,如果我们某一个或多个节点的状态和计算结果根本不记录在高可用DataStore里,我们还是可以实现EOMP, 我们只需要(1)replay这个节点的上游来重算这个节点的状态和发给下游的计算结果, (2)下游去重;
如果上游也没计算结果记, 那么replay上游的上游即可, 如果上游的上游也没记....一直追溯到记录了计算结果的上游节点即可;
如果一直都没有failure,那么比起Dataflow和Kafka Stream那种记录所有计算结果的模型 我们少记录一些额外的计算结果和状态就减少了很多系统负载; 这就是重算与记录计算结果模型的结合;
考虑 A,B,C, D 4个节点, A的计算结果传给B, 而B则把一部分计算结果给C一部分给D, 如果B没有记录自己的output, 则Cfail掉之后需要replay上游的input,这就需要B的一些重算来重新制造C所需要的input, 即使B的input(即A)记录了所有的计算结果, 我们还需要"恰巧可以产生这些历史计算结果的"B的历史状态,才能重算出C所需要的input; (所以B必须保存历史状态或者用某种方法重建自己的历史状态才能保证可以重算C所需要的input)
如果C的状态也丢失了, 那么对上游的负担则更重些, B需要重新计算来提供所有的历史计算结果(即C的所有历史input)来让C重建自己的历史状态
可以看到, 任意一个节点的某状态S(n+1)是
同时作为输入而得到的输出; 而这个过程中又会向下游发出一些计算结果O(n+1)
所以M(n) + S(n) => S(n+1) + O(n+1), 当下游crash重启需要O(n+1)时, 我们则有2种选择:
1.是记录计算结果, 2是重算; 两者并用的好处在于, 1可以异步batch进行而不需要节点必须等待O(n+1)记录成功才往下游发送O(n+1); 而2保证了当1还没有完成时, 系统也有足够的信息可以重建O(n+1);
这是一个链式反应, 当重算需要M(n)和S(n)时, 而如果M(n)并没有存则需要上游重算M(n), 上游还没存这些重算M(n)的信息则需要replay上游的上游来重算这些信息,这就是所谓的链式反应...;最极端的情况是什么都没存,那么需要从头开始跑我们的stream程序;
可以看到, 如果没有存中间计算结果或者状态, 那么当这个数据被下游重算需要的时候, 需要我们重算这个数据, 这就会产生对上游的计算结果或者状态的需求, 这就要求我们如果不存下这些数据, 我们就需要记住计算这个数据的数据依赖图, 所以要么把"中间"数据和状态存起来待用, 要么记住他们的数据依赖图; 而这些记录的中间结果只有当对其的所有依赖从计算图中消失时, 我们才可以垃圾回收/删除这些数据(比如所有基于某状态的计算结果都已经存下来了, 那么这个状态的数据就可以删除, 再比如某计算结果所引发的下游计算结果和状态都已经存下来了, 那么此计算结果的数据就可以删除了);从而不会造成储存数据爆炸;
这, 也就是Spark Streaming的解法;
Spark有三种Stream...
(3)还在实验状态, 基本上是把底层都改掉来使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似还有很多问题需要解决所以目前不支持EOMP, 这里不多聊了;
根据Structural Streaming的论文[12], (2)和(1)使用了相似的方法来保证EOMP, 但是其实作者发现(2)比起(1)还是有一些性能上的改进[21], 但是总体原则还是和(1)类似的利用一个重算关系图lineage来维护各个状态计算结果的依赖关系, 通过异步的checkpoint来截断lineage也就是各个节点状态和计算结果复杂的关系(比如一个数据如果已经checkpoint了, 那么它所依赖的所有状态和计算结果都可以在关系图里删去, 因为replay如果依赖于这个数据, 那么使用它的checkpoint即可, 而不需要知道这个数据是怎么算出来的, 如果还没checkpoint成功, 则需要根据数据依赖图来重算这个数据), 像这样利用checkpoint, 就可以防止lineage无限增长;
但是维护关系图需要利用micro-batch来平衡"关系维护"造成的cost, 否则每一条信息的process都产生一个新状态和新计算结果的话, 关系图会爆炸式增长(用micro-batch, 可能1000条信息会积累起来当作"一个信息"发给下游, 只需要在关系图里记录一个batch-id即可, 而不是1000个msg id, 对与状态来说也是这样,处理1000个msg之前的状态分配一个id, 处理这1000个信息之后的状态一个id, 而不需要记录1000个状态id, 同时他们之间的联系线也从1000条降低为1条;这样就大大减小了关系图维护的负担);
但这样造成的结果是micro-batch会造成很高的端到端处理的latency, 因为micro-batch里的第一条信息要等待micro-batch里的最后一条信息来了之后才能一起传给下游; 而这个等待是叠加的,当stream的层数越深,每一层的micro-batch的第一条信息都需要等待最后一条信息被处理完毕,相比在每一层都毫不等待,micro-batch造成的额外latency就会叠加式的增高;
注意, Spark Structured Stream提供了一种continuous mode[11,12,13,20]来替代micro-batch,解决了latency的问题,但是目前支持的operator很少,且不能做到exact once msg processing, 这里不多加讨论了(不过将来有望做成和flink一样的模式, 毕竟也用的Chandy-Lamport Distributed Snapshot algorithm). : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]
spark的micro-batch会造成严重的latency问题, 而Dataflow和Kafka Stream的方案要求记录每一个计算结果, 则会在大大增加系统负担的同时也会有不小的latency附加; 那么有没有一种方法可以不记录所有中间计算结果, 并且也不使用micro-batch呢?
我们来看看flink的艺术;
如果我们不储存流系统中间节点的计算结果在高可用DataStore里, 也不想维护复杂的数据依赖图(需要micro-batch的根源), 那么当一个节点fail掉需要replay上游的input的时候,上游就必定需要replay自己的上游,且自己的状态要rollback到没有接收这些要replay的消息之前的状态;对上游的上游就有相同的要求,那么最终所有节点的上游最终会归向数据源节点,并要求"重新replay";总而言之2个要求:
为了方便讨论,我们定义2里所提到的global的状态为一个全局稳定点; 显然,如果我们一条消息一条消息的处理,数据源节点等待直到所有流节点处理完这条消息所产生的蝴蝶效应信息之后,才发出下一个消息B0,那么在消息B要发出但是没发出之前,所有的节点的状态就满足我们对全局稳定点的需要; 比如当我们持续处理B1,B2,B3...B100, 这时一个节点fail掉了,那么我们只要流系统的所有节点rollback他们的状态到发出B0前的"全局稳定点", 整个系统的计算和状态就会干净的回道任何节点都不曾被任何B0-B100所影响的状态, 那么此时从数据源节点replay B0, B1, B2... B100 成功, 这些消息就"exactly once process"掉了。所以,我们找到了第一个不需要micro-batch, 也不需要记录中间节点计算结果,就能实现EOMP的方法:
每n条信息, 或者每一段时间, 数据源节点(或者流系统的第一个入口节点)停止向下游发送任何信息, 直到所有节点报告说有关这条信息的所有派生信息(由于这条信息引起的第一个计算节点的计算结果会发送给它的下游, 下游的计算结果又会发送给它的下游...等等这些都是派生信息)都已经处理完毕, 此时把所有节点的状态checkpointing在高可用DataStore里, 建立一个全局稳定状态集(由流系统中每个计算节点各自的全局稳定状态所组成), 数据源才开始继续发送信息...这样, 任意的节点fail掉, 我们只要在别的机器上重启这个计算节点并download之前checkpoint的状态,流系统的所有节点也rollback到上一个全局稳定状态即可, 由于数据源发送数据的进度也属于全局稳定状态集中的一员, 所以当数据源rollback自己的状态,则可以开始replay 全局稳定点checkpoint之后才发送的信息,而此时所有节点都已经rollback到一个"从没见过这些信息和它们的派生信息"的状态了,整个系统就好像从来没有见过这些信息一样, 从而实现即使failure发生,我们的系统也可以实现EOMP;
更进一步, 我们来看如何不停住数据源的信息接收,我们所需要处理的问题;
问题1意味着我们不能用物理时间来建立全局一致状态集, 那么既然流的不同节点接收到数据源任意消息x的派生消息的时间不同, 那么只要我们能让所有节点分清哪些是x的消息和派生消息, 哪些是x之后的消息和派生消息, 所有节点就可以在处理完x的派生消息之后把本地状态复制一份储存在高可用DataStore里, 作为全局一致状态集的一员;
问题2意味着即使允许计算节点连续处理input而不必等待所有下游建立好全局一致状态才发下一个计算结果, 计算节点也不能盲目的不加考虑的处理上游信息, 我们要使得计算节点的状态变迁过程中, 至少全局一致状态是可以出现的;
Flink的解法就是由一个高可用的coordinator连续发出不同的stage barrier(比如先给所有src发1,然后1分钟后发2,2分钟后发3..... 如此增长), 夹杂在发给数据源发出的数据流里, 所有的节点都必须忠实的转发这个stage barrier, 这样所有的节点的
那么如果所有节点都遵循2个原则:
而当所有的节点都保证"自己发送的barrier-a之前的计算结果只和自己接收的barrier-a前的input集合相关", barrier-a就成了系统系统的分隔点,而所有节点遵循原则-1所建立的本地状态备份, 也绝对没有被数据源发出的在barrier-a之后的信息和它们的派生信息所影响; 而这些所有本地状态备份的全集,则组成了全局一致状态集;
一个细节, 当一个节点只有一个input channel的时候, 只要按顺序处理input信息即可; 而当一个节点有多于一个input channel的时候, 一个input channel的barrier-a已经接收到, 但是其他channel的barrier-a还没有收到怎么办呢?
由1,2就很清楚可以推理出flink的算法了:
异步checkpoint可以使得, checkpoint本身不会block流本身的计算,增量checkpoint避免了,每次一点小变动都需要checkpoint全部的state,可以节省计算机资源(比如网络压力)
flink和spark这种需要checkpoint的系统都可以做到异步增量checkpoint, 且这个技术也很成熟了, 本文只选flink的方法[35]来简单说明一下 , Spark的可以看[21]
Flink使用RocksDB 作为本地状态储存, RocksDB本质上就是一个LSM tree, 对状态的写会写在内存的memtable, 一般是一个linked hashmap, 写到一定大小就存到硬盘里变成sstable(sorted-string-table), 不再更改; 此后会开一个新的memtable来接受新的写; 这样会按历史时间来生成很多小文件, 读的时候先读memtable,如果里边有想要的key对应的value,必定是最新的,否则按历史时间顺来查sstable(sstable有自己的cache, 所以未必需要读硬盘); 对于flink来说, 当需要checkpoint的时候, 只需要把当时的memtable写在硬盘里即可, 这是唯一一个需要block住当前计算的操作, 此后也只需要把从上个checkpoint开始, 新生成的sstable异步发送到高可用的远程文件系统即可(比如S3, HDFS); 这样就做到了异步(发送到高可用datastore是异步执行的),和增量(只发送新增文件);
注意, 由于太多的小文件的sstable会造成读的性能问题, 所以RocksDB需要异步的compact这些小文件到一个大文件, 对此flink也需要做出一些应对, 详见[35], 例子给的非常清楚,这里不再赘述;
以上的讨论都是关于中间件内部如何实现EOMP, 但是由于end to end argument的影响, 中间件提供的保证再多, 没有source的支持, 它也无法区分source(流系统的event来源)发来的2个内容一样的event, 到底是"同一个"信息的重发, 还是"本意"就是想要中间件处理两次的两个"不同"event; 对sink(流系统计算结果的去处)来说,由于failure造成的重算,zombie的存在, 则需要sink能够"融入"到流系统的EOMP体系中去; 对于source的要求基本就是重发和对消息提供能区分到底是不是一个event的eventId,一般就是Kafka那样就OK, 比较简单就不多讨论了; 这里着重聊一下sink; Sink主要有两种手段来配合流系统中间件的EOMP, 幂等和2阶段提交(2PC)
最简单的来配合流系统EOMP的策略就是幂等, 由于是外部系统, 所以重用我们的"两节点EOMP模型"基本不可能, 因为基本不可能用一个tx来把要写外部系统的操作和记录已经处理过这个操作用一个原子tx来commit, 这也是流系统为什么要支持2PC的原因;
由于幂等保证对同一个计算结果写多次和写一次一样, 所以无论是什么流系统, 无论系统是重算型, 还是记录计算结果来避免重算型, 幂等的sink都可以很好的支持; 所以Dataflow/Spark/Kafka Stream都是靠幂等的sink来实现EOMP
幂等的问题在于无法应对需要重算, 且计算可以是non-deterministic的情况, 详见: 后边(Latency, 幂等和non-deterministic)一节的讨论; 这也是Spark Streaming, 使用幂等sink的Flink无法支持non-deterministic计算的本质原因;
相比之下, dataflow总是记录计算结果来避免重算(即使重算也只会有一次重算的结果会影响下游), Kafka Stream支持tx可以保证只有一次计算结果可以被commit到Kafka Stream里, 如果sink也只读committed上游kafka stream, 则可以保证即使计算是non-deterministic的, 也只会有唯一commit的计算结果被读到(其他的计算结果没有commit marker而被Kafka data comsume API忽略)从而影响sink的外部系统; 而Flink的2PC sink也做到了重算会直接导致sink的外部系统可以配合flink的global rollback, 所以只会有一次的计算结果被外部系统接受(commit);
所以Spark Stream在4个流系统里, 是唯一一个完全无法支持non-deterministic计算的流系统;
2PC对很多熟悉数据库的人来说应该是臭名昭著了, 这是很复杂和很容易造成问题而需要极力避免的东西; 但是时代在变化, 2PC在新时代也有了弥补自己问题的很多解法了,这里简单介绍一下;
2PC协议由一个coordinator,和很多参与2PC的异构系统组成,发起2PC的时候 coodinator要求所有人pre-commit,这是2PC的第一个P(phase),如果所有tx参与者都可以pre-commit并告知coordinator,则coordinator告诉所有人commit,否则告诉所有人abort,这是2PC的第二个P(phrase)
2PC最大的问题是它是一个blocking协议,blocking的点在于当coordinator和某一个2PC的参与者A挂了,其他参与者无法作出任何决定,只能等待coordinator或者死掉的那个参与者A上线,因为这时所有其他参与者都无法判断以下两种情况到底那种发生了,从而无法决定到底是commit还是abort
在情况1. 所有其他参与者都应该commit,在情况2,所有其他参与者都应该abort;由于无法辨别到底是情况1. 还是2. 所有其他参与者必须block等待,这对很多数据库来说意味着为此tx加的锁都不能放掉,从而影响数据库的其他不参与2PC的操作,甚至锁死整个数据库;而如果coordinator或者参与者A无法再上线或者状态丢失,则需要非常复杂的人工操作来解决其他参与者应该如何决策的问题;
虽然2PC有各种问题, 但是在consensus协议早已经成功分布式系统的基石, 各种开源和标准实现可以被轻松获得的今天, 用consensus协议来弥补2PC的问题已经成为一个"已经解决的问题", 如[25]4.2 The Paxos Commit Algorithm 中所说:
…We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the TM be the client that proposes the consensus value…
解决2PC问题的关键在于保持coordinator状态的高可用性, 那么只要coordinator保证把commit或者abort的决定记录在一个consensus cluster里即可,比如etcd或者zookeeper,这样coordintor死了,重启从consensus cluster里恢复状态重新告知所有参与者到底应该commit还是abort即可; 这也是为什么各种流行的分布式系统实现分布式tx都是用2PC的原因, 比如dynamoDB, Spanner, Flink, Kafka...
Flink的2PC Sink
2PC的第一个P的关键在于所有tx参与者在不知道其他参与者状态的情况下,承诺未来一定可以前进commit成功或者干净的回退abort;当前的tx参与者准备好了,且同意commit,2PC的第二个P的关键点在于整体系统的”唯一决定”统一的推进或者回退各个参与者的状态; 而Flink的global state其实可以看做一个2PC,当一个节点收到所有的上游的barrier-n时,这个“契机”可以看做收到了coordinator的可不可以precommit的问询,而当localstate已经在remote 存好之后,当前节点就可以告诉coordinator它准备好了,这可以看做回复precommit(如果此节点在发给precommit);而当所有的节点都通知coordinator“准备好了”之后,coordinator就可以记录下barrier-n的global state完整checkpoint的这个事实,这相当于一个不需要发给tx参与者的commit;这是由于当failover的时候,是由coordinator告诉所有节点应该从哪个checkpoint点来恢复本地状态,所以各个节点的localstate到底是commit了还是rollback了,可以完全由“有没有记录下barrier-n的global state完整checkpoint成功”这个metadata推算出来,所以也就不需要单独给各个节点发commit/abort信息来让各个节点commit或者abort了。
当系统状态只涉及到flink的内部状态时(flink提供的stateApi所提供的statestore), 如果一个某节点X在回复precommit之后挂了,coordinator还是可以选择commit,因为组成global state的节点X的local state已经完整的存储在remote的datastore里了;但是如果涉及到外部状态,比如sink需要把计算结果存储到一个非flink控制的数据库中去时,flink的sink节点就相当于这个外部数据库的client,需要连接外部数据库并把数据存入外部数据库;要使得外部数据库的状态和flink的状态保持一致,则需要sink把外部数据库的状态引入到flink global state的2PC里,而coordinator在决定commit或者abort的时候,必须通知sink来执行外部状态的commit或者abort,因为coordinator是不知道外部状态到底是什么,也无法简单的用通知sink从不同的globalstate点恢复来代替2PC的commit/abort通知; 同时sink收到barrier-n时,sink要保证外部数据库里与barrier-(n-1)到barrier-n之间信息相关的数据更改,处于一种“在任何情况下都一定可以commit成功,但是还没有真的commit,所以外部数据库的消费者不可见这些状态,且可以rollback的,可进可退的状态”; [40]给出了如何用文件实现的一个例子;我这里给出一个如何使用支持transaction的数据库的例子;
首先为了避免产生歧义, 我们定义:
注意1: 可以使用2PC作为sink的关键是, 你的sink可以保证在ack pre-commit之后, 保证无论任何情况都可以成功commit; 这不是说你的sink所连接的外部系统支持tx就可以的, 需要application设计者根据情况具体设计; [1]的P213页, 就描述了sink是用kafka transaction记录计算结果到kafka,但是即使用了transaction也可能丢数据的一种edge case; 而[41] Kafka 0.11 and newer=>Caveats 里也有提到;
丢失数据的原因就在于, kafka sink的默认实现:FlinkKafkaProducer011, 在precommit的时候没有真的commit数据, 因此当kafka sink fail掉没有及时重启, 一旦kafka tx超时, 所有tx里的数据都会丢失, 而此时如果coordinator已经决定commit就绝不会再重发数据(source也已经commit发出的消息的index),从而kafka sink的此次tx的所有数据永久丢失;
这里提供的DB版本的sink实现思路, 在precommit阶段就commit数据, 来保证“无论如何数据都不会丢”, 但是用app level的flag屏蔽外部可见; 这样做的原因就是为了克服类似kafka sink的这种缺陷.
注意2: 使用2PC Sink的Flink应该是可以应对non-deterministic计算的, 因为一旦failure发生, 所有之前的状态和对sink的写入都会被rollback; 但是这样的话, Flink在sink端就变成了micro-batch模型, batch大小取决于发barrier的频率; 但是即使这样, 由于只有sink需要聚集一个batch才能做一次2PC, 但是中间节点往下游发送计算结果还是即算即发的, 所以比起Spark这种所有中间计算都是micro-batch,micro-batch造成的额外latency会叠加式的增高的模型, 端到端的latency应该还是会要小一些;
利用幂等的sink可以做到实时记录计算结果, 达到最小的end to end latency; 因为sink根本不需要等待barrier, 来一条计算结果就向外部系统commit一条记录就好, 而由幂等保证了就算整个系统开始重算, 在sink端也会表现出每个source端的event只产生了一次效果的结果;
但是幂等是很难克服non-deterministic计算的; 因为non-deterministic计算使得同一个source发出的event引起千变万化的"蝴蝶效应" (比如第一次计算event生成的Key是A, 第二次重算生成的Key是B, 如果下一个节点是partitionByKey, 那么这里的2次计算结果就会发送给了完全不同的下游节点, 考虑几百次不确定计算引起的不同蝴蝶效应, 等计算结果到达各个sink节时, 计算的key和value甚至结果的个数和在sink节点的distribution都完全不同了, 那么sink也就完全无法利用幂等来屏蔽掉同一个event replay所造成的"蝴蝶效应"了)
相比之下, 如果整个流系统的计算都是确定性的, 那么无论在source端replay多少次同一个event, 它所产生的"蝴蝶效应"在sink端也必定相同, 则application设计者则可以很容易设计出幂等操作来屏蔽掉重复的计算结果;
如果业务里无法去除non-determnistic的计算, 那么你只能选择Google Dataflow, KafkaStream,或者Flink+2PCSink; 而只支持幂等的Spark和利用幂等sink的Flink无法支持non-determnistic的业务计算。