首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >处理WebSocket的类close和dispose (Observable.Using)

处理WebSocket的类close和dispose (Observable.Using)
EN

Stack Overflow用户
提问于 2021-11-29 07:56:40
回答 1查看 145关注 0票数 0

我如何处理WebSocket的关闭和处理?simple.SubscribeToTicker().Subscribe(...)也需要处理。

我看到有些人这样做,但我不确定Observable.Using(...)是如何工作的,也不确定它何时处理。

代码语言:javascript
运行
复制
var message =
    Observable.Using(() => ws,
        _ => Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
            h => ws.MessageReceived -= h));

片段

代码语言:javascript
运行
复制
using System.Net;
using System.Reactive.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using RestSharp;
using WebSocket4Net;

namespace SimpleTest;

public class Simple
{
    public RestPriceTicker24Hr? GetTicker()
    {
        const string uri = "https://api.binance.com";

        var client = new RestClient(uri);
        var request = new RestRequest("/api/v3/ticker/24hr?symbol=BNBUSDT", Method.GET);
        var response = client.Execute(request);

        if (response.StatusCode != HttpStatusCode.OK)
            throw new InvalidOperationException();

        var content = response.Content;

        var deserialize = JsonSerializer.Deserialize<RestPriceTicker24Hr>(content, new JsonSerializerOptions
        {
            NumberHandling = JsonNumberHandling.AllowReadingFromString
        });

        return deserialize;
    }

    public IObservable<PriceTicker24Hr?> SubscribeToTicker()
    {
        const string uri = "wss://stream.binance.com:9443/ws";

        var ws = new WebSocket($"{uri}/bnbusdt@ticker") // TODO: Dispose
        {
            AutoSendPingInterval = 3, // 3 seconds
            EnableAutoSendPing = true
        };

        ws.Open();

        ws.Error += (_, e) => { Console.WriteLine($"Exception: {e.Exception.Message}"); };

        ws.Opened += (_, _) => { Console.WriteLine("Connection opened"); };

        ws.Closed += (_, _) => { Console.WriteLine("Connection closed"); };

        //var message =
        //    Observable.Using(() => ws,
        //        _ => Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
        //            h => ws.MessageReceived -= h));

        var message =
            Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
                h => ws.MessageReceived -= h);

        return message.Select(e =>
            JsonSerializer.Deserialize<PriceTicker24Hr>(e.EventArgs.Message, new JsonSerializerOptions
            {
                NumberHandling = JsonNumberHandling.AllowReadingFromString
            }));
    }
}

public class Program
{
    private static void GetData()
    {
        var simple = new Simple();

        var ticker = simple.GetTicker();
        simple.SubscribeToTicker().Subscribe(message => { Console.WriteLine($"Message: {message?.BestAskPrice}"); }); // TODO: Dispose
    }

    private static void Main()
    {
        GetData();

        Console.ReadKey();
    }
}

public class PriceTicker24Hr
{
    [JsonPropertyName("e")] public string? EventType { get; set; }

    [JsonPropertyName("E")] public long EventTime { get; set; }

    [JsonPropertyName("s")] public string? Symbol { get; set; }

    [JsonPropertyName("p")] public decimal PriceChange { get; set; }

    [JsonPropertyName("P")] public decimal PriceChangePercent { get; set; }

    [JsonPropertyName("w")] public decimal WeightedAveragePrice { get; set; }

    [JsonPropertyName("x")] public decimal PreviousClosePrice { get; set; }

    [JsonPropertyName("c")] public decimal LastPrice { get; set; }

    [JsonPropertyName("Q")] public decimal LastQuantity { get; set; }

    [JsonPropertyName("b")] public decimal BestBidPrice { get; set; }

    [JsonPropertyName("B")] public decimal BestBidQuantity { get; set; }

    [JsonPropertyName("a")] public decimal BestAskPrice { get; set; }

    [JsonPropertyName("A")] public decimal BestAskQuantity { get; set; }

    [JsonPropertyName("o")] public decimal OpenPrice { get; set; }

    [JsonPropertyName("h")] public decimal HighPrice { get; set; }

    [JsonPropertyName("l")] public decimal LowPrice { get; set; }

    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume { get; set; }

    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume { get; set; }

    [JsonPropertyName("O")] public long OpenTime { get; set; }

    [JsonPropertyName("C")] public long CloseTime { get; set; }

    [JsonPropertyName("F")] public long FirstTradeId { get; set; }

    [JsonPropertyName("L")] public long LastTradeId { get; set; }

    [JsonPropertyName("n")] public long Count { get; set; }
}

public class RestPriceTicker24Hr
{
    [JsonPropertyName("symbol")] public string? Symbol { get; set; }
    [JsonPropertyName("priceChange")] public decimal PriceChange { get; set; }

    [JsonPropertyName("priceChangePercent")]
    public decimal PriceChangePercent { get; set; }

    [JsonPropertyName("weightedAvgPrice")] public decimal WeightedAveragePrice { get; set; }
    [JsonPropertyName("prevClosePrice")] public decimal PreviousClosePrice { get; set; }
    [JsonPropertyName("lastPrice")] public decimal LastPrice { get; set; }
    [JsonPropertyName("lastQty")] public decimal LastQuantity { get; set; }
    [JsonPropertyName("bidPrice")] public decimal BestBidPrice { get; set; }
    [JsonPropertyName("bidQty")] public decimal BestBidQuantity { get; set; }
    [JsonPropertyName("askPrice")] public decimal BestAskPrice { get; set; }
    [JsonPropertyName("askQty")] public decimal BestAskQuantity { get; set; }
    [JsonPropertyName("openPrice")] public decimal OpenPrice { get; set; }
    [JsonPropertyName("highPrice")] public decimal HighPrice { get; set; }
    [JsonPropertyName("lowPrice")] public decimal LowPrice { get; set; }
    [JsonPropertyName("volume")] public decimal TotalTradedBaseVolume { get; set; }
    [JsonPropertyName("quoteVolume")] public decimal TotalTradedQuoteVolume { get; set; }
    [JsonPropertyName("openTime")] public long OpenTime { get; set; }
    [JsonPropertyName("closeTime")] public long CloseTime { get; set; }
    [JsonPropertyName("firstId")] public long FirstTradeId { get; set; }
    [JsonPropertyName("lastId")] public long LastTradeId { get; set; }
    [JsonPropertyName("count")] public long Count { get; set; }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-02 06:56:53

当可观察的结束时,每个处理都需要在Observable.Using中触发。

如下所示:

代码语言:javascript
运行
复制
public IObservable<PriceTicker24Hr?> SubscribeToTicker() =>
    Observable
        .Using(
            () =>
            {
                const string uri = "wss://stream.binance.com:9443/ws";
                var ws = new WebSocket($"{uri}/bnbusdt@ticker") // TODO: Dispose
                {
                    AutoSendPingInterval = 3, // 3 seconds
                    EnableAutoSendPing = true
                };
                ws.Open();
                ws.Error += (_, e) => { Console.WriteLine($"Exception: {e.Exception.Message}"); };
                ws.Opened += (_, _) => { Console.WriteLine("Connection opened"); };
                ws.Closed += (_, _) => { Console.WriteLine("Connection closed"); };
                return ws;
            },
            ws =>
                Observable
                    .FromEventPattern<MessageReceivedEventArgs>(
                        h => ws.MessageReceived += h,
                        h => ws.MessageReceived -= h)
                    .Select(e =>
                        JsonSerializer.Deserialize<PriceTicker24Hr>(
                            e.EventArgs.Message,
                            new JsonSerializerOptions
                            {
                                NumberHandling = JsonNumberHandling.AllowReadingFromString
                            })));

下面是一个可测试的示例,用于查看Using的运行情况:

代码语言:javascript
运行
复制
IObservable<Unit> observable =
    Observable
        .Using(
            () => Disposable.Create(() => Console.WriteLine("Using Disposed!")),
            _ => Observable.Never<Unit>());
            
IDisposable subscription = observable.Subscribe();

subscription.Dispose();

在控制台上弹出Using Disposed!

下面是另一个例子:

代码语言:javascript
运行
复制
IObservable<int> observable =
    Observable
        .Using(
            () => Disposable.Create(() => Console.WriteLine("Using Disposed!")),
            _ => Observable.Repeat(42))
        .Take(1);
            
IDisposable subscription = observable.Subscribe(Console.WriteLine);

它会弹出以下内容:

代码语言:javascript
运行
复制
42
Using Disposed!

希望这有助于您了解Observable.Using是如何工作的。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70151321

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档