我正在使用IAsyncEnumerable
将不同的大表写入CSV文件,将行从数据库流流到我的应用程序,进行一些修改,然后将它们写入CSV文件流中。
这些表包含很多行,所以查询运行了几个小时。我经常看到像这样的例外:
System.InvalidOperationException无效,试图在关闭读取器时调用CheckDataIsReady。 System.InvalidOperationException:关闭读取器时调用CheckDataIsReady的无效尝试。在Microsoft.Data.SqlClient.SqlDataReader.CheckDataIsReady(Int32 columnIndex,布尔allowPartiallyReadColumn,布尔permitAsync,String methodName在Microsoft.Data.SqlClient.SqlDataReader.GetFieldValueT at lambda_method965(闭包,QueryContext,DbDataReader,ResultContext )( Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable
1.Enumerator.MoveNext() at System.Linq.AsyncEnumerable.AsyncEnumerableAdapter
1.MoveNextCore() in //Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.cs:line 79 at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in //Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 77 at System.Linq.AsyncIteratorBase`1.MoveNextAsync() in /_/Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs:line 77 )
我的代码如下所示:
await context.Database
.CreateExecutionStrategy()
.ExecuteInTransactionAsync(async cancellationToken =>
{
var entityResult = context.Set<TEntity>().AsNoTracking().ToAsyncEnumerable();
var done = false;
await using var enumerator = entityResult.GetAsyncEnumerator();
await using var stream = new MemoryStream();
await using var writer = new StreamWriter(stream);
var csv = new CsvWriter(writer, CultureInfo.InvariantCulture, true);
csv.Context.RegisterClassMap(new EntityClassMap<TEntity>());
csv.WriteHeader<TEntity>();
csv.NextRecord();
while (await enumerator.MoveNextAsync()) // Cannot use foreach, because of some other stuff below
{
var entity = enumerator.Current;
csv.WriteRecord(entity);
csv.NextRecord();
// some other stuff
}
},
_ => Task.FromResult(true), // We are just reading, so we can always commit the transaction
System.Data.IsolationLevel.ReadUncommitted, // Do not block the whole table while reading. This is essentially the same as WITH(NOLOCK).
cancellationToken);
我认为,问题可能是小型的Network,或者是数据库忙于其他任务,所以我可能需要的是某种弹性。但是,我不能使用默认的SqlServerRetryingExecutionStrategy,因为它会缓冲内存中的所有行,这太大了(数百it )。
发布于 2022-10-17 09:32:08
像EF这样的ORM并不适用于大型ETL作业,但是使用NOLOCK可能会导致额外的阻塞,甚至可能导致异常。
首先,NOLOCK
的使用并不意味着表没有锁定,恰恰相反。NOLOCK
的意思是ignore locks
,实际上是获取数据库上的共享锁和表上的架构锁,所以它会导致阻塞。
这也意味着,如果表或其索引正在被修改,NOLOCK可能会被阻塞,因为这些操作也采用Schema锁。
如果您确实希望避免阻塞和阻塞,请使用快照隔离。
所有这些都在Server表提示-使用(NOLOCK)最佳实践和布伦特·奥扎尔的短文“使用NOLOCK?下面是如何获得错误的查询结果。”和““但是当我的数据没有变化的时候,NOLOCK就可以了,对吗?””中作了解释。正如布伦特·奥扎尔所强调的:
我怎么强调都不为过:对于NOLOCK,你可以:
快速出口解决方案
最快的解决方案是使用数据库提供的工具,如bcp或SSIS
。bcp
是一个独立的命令行工具,可以在Windows和Linux上运行。
如果您必须编写自己的代码,最快的选择是直接使用ADO.NET,通过IDataReader读取结果并将其写入CSV文件。这样,一次只加载一行。Github问题直接从数据中心写入文件演示了如何使用CsvHelper来完成此操作:
await using (var db = new SqlConnection(Connection.ConnectionString))
await using (var cmd = db.CreateCommand())
{
await db.OpenAsync();
cmd.CommandText = @"
select *
from MyTable";
cmd.CommandTimeout = 5 * 60;
await using (var reader = await cmd.ExecuteReaderAsync())
await using (var writer = new StreamWriter(Path.Combine(MyExtensions.TempFolder, "MyFile.csv")))
await using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
var schema = await reader.GetColumnSchemaAsync();
foreach (var columnName in schema.Select(c => c.ColumnName))
{
csv.WriteField(columnName);
}
await csv.NextRecordAsync();
while (await reader.ReadAsync())
{
for (int i = 0; i < reader.FieldCount; i++)
{
csv.WriteField(reader[i]);
}
await csv.NextRecordAsync();
}
}
}
如果使用像脱衣舞这样的库来减少样板ADO.NET代码,则可以简化此代码:
using var con=new SqlConnection(...);
var sql="select * From Sales where Date>=@date";
await using var reader=con.ExequteReader(sql,new {date=DateTime.Today.AddMonths(-2)});
await using (var writer = new StreamWriter(Path.Combine(MyExtensions.TempFolder, "MyFile.csv")))
...
Dapper将根据需要打开和关闭连接,构造一个SqlCommand,其参数与匿名参数对象@date
的名称和值匹配。
https://stackoverflow.com/questions/74093952
复制相似问题