我有代码,它将数据从SQL向下流并将其写入不同的存储区。守则大致如下:
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}为此,我想使用反应性扩展。理想情况下,代码如下所示:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));但是,扩展方法ForEachAsync似乎只接受同步操作。是否可以编写一个可以接受异步操作的扩展?
发布于 2020-11-20 20:42:47
下面是支持异步操作的ForEachAsync方法的一个版本。它将可观察到的源投影到包含异步操作的嵌套IObservable<IObservable<Unit>>中,然后使用Merge运算符将其还原回IObservable<Unit>。最终,可观察到的结果被转换为任务。
默认情况下,这些操作是按顺序调用的,但是可以通过配置可选的maximumConcurrency参数来并发调用它们。
取消可选的cancellationToken参数将导致返回的Task立即完成(取消),可能在取消当前运行的操作之前。
任何可能发生的异常都会通过Task传播,并导致取消所有当前正在运行的操作。
/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, Task> action,
CancellationToken cancellationToken = default,
int maximumConcurrency = 1)
{
// Arguments validation omitted
return source
.Select(item => Observable.FromAsync(ct => action(item, ct)))
.Merge(maximumConcurrency)
.DefaultIfEmpty()
.ToTask(cancellationToken);
}用法示例:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));https://stackoverflow.com/questions/45382799
复制相似问题