我有多个线程试图同时写入websocket。在从多个线程访问websocket时,最佳的并发机制是什么?
每个线程在websocket上发送一条消息,然后等待单个响应。我一直有个错误:
对于这个WebSocket实例已经有一个尚未执行的'ReceiveAsync‘调用。ReceiveAsync和SendAsync可以同时调用,但最多允许它们同时执行一个未完成的操作。
相关代码(没有任何并发机制)
// multiple trackers are calling in
// for each tracker a thread is started calling this method
public static void SendPackageToClients(GpsTrackerEntry gte)
{
var gteByteArray = Encoding.ASCII.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(gte));
foreach (var cs in AsynchronousSocketListener.carSubscriptions[gte.carId])
{
if (cs.Value.ws.State != WebSocketState.Open)
{
UnsubscribeClient(cs.Key);
continue;
}
Task.Run( () => {
// TODO: ideally here a thread should stop and wait for
// the target websocket to become available (i.e. no send or
// receive is being performed) and then execute the following method
SendPackageToClient(cs.Value.ws, gteByteArray, cs.Key);
});
}
return;
}
private static async void SendPackageToClient(WebSocket ws, byte[] gteByteArray, int key)
{
try
{
await ws.SendAsync(gteByteArray, 0, true, CancellationToken.None);
var timeOut = new CancellationTokenSource(5000).Token;
byte[] ping = new byte[10];
var result = await ws.ReceiveAsync(ping, timeOut);
if (result.CloseStatus.HasValue)
{
Console.WriteLine(" **** CLIENT CLOSED CONNECTION **** ");
UnsubscribeClient(key);
}
}
catch (Exception ex)
{
Console.WriteLine(" **** SERVER ERROR **** " + ex.Message);
UnsubscribeClient(key);
}
}我尝试过使用Monitor.Enter(cs.Value.ws)和SemaphoreSlim,但似乎无法正确地设置它
发布于 2021-07-28 15:58:32
设法解决问题。最后使用SemaphoreSlim,为每个websocket添加一个semaphoreSlim,并在调用SendPackageToClient之前检查它。此外,我还必须从SendPackageToClient返回一个值,以便等待它完成,然后释放信号量。
Thx @Lasse V. Karlsen
https://stackoverflow.com/questions/68560634
复制相似问题