2012-06-01 87 views
6

我使用this教程来构建一个没有可写部分的java nio服务器。使用Java的异步服务器NIO

一切工作正常,但有一个有趣的事:

  • 当客户端发送数据包太快,服务器无法接收所有邮件,服务器始终获得第一和第二数据包,但不比那更多的。
  • 如果客户端缓慢发送数据包,服务器将获取所有数据包。

任何想法?

我正在添加服务器类代码,如果您需要在下面的代码中提到的另一个类,我在这里:)。

NIOServer类:

package server; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.net.Socket; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.nio.channels.spi.SelectorProvider; 
import java.util.*; 

import javax.xml.parsers.ParserConfigurationException; 

import org.xml.sax.SAXException; 

public class NioServer implements Runnable { 



// The host:port combination to listen on 
    private InetAddress hostAddress; 
    private int port; 

    // The channel on which we'll accept connections 
    private ServerSocketChannel serverChannel; 

    // The selector we'll be monitoring 
    private Selector selector; 

    //the cach will hundle the messages that came 
    private Cache cache; 

    // The buffer into which we'll read data when it's available 
    private ByteBuffer readBuffer = ByteBuffer.allocate(8192); 

    public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException { 
    this.cache = cache; 
    this.hostAddress = hostAddress; 
    this.port = port; 
    this.selector = this.initSelector(); 
    } 


    private Selector initSelector() throws IOException { 
     // Create a new selector 
     Selector socketSelector = SelectorProvider.provider().openSelector(); 

     // Create a new non-blocking server socket channel 
     this.serverChannel = ServerSocketChannel.open(); 
     serverChannel.configureBlocking(false); 

     // Bind the server socket to the specified address and port 
     InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port); 
     serverChannel.socket().bind(isa); 

     // Register the server socket channel, indicating an interest in 
     // accepting new connections 
     serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); 

     return socketSelector; 
     } 

    private void accept(SelectionKey key) throws IOException { 
     // For an accept to be pending the channel must be a server socket channel. 
     ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); 

     // Accept the connection and make it non-blocking 
     SocketChannel socketChannel = serverSocketChannel.accept(); 
     Socket socket = socketChannel.socket(); 
     socketChannel.configureBlocking(false); 

     // Register the new SocketChannel with our Selector, indicating 
     // we'd like to be notified when there's data waiting to be read 
     socketChannel.register(this.selector, SelectionKey.OP_READ); 
     } 

    private void read(SelectionKey key) throws IOException { 
     SocketChannel socketChannel = (SocketChannel) key.channel(); 

     // Clear out our read buffer so it's ready for new data 
     this.readBuffer.clear(); 

     // Attempt to read off the channel 
     int numRead; 
     try { 
      numRead = socketChannel.read(this.readBuffer); 
      String test = new String(this.readBuffer.array()); 
      System.out.print(test); 

     } catch (IOException e) { 
      // The remote forcibly closed the connection, cancel 
      // the selection key and close the channel. 
     // key.cancel(); 
     // socketChannel.close(); 
      return; 
     } 

     if (numRead == -1) { 
      // Remote entity shut the socket down cleanly. Do the 
      // same from our end and cancel the channel. 
      key.channel().close(); 
      key.cancel(); 
      return; 
     } 

     // Hand the data off to our worker thread 
     this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
     } 

    public void run() { 
     while (true) { 
      try { 
      // Wait for an event one of the registered channels 

      this.selector.select(); 



      // Iterate over the set of keys for which events are available 
      Iterator selectedKeys = this.selector.selectedKeys().iterator(); 
      while (selectedKeys.hasNext()) { 
       SelectionKey key = (SelectionKey) selectedKeys.next(); 
       selectedKeys.remove(); 

       if (!key.isValid()) { 
       continue; 
       } 

       // Check what event is available and deal with it 
       if (key.isAcceptable()) { 
       this.accept(key); 
       } else if (key.isReadable()) { 
       this.read(key); 
       } 
      } 
      } catch (Exception e) { 
      e.printStackTrace(); 
      } 
     } 
     } 

    public static void main(String[] args) throws ParserConfigurationException, SAXException { 
    try { 
     Cache cache = new Cache(); 
     new Thread(cache).start(); 
     new Thread(new NioServer(null, 9090,cache)).start(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    } 
+3

代码中必须有一个错误。如果您需要更多帮助,请提供更多信息。 –

+0

我没有现在的代码,我将在星期天。谢谢 –

+3

TCP不会丢失数据,NIO也不会丢失数据。你要么不读取所有的数据,要么丢掉一些数据。如果没有一些代码发表评论,就不可能进一步评论。 – EJP

回答

1

我希望,如果你在读UDP。请注意您在read方法上处理数据包的速度有多慢。您正在将它们打印到system.out,这是非常缓慢的,并且不确定您能够以多快的速度将数据处理到processData方法上的其他线程。 This library我写的可以帮助你做线程间非阻塞通信,如果这是你的滞后来源。您还应该检查基础读取套接字缓冲区的大小。数据包越大越好,在数据包开始丢失之前,你必须快速并且赶上。对于TCP,如果底层套接字缓冲区已满,您可能会在通道上收到IOException。对于UDP,数据包将悄悄丢弃。

要访问底层读取套接字缓冲区大小,你可以这样做:

final Socket socket = channel.socket(); 
System.out.println(socket.getReceiveBufferSize()); 
socket.setReceiveBufferSize(newSize); 

注:据我所知,Linux的可能需要为了某些操作系统配置为您更改底层的缓冲区大小。如果setReceiveBufferSize没有效果(再次阅读它,看看它是否被改变),谷歌关于它。 :)