首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >带有IAsyncEnumerable的EF核心长时间流查询抛出异常

带有IAsyncEnumerable的EF核心长时间流查询抛出异常
EN

Stack Overflow用户
提问于 2022-10-17 07:41:15
回答 1查看 93关注 0票数 1

我正在使用IAsyncEnumerable将不同的大表写入CSV文件,将行从数据库流流到我的应用程序,进行一些修改,然后将它们写入CSV文件流中。

这些表包含很多行,所以查询运行了几个小时。我经常看到像这样的例外:

System.InvalidOperationException无效,试图在关闭读取器时调用CheckDataIsReady。 System.InvalidOperationException:关闭读取器时调用CheckDataIsReady的无效尝试。在Microsoft.Data.SqlClient.SqlDataReader.CheckDataIsReady(Int32 columnIndex,布尔allowPartiallyReadColumn,布尔permitAsync,String methodName在Microsoft.Data.SqlClient.SqlDataReader.GetFieldValueT at lambda_method965(闭包,QueryContext,DbDataReader,ResultContext )( Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable1.Enumerator.MoveNext() at System.Linq.AsyncEnumerable.AsyncEnumerableAdapter1.MoveNextCore() in //Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.cs:line 79 at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in //Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 77 at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 77 )

我的代码如下所示:

代码语言:javascript
运行
复制
await context.Database
    .CreateExecutionStrategy()
    .ExecuteInTransactionAsync(async cancellationToken =>
{
    var entityResult = context.Set<TEntity>().AsNoTracking().ToAsyncEnumerable();
    var done = false;
    await using var enumerator = entityResult.GetAsyncEnumerator();
    await using var stream = new MemoryStream();
    await using var writer = new StreamWriter(stream);

    var csv = new CsvWriter(writer, CultureInfo.InvariantCulture, true);
    csv.Context.RegisterClassMap(new EntityClassMap<TEntity>());
    csv.WriteHeader<TEntity>();
    csv.NextRecord();

    while (await enumerator.MoveNextAsync()) // Cannot use foreach, because of some other stuff below
    {
        var entity = enumerator.Current;
        csv.WriteRecord(entity);
        csv.NextRecord();

        // some other stuff
    }
},
_ => Task.FromResult(true), // We are just reading, so we can always commit the transaction
System.Data.IsolationLevel.ReadUncommitted, // Do not block the whole table while reading. This is essentially the same as WITH(NOLOCK).
cancellationToken);

我认为,问题可能是小型的Network,或者是数据库忙于其他任务,所以我可能需要的是某种弹性。但是,我不能使用默认的SqlServerRetryingExecutionStrategy,因为它会缓冲内存中的所有行,这太大了(数百it )。

EN

Stack Overflow用户

发布于 2022-10-17 09:32:08

像EF这样的ORM并不适用于大型ETL作业,但是使用NOLOCK可能会导致额外的阻塞,甚至可能导致异常。

首先,NOLOCK的使用并不意味着表没有锁定,恰恰相反。NOLOCK的意思是ignore locks,实际上是获取数据库上的共享锁和表上的架构锁,所以它会导致阻塞。

这也意味着,如果表或其索引正在被修改,NOLOCK可能会被阻塞,因为这些操作也采用Schema锁。

如果您确实希望避免阻塞和阻塞,请使用快照隔离。

所有这些都在Server表提示-使用(NOLOCK)最佳实践和布伦特·奥扎尔的短文“使用NOLOCK?下面是如何获得错误的查询结果。”和““但是当我的数据没有变化的时候,NOLOCK就可以了,对吗?””中作了解释。正如布伦特·奥扎尔所强调的:

我怎么强调都不为过:对于NOLOCK,你可以:

  • 两次查看行
  • 跳过所有行
  • 请参阅从未提交的数据。
  • 如果查询失败,则会出现错误。

快速出口解决方案

最快的解决方案是使用数据库提供的工具,如bcpSSISbcp是一个独立的命令行工具,可以在Windows和Linux上运行。

如果您必须编写自己的代码,最快的选择是直接使用ADO.NET,通过IDataReader读取结果并将其写入CSV文件。这样,一次只加载一行。Github问题直接从数据中心写入文件演示了如何使用CsvHelper来完成此操作:

代码语言:javascript
运行
复制
await using (var db = new SqlConnection(Connection.ConnectionString))
await using (var cmd = db.CreateCommand())
{
    await db.OpenAsync();
    cmd.CommandText = @"
select *
from MyTable";
    cmd.CommandTimeout = 5 * 60;

    await using (var reader = await cmd.ExecuteReaderAsync())
    await using (var writer = new StreamWriter(Path.Combine(MyExtensions.TempFolder, "MyFile.csv")))
    await using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        var schema = await reader.GetColumnSchemaAsync();
        foreach (var columnName in schema.Select(c => c.ColumnName))
        {
            csv.WriteField(columnName);
        }

        await csv.NextRecordAsync();

        while (await reader.ReadAsync())
        {
            for (int i = 0; i < reader.FieldCount; i++)
            {
                csv.WriteField(reader[i]);
            }

            await csv.NextRecordAsync();
        }
    }
}

如果使用像脱衣舞这样的库来减少样板ADO.NET代码,则可以简化此代码:

代码语言:javascript
运行
复制
using var con=new SqlConnection(...);
var sql="select * From Sales where Date>=@date";
await using var reader=con.ExequteReader(sql,new {date=DateTime.Today.AddMonths(-2)});

await using (var writer = new StreamWriter(Path.Combine(MyExtensions.TempFolder, "MyFile.csv")))
...

Dapper将根据需要打开和关闭连接,构造一个SqlCommand,其参数与匿名参数对象@date的名称和值匹配。

票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74093952

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档