首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >gRPC间歇性高延迟

gRPC间歇性高延迟
EN

Stack Overflow用户
提问于 2021-07-23 11:29:06
回答 2查看 1.1K关注 0票数 3

我有一个服务器应用程序(C#和.Net 5),它公开一个gRPC双向端点。此端点接受二进制流,其中服务器分析并生成发送回gRPC响应流的响应。

通过gRPC发送的每个文件都是几兆字节,gRPC调用完成流(没有延迟)只需几分钟。由于延迟,这一次有时会增加50%。

在客户机上,我有两个任务(Task.Run)正在运行,一个使用FileStream从客户端的文件系统流文件,另一个从服务器(gRPC)读取响应。

在服务器上,我还运行了两个任务,一个是从gRPC请求流读取消息并将它们推入队列(DataFlow.BufferBlock<byte[]>),另一个任务是处理队列中的消息,以及编写对gRPC的响应。

问题:

如果我禁用(注释掉)所有服务器处理代码,并简单地从gRPC读取和记录消息,那么客户机到服务器的延迟几乎为0。

当服务器已启用处理时,客户端在写入grpcClient时会看到延迟。

只要使用10个活动的并行会话(gRPC调用),这些延迟就可以提高到10-15秒。

PS: --只有在多个客户端运行时才会发生这种情况,并发客户机数量越多意味着延迟越长。

客户端代码看起来有点像下面的代码:

代码语言:javascript
复制
FileStream fs = new(audioFilePath, FileMode.Open, FileAccess.Read, FileShare.Read, 1024 * 1024, true);

byte[] buffer = new byte[10_000];

GrpcClient client = new GrpcClient(_singletonChannel); // using single channel since only 5-10 clients are there right now
BiDiCall call = client.BiDiService(hheaders: null, deadline: null, CancellationToken.None);

var writeTask = Task.Run(async () => {
    while (fs.ReadAsync(buffer, 0, buffer.Length))
    {
        call.RequestStream.WriteAsync(new() { Chunk = ByteString.CopyFrom(buffer) });
    }
    await call.RequestStream.CompleteAsync();
});

var readTask = Task.Run(async () => {
    while (await call.ResponseStream.MoveNext())
    {
        // write to log call.ResponseStream.Current
    }
});

await Task.WhenAll(writeTask, readTask);
await call;

服务器代码如下所示:

代码语言:javascript
复制
readonly BufferBlock<MessageRequest> messages = new();
MessageProcessor _processor = new();

public override async Task BiDiService(IAsyncStreamReader<MessageRequest> requestStream,
    IServerStreamWriter<MessageResponse> responseStream, 
    ServerCallContext context)
{
    var readTask = TaskFactory.StartNew(() => {
        while (await requestStream.MoveNext())
        {
            messages.Post(requestStream.Current);  // add to queue
        }
        messages.Complete();
    }, TaskCreationOptions.LongRunning).ConfigureAwait(false);

    var processTask = Task.Run(() => {
        while (await messages.OutputAvailableAsync())
        {
            var message = await messages.ReceiveAsync();  // pick from queue
            // if I comment out below line and run with multiple clients = latency disappears
            var result = await _processor.Process(message); // takes some time to process
            if (result.IsImportantForClient())
                await responseStrem.WriteAsync(result.Value);
        }
    });

    await Task.WhenAll(readTask, processTask);
}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-08-30 06:33:33

因此,事实证明,这个问题是由于ThreadPool产生的工作线程数量的延迟造成的。

ThreadPool需要更多的时间来生成线程来处理这些任务,导致gRPC读取出现明显的滞后。

这是修正后,增加minThread计数的产卵请求使用ThreadPool.SetMinThreadsMSDN参考

票数 2
EN

Stack Overflow用户

发布于 2021-08-02 02:02:18

对于SO的最初问题,有许多很有希望的评论,但想要解释一下我认为重要的是什么:

  1. 调用2的外部异步方法。
  2. Task.Run()s-使用TaskCreationOptions.LongRunning选项包装异步循环,最后是一个
  3. 返回一个Task.WhenAll()重新加入这两个任务..。Alois Kraus提供了OS任务调度程序是一个OS,它的调度可以抽象出您认为更有效的东西--这很可能是真的,如果是这样的话。

我建议尝试删除异步处理,看看不同的同步/异步混合可能对您的特定场景有什么样的好处。需要确保记住的一件事是,异步/等待逻辑块以牺牲自动线程管理为代价--这对于单路径ish I/O绑定处理(如ex)是很好的。在进入下一个执行步骤之前,需要调用db/webservice ),并且随着您转向计算机绑定处理(需要显式重新连接的执行-异步/等待隐式地处理任务重新连接),这可能不太有益。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68498455

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档