我们最近迁移了我们的ASP.NET Core,它使用Dapper
到.NET Core3.1。迁移之后,我们觉得有机会使用来自C# 8
的最新的C# 8
特性作为我们的端点之一。
以下是更改之前的伪代码:
public async Task<IEnumerable<Item>> GetItems(int id)
{
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
{
Id = id
});
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
{
return null;
}
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
}
private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
using (reader)
{
foreach (var item in items)
{
yield return item;
}
}
}
在IAsyncEnumerable
代码更改后:
// Import Nuget pacakage: System.Linq.Async
public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
{
Id = id
});
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
{
return null;
}
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
}
private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
using (reader)
{
await foreach (var item in items.ToAsyncEnumerable())
{
yield return item;
}
}
}
上面的方法是使用ToAsyncEnumerable
是从这个职位松散的灵感,但我不能100%确定我是否在正确的地方/上下文中使用它。
问题:
IEnumerable
,但是我们可以像上面那样使用ToAsyncEnumerable
将其转换为async
stream
的IAsyncEnumerable
吗?Note:这个问题看起来类似于如果与异步/等待一起使用(使用Dapper从Server传输数据),返回IEnumerable会发生什么?,但我不认为这回答了我的问题。
发布于 2021-03-20 15:52:57
更新:我第一次写这个答案时并不知道异步迭代器。感谢西奥多·祖利亚斯的指点。有鉴于此,可以采取一种简单得多的办法:
using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();
// Consider using reader.NextResultAsync(). Follow github issue for details:
while (await reader.ReadAsync()) {
yield return rowParser(reader);
}
参考文献:https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322
原始答案:
下面是我编写的一个IAsyncEnumerable
包装器,它可以帮助那些想要使用异步/等待来流非缓冲数据的人,也可以使用Dapper类型映射的强大功能:
public class ReaderParser<T> : IAsyncEnumerable<T> {
public ReaderParser(SqlDataReader reader) {
Reader = reader;
}
private SqlDataReader Reader { get; }
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
return new ReaderParserEnumerator<T>(Reader);
}
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
public ReaderParserEnumerator(SqlDataReader reader) {
Reader = reader;
RowParser = reader.GetRowParser<T>();
}
public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
private SqlDataReader Reader { get; }
private Func<IDataReader, T> RowParser { get; }
public async ValueTask DisposeAsync() {
await Reader.DisposeAsync();
}
public async ValueTask<bool> MoveNextAsync() {
return await Reader.ReadAsync();
}
}
用法:
var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);
然后,package System.Linq.Async
基本上添加了您所知道和喜爱的所有漂亮的IEnumerable
扩展,例如在我的用法中:
var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
发布于 2022-11-23 17:32:07
戴夫的回答提供了所需的一切,但对我来说不够直截了当。
因此,下面是一种用Dapper获取IAsyncEnumerable
的扩展方法:
/// <summary>
/// Asynchronously enumerates the results of a query.
/// </summary>
/// <typeparam name="T">The type of result to return.</typeparam>
/// <param name="cnn">The connection to query on.</param>
/// <param name="sql">The SQL to execute for the query.</param>
/// <param name="param">The parameters to pass, if any.</param>
/// <param name="transaction">The transaction to use, if any.</param>
/// <returns>An asynchronous enumerator of the results.</returns>
/// <remarks>See <see href="https://stackoverflow.com/a/66723553/1178314"/> and
/// <see href="https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322"/>.</remarks>
public static async IAsyncEnumerable<T> EnumerateAsync<T>(this DbConnection cnn, string sql, object param = null, IDbTransaction transaction = null)
{
var reader = await cnn.ExecuteReaderAsync(sql, param, transaction).ConfigureAwait(false);
var rowParser = reader.GetRowParser<T>();
while (await reader.ReadAsync().ConfigureAwait(false))
{
yield return rowParser(reader);
}
while (await reader.NextResultAsync().ConfigureAwait(false)) { }
}
注意,您必须使用DbConnection
,而不是IDbConnection
。否则,reader.ReadAsync
将不可用。
那么在您的代码中,您可以这样做:
var asyncEnumerable = connection.EnumerateAsync<YourDto>("select ...", yourParameters);
https://stackoverflow.com/questions/59956623
复制相似问题