BufferBlock是C#中的一个数据流块(Dataflow Block),它提供了一个有界或无界的缓冲区,用于存储数据。类似于BlockingCollection,你可以使用Post
方法往BufferBlock中添加数据,也可以通过Receive
方法阻塞或异步地读取数据。在高性能方面,BufferBlock是C#中一种常用的选择。
Post
方法可以将数据放入缓冲区,而 ReceiveAsync
方法用于异步读取缓冲区中的数据。这确保了线程安全的数据处理。CancellationToken
进行 取消操作。这意味着可以在等待数据的过程中取消异步操作,使得程序更加灵活。Post
方法添加的数据。这个缓冲区可以是有界的(有限数量的元素)或无界的(可以无限增长)。ReceiveAsync
方法时,如果缓冲区中有数据,该方法会立即返回一个包含缓冲区中的数据的Task。如果缓冲区为空,ReceiveAsync
方法会等待,直到有数据可用为止。Post
方法用于将数据放入缓冲区,而ReceiveAsync
方法用于从缓冲区中异步读取数据。这种生产者-消费者模型确保了数据的同步访问,避免了多线程访问缓冲区时可能发生的竞态条件。数据流是一种用于处理异步和并发编程的机制。数据流提供了一种有效的方式来协调多个任务之间的数据交换。在C#中,有一种称为TPL(任务并行库)的机制,它包括了数据流组件,用于处理并发数据操作。
以下是关于C#数据流的主要概念:
思路引导
在日常的复杂应用场景中,会有可能遇到某些事件会在短时间内重复触发或者短时间内有大量的请求这个这个时候就可以使BufferBlock限流能力,在短时间内限制触发频率达到限流的效果,在这种情况下可以考虑使用BufferBlock。
如何实现限流?
BufferBlock
的容量被设置为2,即同时只能处理两个请求。当超过容量时,新的请求将被阻塞,直到有处理完成的请求释放出空间。请根据实际需求调整BoundedCapacity
的值,以满足系统的处理能力。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main(string[] args)
{
var bufferBlock = new BufferBlock<int>(new DataflowBlockOptions
{
BoundedCapacity = 2 // 设置限制的容量
});
// 模拟多个请求
for (int i = 1; i <= 5; i++)
{
var requestNumber = i;
await Task.Delay(100); // 模拟请求的处理时间
// 尝试发送请求,如果超过限流容量,将会被阻塞
await bufferBlock.SendAsync(requestNumber);
Console.WriteLine($"Request {requestNumber} sent at {DateTime.Now}");
// 异步处理请求
_ = ProcessRequest(bufferBlock, requestNumber);
}
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
}
static async Task ProcessRequest(BufferBlock<int> bufferBlock, int requestNumber)
{
// 等待接收请求
var receivedRequest = await bufferBlock.ReceiveAsync();
Console.WriteLine($"Request {receivedRequest} processed at {DateTime.Now}");
}
}
基础使用示例
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main(string[] args)
{
// 创建BufferBlock,存储整数类型数据
var bufferBlock = new BufferBlock<int>();
// 生产者任务,往BufferBlock写入数据
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await Task.Delay(TimeSpan.FromSeconds(1)); // 模拟耗时操作
bufferBlock.Post(i); // 写入数据到BufferBlock
Console.WriteLine($"Produced: {i}");
}
bufferBlock.Complete(); // 数据生产完成,标记BufferBlock为完成状态
});
// 消费者任务,从BufferBlock读取数据
var consumer = Task.Run(async () =>
{
while (await bufferBlock.OutputAvailableAsync())
{
var data = bufferBlock.Receive(); // 从BufferBlock读取数据
Console.WriteLine($"Consumed: {data}");
}
});
// 等待生产者和消费者任务完成
await Task.WhenAll(producer, consumer);
}
}