我正在使用RXTX从串行端口读取数据。读取是在以以下方式派生的线程中完成的:
CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();SerialReader类实现Runnable并无限循环,从端口读取数据并将其构建为有用的包,然后将其发送到其他应用程序。但是,我将其简化为以下简单性:
public void run() {
ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
ByteBuffer buffer = ByteBuffer.allocate(100);
while (true) {
try {
byteChan.read(buffer);
} catch (Exception e) {
System.out.println(e);
}
}
}当用户单击停止按钮时,将触发以下功能,理论上应该关闭输入流并中断阻塞的byteChan.read(buffer)调用。代码如下:
public void stop() {
t.interrupt();
serial.close();
}但是,当我运行这段代码时,我从来没有得到过ClosedByInterruptException,它应该在输入流关闭时触发。此外,执行阻塞了对serial.close()的调用--因为底层输入流仍然阻塞了读取调用。我尝试将中断调用替换为byteChan.close(),这应该会导致AsynchronousCloseException,但是,我得到了相同的结果。
如果能帮上我的忙,我将不胜感激。
发布于 2010-10-14 01:31:43
RXTX SerialInputStream (由serial.getInputStream()调用返回)支持超时方案,最终解决了我的所有问题。在创建新的SerialReader对象之前添加以下内容会导致读取不再无限期阻塞:
serial.enableReceiveTimeout(1000);在SerialReader对象中,我必须更改一些内容以直接从InputStream读取,而不是创建ReadableByteChannel,但现在,我可以停止并重新启动阅读器而不会出现问题。
发布于 2010-10-02 06:22:39
您不能简单地将不支持可中断I/O的流包装到InterruptibleChannel中(而且,无论如何,ReadableByteChannel不会扩展InterruptibleChannel)。
您必须查看底层InputStream的契约。SerialPort.getInputStream()对其结果的中断性有何看法?如果它什么也没说,你应该认为它忽略了中断。
对于任何不显式支持中断性的I/O,唯一的选择通常是关闭来自另一个线程的流。这可能会立即在调用流时阻塞的线程中引发IOException (尽管它可能不是AsynchronousCloseException)。
然而,即使这非常依赖于InputStream-and的实现,底层操作系统也可能是一个因素。
请注意newChannel()返回的ReadableByteChannelImpl类上的源代码注释
private static class ReadableByteChannelImpl
extends AbstractInterruptibleChannel // Not really interruptible
implements ReadableByteChannel
{
InputStream in;
⋮发布于 2010-11-30 16:50:24
我使用下面的代码来关闭rxtx。我运行了启动和关闭它们的测试,似乎工作正常。我的读者看起来是这样的:
private void addPartsToQueue(final InputStream inputStream) {
byte[] buffer = new byte[1024];
int len = -1;
boolean first = true;
// the read can throw
try {
while ((len = inputStream.read(buffer)) > -1) {
if (len > 0) {
if (first) {
first = false;
t0 = System.currentTimeMillis();
} else
t1 = System.currentTimeMillis();
final String part = new String(new String(buffer, 0, len));
queue.add(part);
//System.out.println(part + " " + (t1 - t0));
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
//System.out.println(Thread.currentThread().getName() + " interrupted " + e);
break;
}
}
} catch (IOException e) {
System.err.println(Thread.currentThread().getName() + " " + e);
//if(interruSystem.err.println(e);
e.printStackTrace();
}
//System.out.println(Thread.currentThread().getName() + " is ending.");
}谢谢
public void shutdown(final Device device) {
shutdown(serialReaderThread);
shutdown(messageAssemblerThread);
serialPort.close();
if (device != null)
device.setSerialPort(null);
}
public static void shutdown(final Thread thread) {
if (thread != null) {
//System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
thread.interrupt();
//System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
}
//System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
try {
thread.join();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " join interruped");
}
//System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
}https://stackoverflow.com/questions/3843363
复制相似问题