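System.Text.Json.JsonSerializer.DeserializeAsyncEnumerable<T> is a System.Text.Json method that takes a Stream and produces an IAsyncEnumerable<T>, so the enumeration can be asynchronous. This is useful, for example, for deserializing an array of elements streamed over a network connection, because the elements can be emitted before the actual end of the stream is reached.
Is there any way to achieve equivalent functionality with the Newtonsoft.Json library?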
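For context, here is a minimal sketch of the System.Text.Json behavior the question describes. The class and method names (StjSketch, ReadAllAsync) are hypothetical and exist only for illustration. The sketch assumes .NET 6 or later and a stream whose root JSON value is an array of integers.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public static class StjSketch
{
    // Collects the elements of a root-level JSON array of integers.
    // Each element is handed to the loop as soon as it has been deserialized,
    // which may be before the end of the stream has been read.
    public static async Task<List<int>> ReadAllAsync(Stream stream)
    {
        var results = new List<int>();
        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<int>(stream))
            results.Add(item);
        return results;
    }
}
```

In real code the loop body would typically process each element as it arrives rather than buffer everything into a list; the list is used here only to keep the sketch easy to check.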
Posted on 2022-06-04 18:57:28
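Json.NET's JsonSerializer class does not support asynchronous deserialization, and there are no plans to add it. For confirmation, see the open issues on this subject.
However, JsonTextReader does support asynchronous reading, and LINQ to JSON supports asynchronous loading via JToken.LoadAsync(). You should therefore be able to create an IAsyncEnumerable<JToken> that iterates over a JSON stream whose root value is an array and asynchronously returns each array element as a JToken. You can then use JToken.ToObject<T>() to deserialize each token to your final model.
First, create the following extension methods: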
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static partial class JsonExtensions
{
    /// <summary>
    /// Asynchronously load and synchronously deserialize values from a stream containing a JSON array. The root value of the JSON stream must be an array, or an exception is thrown.
    /// </summary>
    public static async IAsyncEnumerable<T?> DeserializeAsyncEnumerable<T>(Stream stream, JsonSerializerSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var serializer = JsonSerializer.CreateDefault(settings);
        var loadSettings = new JsonLoadSettings { LineInfoHandling = LineInfoHandling.Ignore }; // For performance do not load line info.
        // StreamReader and JsonTextReader do not implement IAsyncDisposable so let the caller dispose the stream.
        using (var textReader = new StreamReader(stream, leaveOpen: true))
        using (var reader = new JsonTextReader(textReader) { CloseInput = false })
        {
            await foreach (var token in LoadAsyncEnumerable(reader, loadSettings, cancellationToken).ConfigureAwait(false))
                yield return token.ToObject<T>(serializer);
        }
    }

    /// <summary>
    /// Asynchronously load and return JToken values from a stream containing a JSON array. The root value of the JSON stream must be an array, or an exception is thrown.
    /// </summary>
    public static async IAsyncEnumerable<JToken> LoadJTokenAsyncEnumerable(Stream stream, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // StreamReader and JsonTextReader do not implement IAsyncDisposable so let the caller dispose the stream.
        using (var textReader = new StreamReader(stream, leaveOpen: true))
        using (var reader = new JsonTextReader(textReader) { CloseInput = false })
        {
            await foreach (var token in LoadAsyncEnumerable(reader, settings, cancellationToken).ConfigureAwait(false))
                yield return token;
        }
    }

    /// <summary>
    /// Asynchronously load and return JToken values from a reader positioned at a JSON array. The current value must be an array, or an exception is thrown.
    /// </summary>
    public static async IAsyncEnumerable<JToken> LoadAsyncEnumerable(JsonTextReader reader, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Position the reader on the first content token and require it to be the start of an array.
        (await reader.MoveToContentAndAssertAsync(cancellationToken).ConfigureAwait(false)).AssertTokenType(JsonToken.StartArray);
        cancellationToken.ThrowIfCancellationRequested();
        // Advance to each element in turn until the closing bracket of the array is reached.
        while ((await reader.ReadToContentAndAssert(cancellationToken).ConfigureAwait(false)).TokenType != JsonToken.EndArray)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return await JToken.LoadAsync(reader, settings, cancellationToken).ConfigureAwait(false);
        }
        cancellationToken.ThrowIfCancellationRequested();
    }

    // Returns the reader if it is positioned on the expected token type, otherwise throws.
    public static JsonReader AssertTokenType(this JsonReader reader, JsonToken tokenType) =>
        reader.TokenType == tokenType ? reader : throw new JsonSerializationException(string.Format("Unexpected token {0}, expected {1}", reader.TokenType, tokenType));

    // Reads the next token, then skips any comments that follow it.
    public static async Task<JsonReader> ReadToContentAndAssert(this JsonReader reader, CancellationToken cancellationToken = default) =>
        await (await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false)).MoveToContentAndAssertAsync(cancellationToken).ConfigureAwait(false);

    public static async Task<JsonReader> MoveToContentAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
    {
        if (reader == null)
            throw new ArgumentNullException(nameof(reader));
        if (reader.TokenType == JsonToken.None) // Skip past beginning of stream.
            await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
        while (reader.TokenType == JsonToken.Comment) // Skip past comments.
            await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
        return reader;
    }

    // Reads the next token and throws if the stream ends unexpectedly.
    public static async Task<JsonReader> ReadAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
    {
        if (reader == null)
            throw new ArgumentNullException(nameof(reader));
        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
            throw new JsonReaderException("Unexpected end of JSON stream.");
        return reader;
    }
}
Now you can do something like this:
await foreach (var token in JsonExtensions.LoadJTokenAsyncEnumerable(stream, cancellationToken: cancellationToken))
{
    Console.WriteLine(token.ToString(Formatting.None));
}
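The loop above yields raw JToken values. As a complementary sketch, the typed JsonExtensions.DeserializeAsyncEnumerable<T> method defined earlier could be used in much the same way. The Person record and the TypedUsageSketch class are hypothetical, introduced only for illustration, and the snippet assumes the JsonExtensions class from this answer is compiled alongside it.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model type, used only for illustration.
public record Person(string Name, int Age);

public static class TypedUsageSketch
{
    // Deserializes each element of a root-level JSON array into a Person.
    // The caller keeps ownership of the stream and is responsible for disposing it.
    public static async Task<List<Person>> ReadPeopleAsync(Stream stream, CancellationToken cancellationToken = default)
    {
        var people = new List<Person>();
        await foreach (var person in JsonExtensions.DeserializeAsyncEnumerable<Person>(stream, cancellationToken: cancellationToken))
        {
            if (person != null) // A JSON null element deserializes to null, so skip it.
                people.Add(person);
        }
        return people;
    }
}
```

Because the extension methods leave the stream open, a caller would normally wrap the stream in its own await using declaration around a call such as ReadPeopleAsync(stream).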
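Notes:
- Cancellation surfaces as an OperationCanceledException.
- System.Text.Json was designed from the ground up to support asynchronous deserialization with good performance. If you need asynchronous deserialization, you should consider rewriting your code to use it.
- StreamReader and JsonTextReader do not implement IAsyncDisposable, so the extension methods let the caller dispose the underlying stream.
- All of these .ConfigureAwait(false) calls are still needed in .NET 6.
A lightly tested demo fiddle is linked from the original answer.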
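To illustrate the cancellation note, the following sketch shows one way a caller might handle a canceled enumeration. CancellationSketch and TryPrintAllAsync are hypothetical names, and the snippet assumes the JsonExtensions class from this answer is available in the same project.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

public static class CancellationSketch
{
    // Returns true if the whole array was enumerated, false if enumeration was canceled.
    public static async Task<bool> TryPrintAllAsync(Stream stream, CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var token in JsonExtensions.LoadJTokenAsyncEnumerable(stream, cancellationToken: cancellationToken))
                Console.WriteLine(token.ToString(Formatting.None));
            return true;
        }
        catch (OperationCanceledException)
        {
            // Also catches TaskCanceledException, which derives from OperationCanceledException.
            return false;
        }
    }
}
```

A caller could, for example, pass the token of a CancellationTokenSource constructed with a TimeSpan timeout to stop reading from a slow network stream after a fixed delay.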
https://stackoverflow.com/questions/72451669