首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在异步操作中使用Rx.Nex扩展ForEachAsync

如何在异步操作中使用Rx.Nex扩展ForEachAsync
EN

Stack Overflow用户
提问于 2017-07-28 21:39:59
回答 4查看 1.8K关注 0票数 6

我有代码,它将数据从SQL向下流并将其写入不同的存储区。守则大致如下:

代码语言:javascript
运行
复制
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
     using (var reader = await cmd.ExecuteReaderAsync())
     {
         var list = new List<MyData>();
         while (await reader.ReadAsync())
         {
             var row = GetRow(reader);
             list.Add(row);
             if (list.Count == BatchSize)
             {
                 await WriteDataAsync(list);
                 list.Clear();
             }
         }
         if (list.Count > 0)
         {
             await WriteDataAsync(list);
         }
     }
 }

为此,我想使用反应性扩展。理想情况下,代码如下所示:

代码语言:javascript
运行
复制
await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

但是,扩展方法ForEachAsync似乎只接受同步操作。是否可以编写一个可以接受异步操作的扩展?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2017-07-29 01:29:24

是否可以编写一个可以接受异步操作的扩展?

不是直接的。

Rx订阅必然是同步的,因为Rx是一个基于推送的系统.当数据项到达时,它会遍历您的查询,直到到达最终订阅--在本例中,这是执行一个Action

Rx提供的await-able方法是序列本身的await--也就是说,ForEachAsync在序列方面是异步的(您正在异步地等待序列完成),但是ForEachAsync中的订阅(对每个元素采取的操作)仍然必须是同步的。

为了在数据管道中完成同步到异步的转换,您需要一个缓冲区。在异步使用者检索和处理项目时,Rx订阅可以(同步)作为生产者添加到缓冲区中。因此,您需要一个支持同步和异步操作的生产者/使用者队列。

TPL数据流中的各种块类型可以满足这一需求。这样的东西就够了:

代码语言:javascript
运行
复制
var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion;

请注意,没有背压;只要StreamDataFromSql能够推送数据,它将被缓冲并存储在ActionBlock的传入队列中。根据数据的大小和类型,这可以快速地使用大量内存。

票数 5
EN

Stack Overflow用户

发布于 2017-07-29 01:20:03

正确的做法是正确地使用反应性扩展来完成这一任务--所以从创建连接开始直到编写数据为止。

下面是操作步骤:

代码语言:javascript
运行
复制
IObservable<IList<MyData>> query =
    Observable
        .Using(() => new SqlConnection(""), connection =>
            Observable
                .Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
                    Observable
                        .Using(() => cmd.ExecuteReader(), reader =>
                            Observable
                                .While(() => reader.Read(), Observable.Return(GetRow(reader))))))
        .Buffer(BatchSize);

IDisposable subscription =
    query
        .Subscribe(async list => await WriteDataAsync(list));

我无法测试代码,但它应该能工作。这段代码假设WriteDataAsync也可以接受IList<MyData>。如果它不只是掉进一个.ToList()

票数 0
EN

Stack Overflow用户

发布于 2020-11-20 20:42:47

下面是支持异步操作的ForEachAsync方法的一个版本。它将可观察到的源投影到包含异步操作的嵌套IObservable<IObservable<Unit>>中,然后使用Merge运算符将其还原回IObservable<Unit>。最终,可观察到的结果被转换为任务。

默认情况下,这些操作是按顺序调用的,但是可以通过配置可选的maximumConcurrency参数来并发调用它们。

取消可选的cancellationToken参数将导致返回的Task立即完成(取消),可能在取消当前运行的操作之前。

任何可能发生的异常都会通过Task传播,并导致取消所有当前正在运行的操作。

代码语言:javascript
运行
复制
/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, Task> action,
    CancellationToken cancellationToken = default,
    int maximumConcurrency = 1)
{
    // Arguments validation omitted
    return source
        .Select(item => Observable.FromAsync(ct => action(item, ct)))
        .Merge(maximumConcurrency)
        .DefaultIfEmpty()
        .ToTask(cancellationToken);
}

用法示例:

代码语言:javascript
运行
复制
await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45382799

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档