我正在使用命名管道在应用程序之间传递一些消息。
我的管道客户机使用PipeStream.ReadByte()
读取从服务器发送的消息。这会阻塞它在其中运行的线程。
这种行为通常很好,但是,我找不到一个好方法来在线程被阻塞时将其处理掉。当客户端退出时,该线程保持活动状态,直到流获得一些数据,并且允许它再次检查循环变量。在服务器静默的情况下,线程仍然存在。
Thread.Abort()
不起作用。
在转移到ReadByte()
方法之前,有没有办法检查数据流?PipeStream.Length
抛出了UnsupportedException
。
发布于 2015-04-04 04:27:15
解决方案可能是使用异步I/O和任务,而不是专用线程,并使用CancelationToken
中止挂起的IO。下面的代码片段中显示了一个简化的示例,该示例基于答案https://stackoverflow.com/a/29423042/2573395,但针对管道操作稍作了修改(在本例中为异步等待每条消息的连接)。请注意,我不是命名管道方面的专家,因此可能有更好的方法来做到这一点。
public class MyPipeReader
{
private string _pipeName;
public MyPipeReader(string pipeName)
{
_pipeName = pipeName;
}
public async Task ReceiveAndProcessStuffUntilCancelled(CancellationToken token)
{
var received = new byte[4096];
while (!token.IsCancellationRequested)
{
using (var stream = CreateStream())
{
try
{
if (await WaitForConnection(stream, token))
{
var bytesRead = 0;
do {
bytesRead = await stream.ReadAsync(received, 0, 4096, token).ConfigureAwait(false);
} while (bytesRead > 0 && DoMessageProcessing(received, bytesRead));
if (stream.IsConnected)
stream.Disconnect();
}
}
catch (OperationCanceledException)
{
break; // operation was canceled.
}
catch (Exception e)
{
// report error & decide if you want to give up or retry.
}
}
}
}
private NamedPipeServerStream CreateStream()
{
return new NamedPipeServerStream(_pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
}
private async Task<bool> WaitForConnection(NamedPipeServerStream stream, CancellationToken token)
{
// We can't simply use 'Task.Factory.FromAsync', as that is not cancelable.
var tcs = new TaskCompletionSource<bool>();
var cancelRegistration = token.Register(() => tcs.SetCanceled());
var iar = stream.BeginWaitForConnection(null, null);
var rwh = ThreadPool.RegisterWaitForSingleObject(iar.AsyncWaitHandle, delegate { tcs.TrySetResult(true); }, null, -1, true);
try
{
await tcs.Task.ConfigureAwait(false);
if (iar.IsCompleted) {
stream.EndWaitForConnection(iar);
return true;
}
}
finally
{
cancelRegistration.Dispose();
rwh.Unregister(null);
}
return false;
}
private bool DoMessageProcessing(byte[] buffer, int nBytes)
{
try
{
// Your processing code.
// You could also make this async in case it does any I/O.
return true;
}
catch (Exception e)
{
// report error, and decide what to do.
// return false if the task should not
// continue.
return false;
}
}
}
class Program
{
public static void Main(params string[] args)
{
using (var cancelSource = new CancellationTokenSource())
{
var receive = new MyPipeReader("_your_pipe_name_here_").ReceiveAndProcessStuffUntilCancelled(cancelSource.Token);
Console.WriteLine("Press <ENTER> to stop");
Console.ReadLine();
cancelSource.Cancel();
receive.Wait();
}
}
}
https://stackoverflow.com/questions/29437319
复制相似问题