首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >用AsyncEnumerable反序列化为Newtonsoft.Json

用AsyncEnumerable反序列化为Newtonsoft.Json
EN

Stack Overflow用户
提问于 2022-05-31 17:04:00
回答 1查看 1.3K关注 0票数 1

System.Text.Json.JsonSerializer.DeserializeAsyncEnumerable<T>是System.Text.Json的一种方法,它可以获取Stream并生成IAsyncEnumerable<T>,其中枚举可以是异步的。例如,这对于反序列化由网络连接流的元素数组非常有用,因此我们可以在到达流的实际末尾之前输出这些元素。

是否有任何方法可以使用Newtonsoft.Json库实现等效的功能?

EN

回答 1

Stack Overflow用户

发布于 2022-06-04 18:57:28

Json.NET的JsonSerializer类不支持异步反序列化,今后也不计划提供支持。如需确认,请参见下列未决问题:

然而,JsonTextReader确实支持异步读取,LINQ支持通过JToken.LoadAsync()进行异步加载。因此,您应该能够创建一个IAsyncEnumerable,它遍历根值为数组的JSON流,并异步地将每个数组元素作为JToken返回。随后,您可以使用JToken.ToObject()将每个令牌反序列化到最终模型。

首先,创建以下扩展方法:

代码语言:javascript
运行
复制
public static partial class JsonExtensions
{
    /// <summary>
    /// Asynchronously load and synchronously deserialize values from a stream containing a JSON array.  The root object of the JSON stream must in fact be an array, or an exception is thrown
    /// </summary>
    public static async IAsyncEnumerable<T?> DeserializeAsyncEnumerable<T>(Stream stream, JsonSerializerSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var serializer = JsonSerializer.CreateDefault(settings);
        var loadSettings = new JsonLoadSettings { LineInfoHandling = LineInfoHandling.Ignore }; // For performance do not load line info.
        // StreamReader and JsonTextReader do not implement IAsyncDisposable so let the caller dispose the stream.
        using (var textReader = new StreamReader(stream, leaveOpen : true))
        using (var reader = new JsonTextReader(textReader) { CloseInput = false })
        {
            await foreach (var token in LoadAsyncEnumerable(reader, loadSettings, cancellationToken ).ConfigureAwait(false))
                yield return token.ToObject<T>(serializer);
        }
    }
    
    /// <summary>
    /// Asynchronously load and return JToken values from a stream containing a JSON array.  The root object of the JSON stream must in fact be an array, or an exception is thrown
    /// </summary>
    public static async IAsyncEnumerable<JToken> LoadJTokenAsyncEnumerable(Stream stream, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // StreamReader and JsonTextReader do not implement IAsyncDisposable so let the caller dispose the stream.
        using (var textReader = new StreamReader(stream, leaveOpen : true))
        using (var reader = new JsonTextReader(textReader) { CloseInput = false })
        {
            await foreach (var token in LoadAsyncEnumerable(reader, settings, cancellationToken).ConfigureAwait(false))
                yield return token;
        }
    }
    
    /// <summary>
    /// Asynchronously load and return JToken values from a stream containing a JSON array.  The root object of the JSON stream must in fact be an array, or an exception is thrown
    /// </summary>
    public static async IAsyncEnumerable<JToken> LoadAsyncEnumerable(JsonTextReader reader, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        (await reader.MoveToContentAndAssertAsync().ConfigureAwait(false)).AssertTokenType(JsonToken.StartArray);
        cancellationToken.ThrowIfCancellationRequested();
        while ((await reader.ReadToContentAndAssert(cancellationToken).ConfigureAwait(false)).TokenType != JsonToken.EndArray)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return await JToken.LoadAsync(reader, settings, cancellationToken).ConfigureAwait(false);
        }
        cancellationToken.ThrowIfCancellationRequested();
    }
    
    public static JsonReader AssertTokenType(this JsonReader reader, JsonToken tokenType) => 
        reader.TokenType == tokenType ? reader : throw new JsonSerializationException(string.Format("Unexpected token {0}, expected {1}", reader.TokenType, tokenType));
    
    public static async Task<JsonReader> ReadToContentAndAssert(this JsonReader reader, CancellationToken cancellationToken = default) =>
        await (await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false)).MoveToContentAndAssertAsync(cancellationToken).ConfigureAwait(false);

    public static async Task<JsonReader> MoveToContentAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
    {
        if (reader == null)
            throw new ArgumentNullException();
        if (reader.TokenType == JsonToken.None)       // Skip past beginning of stream.
            await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
        while (reader.TokenType == JsonToken.Comment) // Skip past comments.
            await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
        return reader;
    }

    public static async Task<JsonReader> ReadAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
    {
        if (reader == null)
            throw new ArgumentNullException();
        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
            throw new JsonReaderException("Unexpected end of JSON stream.");
        return reader;
    }
}

现在你可以做这样的事情:

代码语言:javascript
运行
复制
await foreach (var token in JsonExtensions.LoadJTokenAsyncEnumerable(stream, cancellationToken : cancellationToken))
{
    Console.WriteLine(token.ToString(Formatting.None));
}

备注:

  • 取消没有测试。如果取消,上述实现应该抛出OperationCanceledException
  • System.Text.Json是从零开始设计的,以支持异步反序列化,具有良好的性能。如果需要异步反序列化,则应考虑重写代码以使用它。
  • StreamReaderJsonTextReader不实现IAsyncDisposable,因此扩展方法让调用者释放底层流。
  • 我不确定在.ConfigureAwait(false) 6中还需要所有这些.NET调用。

轻度测试演示小提琴这里

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72451669

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档