长话短说
2C互联网业务增长,单机多核的共享内存模式带来的排障问题、编程困难;随着多核时代和分布式系统的到来,共享模型已经不太适合并发编程,因此actor-based模型又重新受到了人们的重视。
---------------------------调试过多线程的都懂-----------------------------
.Net TPL Dataflow组件帮助我们快速实现actor-based模型,当有多个必须异步通信的操作或要等待数据可用再进一步处理时,Dataflow组件非常有用。
TPL Dataflow是微软前几年给出的数据处理库, 内置常见的处理块,可将这些块组装成一个处理管道,"块"对应处理管道中的"阶段任务",可类比AspNetCore 中Middleware和Pipeline。
TPL Dataflow核心概念
TPL Dataflow 内置的Block覆盖了常见的应用场景,如果内置块不能满足你的要求,你也可以自定“块”。
Block可以划分为下面3类:
使用以上块混搭处理管道, 大多数的块都会执行一个操作,有些时候需要将消息分发到不同Block,这时可使用特殊类型的缓冲块给管道“”分叉”。
Execution Block
可执行的块有两个核心组件:
消息在输入和输出时能够被暂存:
当输入的消息速度比Func委托的执行速度比快,后续消息将在到达时暂存;
当下一个块的输入暂存区中无可用空间,将在当前块输出时暂存。
每个块我们可以配置:
将块链接在一起形成处理管道,生产者将消息推向管道。
TPL Dataflow有一个基于pull的机制(使用Receive和TryReceive方法),但我们将在管道中使用块连接和推送机制。
该块在需要将消息广播给多个块时很有用(管道分叉)
其他内建Block类型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,暂时不会深入。
管道连锁反应
当B块输入缓冲区达到上限容量,为其供货的上游A块的输出暂存区将开始被填充,当A块输出暂存区已满时,该块必须暂停处理,直到暂存区有空间,这意味着一个Block的处理瓶颈可能导致所有前面的块的暂存区被填满。
但是不是所有的块暂存区满时都会暂停,BroadcastBlock有1个消息的暂存区,每个消息都会被覆盖, 因此如果这个广播块不能及时将消息转发到下游,则在下个消息到达的时候消息将丢失,某种意义上达到一种限流效果(比较残暴).
编程实践
生产者投递消息
可使用Post或者SendAsync方法向首块投递消息:
Post、SendAsync的不同点在于SendAsync可以延迟投递(后置管道的输入buffer不空,得到异步通知后再投递)。
定义流水线管道
按照上图业务定义流水线:
public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory)
{
_httpClient = httpClientFactory.CreateClient("bce-request");
_redisDB0 = redisCache[0];
_redisDB = redisCache;
_logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));
var option = new DataflowLinkOptions { PropagateCompletion = true };
publisher = _redisDB.RedisConnection.GetSubscriber();
_eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>
(
// redis piublih 没有做在TransformBlock fun里面, 因为publih失败可能影响后续的block传递
eqidPair => EqidResolverAsync(eqidPair),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")
}
);
// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline
_logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);
_logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );
_broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容纳一个消息的缓存区和拷贝函数组成
_broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);
_broadcastBlock.LinkTo(_logPublishBlock, option);
_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);
}
仿IIS日志写入组件
异常处理
上述程序在生产部署时遇到相关的坑位:
在测试环境_eqid2ModelTransformBlock块委托函数稳定执行,程序并未出现异样; 部署到生产之后,该Pipeline运行一段时间就停止工作,一直很困惑。
后来通过监测_eqid2ModelTransformBlock.Completion属性,发现该块在执行某次委托时报错,提前进入完成态。
当TPL Dataflow不再处理消息且保证不再处理消息的时候,就被定义为 "完成态", IDataflow.Completion属性(Task对象)标记该状态,Task对象的TaskStatus枚举值描述此Block进入完成态的真实原因。
官方资料表明:某块进入Fault、Cancel状态,都会导致该块提前进入“完成态”,但因Fault、Canceled进入的“完成态”会导致输入暂存区和输出暂存区被清空。 After Fault has been called on a dataflow block, that block will complete, and its Completion task will enter a final state. Faulting a block, as with canceling a block, causes buffered messages (unprocessed input messages as well as unoffered output messages) to be lost.
故需要严肃对待异常,一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。
本文作为TPL Dataflow的入门指南(代码较多建议左下角转向原文)
微软技术栈的可持续关注actor-based模型的流水线处理组件,应对单体程序中高并发,低延迟相当巴适。
+ https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.broadcastblock-1?view=netcore-3.1
+ https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.fault?redirectedfrom=MSDN&view=netcore-2.2#System_Threading_Tasks_Dataflow_IDataflowBlock_Fault_System_Exception_