首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用IAsyncEnumerable与Dapper

使用IAsyncEnumerable与Dapper
EN

Stack Overflow用户
提问于 2020-01-28 20:35:06
回答 2查看 6.3K关注 0票数 18

我们最近迁移了我们的ASP.NET Core,它使用Dapper到.NET Core3.1。迁移之后,我们觉得有机会使用来自C# 8的最新的C# 8特性作为我们的端点之一。

以下是更改之前的伪代码:

代码语言:javascript
运行
复制
public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
       return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }     
}

IAsyncEnumerable代码更改后:

代码语言:javascript
运行
复制
// Import Nuget pacakage: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
       await foreach (var item in items.ToAsyncEnumerable())
       {
           yield return item;
       }
    }
 }

上面的方法是使用ToAsyncEnumerable是从这个职位松散的灵感,但我不能100%确定我是否在正确的地方/上下文中使用它。

问题:

  • dapper库只返回IEnumerable,但是我们可以像上面那样使用ToAsyncEnumerable将其转换为async streamIAsyncEnumerable吗?

Note:这个问题看起来类似于如果与异步/等待一起使用(使用Dapper从Server传输数据),返回IEnumerable会发生什么?,但我不认为这回答了我的问题。

EN

回答 2

Stack Overflow用户

发布于 2021-03-20 15:52:57

更新:我第一次写这个答案时并不知道异步迭代器。感谢西奥多·祖利亚斯的指点。有鉴于此,可以采取一种简单得多的办法:

代码语言:javascript
运行
复制
using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

// Consider using reader.NextResultAsync(). Follow github issue for details:

while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}

参考文献:https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322

原始答案:

下面是我编写的一个IAsyncEnumerable包装器,它可以帮助那些想要使用异步/等待来流非缓冲数据的人,也可以使用Dapper类型映射的强大功能:

代码语言:javascript
运行
复制
public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }
    private SqlDataReader Reader { get; }
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        return new ReaderParserEnumerator<T>(Reader);
    }
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader) {
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    }
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader { get; }
    private Func<IDataReader, T> RowParser { get; }
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }
    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync();
    }
}

用法:

代码语言:javascript
运行
复制
var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

然后,package System.Linq.Async基本上添加了您所知道和喜爱的所有漂亮的IEnumerable扩展,例如在我的用法中:

代码语言:javascript
运行
复制
var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
票数 15
EN

Stack Overflow用户

发布于 2022-11-23 17:32:07

戴夫的回答提供了所需的一切,但对我来说不够直截了当。

因此,下面是一种用Dapper获取IAsyncEnumerable的扩展方法:

代码语言:javascript
运行
复制
/// <summary>
/// Asynchronously enumerates the results of a query.
/// </summary>
/// <typeparam name="T">The type of result to return.</typeparam>
/// <param name="cnn">The connection to query on.</param>
/// <param name="sql">The SQL to execute for the query.</param>
/// <param name="param">The parameters to pass, if any.</param>
/// <param name="transaction">The transaction to use, if any.</param>
/// <returns>An asynchronous enumerator of the results.</returns>
/// <remarks>See <see href="https://stackoverflow.com/a/66723553/1178314"/> and
/// <see href="https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322"/>.</remarks>
public static async IAsyncEnumerable<T> EnumerateAsync<T>(this DbConnection cnn, string sql, object param = null, IDbTransaction transaction = null)
{
    var reader = await cnn.ExecuteReaderAsync(sql, param, transaction).ConfigureAwait(false);
    var rowParser = reader.GetRowParser<T>();
    while (await reader.ReadAsync().ConfigureAwait(false))
    {
        yield return rowParser(reader);
    }
    while (await reader.NextResultAsync().ConfigureAwait(false)) { }
}

注意,您必须使用DbConnection,而不是IDbConnection。否则,reader.ReadAsync将不可用。

那么在您的代码中,您可以这样做:

代码语言:javascript
运行
复制
var asyncEnumerable = connection.EnumerateAsync<YourDto>("select ...", yourParameters);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59956623

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档