我有一个服务器应用程序(C#和.Net 5),它公开一个gRPC双向端点。此端点接受二进制流,其中服务器分析并生成发送回gRPC响应流的响应。
通过gRPC发送的每个文件都是几兆字节,gRPC调用完成流(没有延迟)只需几分钟。由于延迟,这一次有时会增加50%。
在客户机上,我有两个任务(Task.Run)正在运行,一个使用FileStream从客户端的文件系统流文件,另一个从服务器(gRPC)读取响应。
在服务器上,我还运行了两个任务,一个是从gRPC请求流读取消息并将它们推入队列(DataFlow.BufferBlock<byte[]>),另一个任务是处理队列中的消息,以及编写对gRPC的响应。
问题:
如果我禁用(注释掉)所有服务器处理代码,并简单地从gRPC读取和记录消息,那么客户机到服务器的延迟几乎为0。
当服务器已启用处理时,客户端在写入grpcClient时会看到延迟。
只要使用10个活动的并行会话(gRPC调用),这些延迟就可以提高到10-15秒。
PS: --只有在多个客户端运行时才会发生这种情况,并发客户机数量越多意味着延迟越长。
客户端代码看起来有点像下面的代码:
FileStream fs = new(audioFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 1024 * 1024, true);
byte[] buffer = new byte[10_000];
GrpcClient client = new GrpcClient(_singletonChannel); // using single channel since only 5-10 clients are there right now
BiDiCall call = client.BiDiService(hheaders: null, deadline: null, CancellationToken.None);
var writeTask = Task.Run(async () => {
while (fs.ReadAsync(buffer, 0, buffer.Length))
{
call.RequestStream.WriteAsync(new() { Chunk = ByteString.CopyFrom(buffer) });
}
await call.RequestStream.CompleteAsync();
});
var readTask = Task.Run(async () => {
while (await call.ResponseStream.MoveNext())
{
// write to log call.ResponseStream.Current
}
});
await Task.WhenAll(writeTask, readTask);
await call;服务器代码如下所示:
readonly BufferBlock<MessageRequest> messages = new();
MessageProcessor _processor = new();
public override async Task BiDiService(IAsyncStreamReader<MessageRequest> requestStream,
IServerStreamWriter<MessageResponse> responseStream,
ServerCallContext context)
{
var readTask = TaskFactory.StartNew(() => {
while (await requestStream.MoveNext())
{
messages.Post(requestStream.Current); // add to queue
}
messages.Complete();
}, TaskCreationOptions.LongRunning).ConfigureAwait(false);
var processTask = Task.Run(() => {
while (await messages.OutputAvailableAsync())
{
var message = await messages.ReceiveAsync(); // pick from queue
// if I comment out below line and run with multiple clients = latency disappears
var result = await _processor.Process(message); // takes some time to process
if (result.IsImportantForClient())
await responseStrem.WriteAsync(result.Value);
}
});
await Task.WhenAll(readTask, processTask);
}发布于 2021-08-30 06:33:33
因此,事实证明,这个问题是由于ThreadPool产生的工作线程数量的延迟造成的。
ThreadPool需要更多的时间来生成线程来处理这些任务,导致gRPC读取出现明显的滞后。
这是修正后,增加minThread计数的产卵请求使用ThreadPool.SetMinThreads。MSDN参考
发布于 2021-08-02 02:02:18
对于SO的最初问题,有许多很有希望的评论,但想要解释一下我认为重要的是什么:
我建议尝试删除异步处理,看看不同的同步/异步混合可能对您的特定场景有什么样的好处。需要确保记住的一件事是,异步/等待逻辑块以牺牲自动线程管理为代价--这对于单路径ish I/O绑定处理(如ex)是很好的。在进入下一个执行步骤之前,需要调用db/webservice ),并且随着您转向计算机绑定处理(需要显式重新连接的执行-异步/等待隐式地处理任务重新连接),这可能不太有益。
https://stackoverflow.com/questions/68498455
复制相似问题