2011-11-12 41 views
0

我有使用java nio的问题,并希望有很多java nio知识的人可以帮助我澄清一些错误概念。如何让选择器在Java中的socketchannel键更改nio

我正在使用java nio套接字。使用socketchannel.write()可能会填充写入缓冲区。在这种情况下,剩余的缓冲区排队并且密钥更改为OP_WRITE。我的一个场景是队列长度很长。每次调用selector.select()之前,我都会从另一个名为pendingRequest的队列中将密钥更改为OP_WRITE。但是我发现读取过程非常缓慢,在发送处理完成后,有许多消息未写入,并且仍然在队列中。如何处理这个问题?

在我的代码中,我有两个写作的地方。一个来自发生器:当它有消息要发布时,它直接写入频道。如果缓冲区已满,数据将被排队。第二个地方是调度员:当密钥可写入时,它会调用write()来写入排队数据。我猜这两部分可以竞争写作。我只是觉得我的代码缺少一些处理来配合两次写入。

有没有解决上述问题的解决方案?我发现在我的代码中有很多排队的数据不能写出来。当密钥可写时,生成器可能会再次写入数据,这会导致排队的数据变化较少而被写出。如何使这部分正确?感谢

//在WriteListener()中,写入代码被以下三个部分

public synchronized int writeData(EventObject source) {  
    int n = 0; 
    int count = 0; 

    SocketChannel socket = (SocketChannel)source.getSource();  
    ByteBuffer buffer = ((WriteEvent)source).getBuffer(); 
    try { 
     write(socket); 
    } catch (IOException e1) {   
     e1.printStackTrace(); 
    }  

    while (buffer.position()>0) { 
     try {   
       buffer.flip(); 
       n = socket.write(buffer);         
       if(n == 0) { 
         key.interestOps(SelectionKey.OP_WRITE);       synchronized (this.pendingData) { 
          List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket); 
          if(queue == null) { 
           queue = new ArrayList<ByteBuffer>(); 
           this.pendingData.put(socket, queue); 
         } 
         queue.add(buffer); 

         logger.logInfo("queue length:" + queue.size()); 
        }            
        break; 
       }    
       count += n; 

     } catch (IOException e) {    
      e.printStackTrace(); 
     } finally {      
      buffer.compact();    
     } 
    } 

    if(buffer.position()==0) {      
     key.interestOps(SelectionKey.OP_READ);     
    } 
      return count; 

} 

// ====该写入方法是用来写排队缓冲器

public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {   
    int n = 0; 
    int count = 0; 

    SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());     
    while (wbuf.position()>0) {  
     try {   
      wbuf.flip();   

      n = sc.write(wbuf);    

      if(n == 0) {  
        key.interestOps(SelectionKey.OP_WRITE);         
        synchronized (this.pendingData) { 
         List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc); 
         if(queue == null) { 
           queue = new ArrayList<ByteBuffer>(); 
           this.pendingData.put(sc, queue); 
         } 
         queue.add(wbuf); 
        } 

        break; 
       }    
       count += n; 

     } catch (IOException e) {    
      e.printStackTrace(); 
     } finally {    

      wbuf.compact();     
     } 
    } 

    if(wbuf.position()==0) {  
     wbuf.clear();    
     key.interestOps(SelectionKey.OP_READ);   
    } 

return n;  
} 

// ====当key.isWritable()为真时,此方法是Dispatch的回调

public void write(SocketChannel socketChannel) throws IOException {   
    SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());  
    synchronized (this.pendingData) {    
     List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);    
     if(queue == null || queue.isEmpty()) {     
      // We wrote away all data, so we're no longer interested     
      // in writing on this socket. Switch back to waiting for data.     
      try {      
       if (key!=null)       
        key.interestOps(SelectionKey.OP_READ);     
      } catch(Exception ex) {      
       if (key!=null)       
        key.cancel();     
       }    
     }   

     // Write until there's not more data ...  
     int n = 0; 
     while (queue != null && !queue.isEmpty()) {     
      ByteBuffer buf = (ByteBuffer) queue.get(0); 
      // zero length write, break the loop and wait for next writable time 
      n = write(socketChannel, buf); 

      logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms."); 

      if(n==0) {    
       break; 
      } 
         queue.remove(0); 

     }   

} 
+0

请张贴一些代码(最好是[SSCCE](http://sscce.org/)),证明您遇到的问题。 –

+0

最后的方法,即写入队列的方法,总是从队列中删除缓冲区,即使是“if(n == 0)”。你应该在删除缓冲区之前做这个测试并打破*,实际上你应该只在缓冲区为空的时候删除它。目前你正在丢失数据。 – EJP

+0

在我当前的代码中,在测试写入的字节数后删除(0)。目前的代码工作,但性能不如它应该做的那样好。在缓冲区满后的下一个可写时间需要多长时间? – susan

回答

0

如果您有消费者呃这太慢了,唯一的选择可能是断开它们来保护你的服务器。你不想让一个坏消费者影响你的其他客户。

我通常会将发送缓冲区大小增加到如果填充的位置,我关闭连接。这避免了在Java代码中处理未写入数据的复杂性,因为您所做的只是将缓冲区扩大一点点。如果你增加发送缓冲区的大小,你就是在透明地做这个。有可能你甚至不需要使用发送缓冲区大小,缺省值通常是64 KB左右。

+0

我使用以下方法更改socketchannel buffersize。但为什么serverSocket只能设置receivebuffer的大小。socket.socket()。setReceiveBufferSize(256 * 1024); socket.socket()。setSendBufferSize(256 * 1024); – susan

+1

@susan ServerSocket允许您更改接收缓冲区大小,该大小由接受的套接字继承,因此您可以将它们设置为> 64k。如果你试图对接受的套接字这么做,它会失败,因为> 64k需要在连接握手期间协商的TCP“窗口缩放”选项。出于同样的原因,如果您想在客户端套接字上设置接收缓冲大于64k,则必须在连接它之前执行此操作。您可以随时设置发送缓冲区,因为它不需要协议的帮助。 – EJP

+0

@susan更正:它不会'失败',但64k以上的部分将不会被使用,除非在连接之前设置了大小。 – EJP

0
  1. 您必须确保新数据在已写入待处理数据之后才会入队。

  2. 如果行为依然存在,那么您实际上只有两种选择:或者以不当行为为由断开客户端,或者停止为其生成输出,直到待办事项清除。可能都是。

您可以通过熟练的select()超时实现第一个操作。如果select()返回零,则表示要么没有注册的频道,要么在超时期间没有任何事情发生,在这种情况下,您可能想要考虑从全部客户端断开连接。如果你有太多的并发客户端工作太忙,那么你必须跟踪每个频道最后一次被选中的时间,并断开其最后一次活动时间太久的频道。

在该超时期限内,您可能会想要停止生产输出,而他是慢读。

'长期'的确切定义是作为读者的练习而留下的,但十分钟后想到的是第一个近似值。

+0

我非常感谢你的帮助。它通过在生成器侧生成下一条消息之前写出消息来工作。但对于其他进程,如果缓冲区已满,请等待unitl再次有空间再写入。 – susan

相关问题