2010-10-01 92 views
8

我正在使用RXTX从串行端口读取数据。该读数按以下方式催生了一个线程中完成的:线程中断未终止输入流读取阻塞调用

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port); 
CommPort comm = portIdentifier.open("Whatever", 2000); 
SerialPort serial = (SerialPort)comm; 
...settings 
Thread t = new Thread(new SerialReader(serial.getInputStream())); 
t.start(); 

的SerialReader类实现Runnable,只是无限循环,从港口阅读和发送它关闭其他应用程序之前构建数据转换成有用的软件包。不过,我已经减少了它归结为以下简单:

public void run() { 
    ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader 
    ByteBuffer buffer = ByteBuffer.allocate(100); 
    while (true) { 
    try { 
     byteChan.read(buffer); 
    } catch (Exception e) { 
     System.out.println(e); 
    } 
    } 
} 

当用户点击停止按钮,下面的功能大火在理论上应关闭输入流,打破了阻塞byteChan.read的(缓冲区)呼叫。代码如下:

public void stop() { 
    t.interrupt(); 
    serial.close(); 
} 

然而,当我运行此代码,我从来没有收到一个ClosedByInterruptException,一旦输入流关闭这应该解雇。此外,调用serial.close()时的执行块 - 因为底层输入流在读取调用时仍然阻塞。我尝试用byteChan.close()替换中断调用,然后应该会导致AsynchronousCloseException,但是,我得到相同的结果。

对我所缺少的任何帮助将不胜感激。

回答

3

的RXTX SerialInputStream(什么是由serial.getInputStream()调用返回)支持最终超时方案解决我所有的问题。添加创建新SerialReader对象会导致读取不再无限期阻塞之前如下:

serial.enableReceiveTimeout(1000); 

内SerialReader对象,我不得不改变周围的一些东西直接从InputStream读取,而不是创造的ReadableByteChannel,但现在,我可以停下来重新启动读者而不会有任何问题。

+0

这绝对有效。谢谢。 – 2015-08-03 07:58:18

5

仅通过包装它就可以制作不支持可中断I/O的流InterruptibleChannel(并且,无论如何,ReadableByteChannel不会延伸InterruptibleChannel)。

你必须看看底层InputStream的合约。 SerialPort.getInputStream()关于其结果的可中断性有何说法?如果它什么都没说,你应该假设它忽略了中断。

对于没有明确支持可中断性的任何I/O,唯一的选择是通常关闭来自另一个线程的流。这可能会在调用流时阻塞的线程中立即引发IOException(尽管它可能不是AsynchronousCloseException)。

但是,即使如此,这也非常依赖于InputStream —的实现,而底层操作系统也可能是一个因素。


注由newChannel()返回ReadableByteChannelImpl类的源代码注释:

private static class ReadableByteChannelImpl 
    extends AbstractInterruptibleChannel  // Not really interruptible 
    implements ReadableByteChannel 
    { 
    InputStream in; 
    ⋮ 
+0

在我的例子,Channels.newChannel(<输入流>)返回类型ReadableByteChannelImpl,它实现的ReadableByteChannel的对象(但更重要的是延伸AbstractInterruptibleChannel它实现InterruptibleChannel)。 – JDS 2010-10-01 22:48:26

+0

糟糕...输入提交评论。 总之,对InterruptibleChannel执行byteChan的instanceof检查返回true。此外,由于不清楚,stop()的调用在产生读循环线程的线程中完成。 – JDS 2010-10-01 22:51:09

+0

@JDS - ...然而,它不起作用,是吗?请参阅我的更新。该通道不可中断,并且您可能没有任何可行的选项来释放RXTX读取。 – erickson 2010-10-01 23:54:11

1

我使用下面的代码来关闭rxtx。我运行测试,启动它们并关闭它们,似乎工作正常。我的读者看起来像:

private void addPartsToQueue(final InputStream inputStream) { 
    byte[] buffer = new byte[1024]; 
    int len = -1; 
    boolean first = true; 
    // the read can throw 
    try { 
     while ((len = inputStream.read(buffer)) > -1) { 
      if (len > 0) { 
       if (first) { 
        first = false; 
        t0 = System.currentTimeMillis(); 
       } else 
        t1 = System.currentTimeMillis(); 
       final String part = new String(new String(buffer, 0, len)); 
       queue.add(part); 
       //System.out.println(part + " " + (t1 - t0)); 
      } 
      try { 
       Thread.sleep(sleep); 
      } catch (InterruptedException e) { 
       //System.out.println(Thread.currentThread().getName() + " interrupted " + e); 
       break; 
      } 
     } 
    } catch (IOException e) { 
     System.err.println(Thread.currentThread().getName() + " " + e); 
     //if(interruSystem.err.println(e); 
     e.printStackTrace(); 
    } 
    //System.out.println(Thread.currentThread().getName() + " is ending."); 
} 

感谢

public void shutdown(final Device device) { 
    shutdown(serialReaderThread); 
    shutdown(messageAssemblerThread); 
    serialPort.close(); 
    if (device != null) 
     device.setSerialPort(null); 
} 

public static void shutdown(final Thread thread) { 
    if (thread != null) { 
     //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); 
     thread.interrupt(); 
     //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState()); 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e) { 
      System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e); 
     } 
     //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState()); 
     try { 
      thread.join(); 
     } catch (InterruptedException e) { 
      System.out.println(Thread.currentThread().getName() + " join interruped"); 
     } 
     //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState()); 
    }