2011-05-11 125 views
6

我正在通过Java套接字实现一个面向事件的层,我在想是否有一种方法来确定是否有数据等待读取。Java网络:连接Socket/InputStream

我通常的做法是从套接字读入缓冲区,并在缓冲区填充给定数量的字节时调用提供的回调函数(可能为0,如果每次调用回调需要触发到达),但我怀疑Java已经在为我做缓冲了。

InputStream的available()方法对此可靠吗?我应该只是read()并在Socket上做我自己的缓冲?还是有另一种方式?

回答

8

很快就放了,没有。 available()不可靠(至少它不适合我)。我推荐使用java.nio.channels.SocketChannel连接SelectorSelectionKey。这个解决方案有点基于事件,但比简单的套接字更复杂。

对于客户端:

  1. 构建体插座通道(socket),打开一个选择器(selector = Selector.open();)。
  2. 使用非阻塞socket.configureBlocking(false);
  3. 注册选择用于连接socket.register(selector, SelectionKey.OP_CONNECT);
  4. 连接socket.connect(new InetSocketAddress(host, port));
  5. 看看是否有什么新selector.select();
  6. 如果“新”指的是连接成功,为OP_READ注册的选择;如果“新”指的是可用的数据,只需从套接字读取即可。

但是,为了让它是异步的,你需要设置一个单独的线程(尽管套接字被创建为非阻塞,线程仍然会阻塞)来检查是否已经到达。

对于服务器,有ServerSocketChannel,您使用OP_ACCEPT

供参考,这是我的代码(客户端),应该给你一个提示:

private Thread readingThread = new ListeningThread(); 

/** 
    * Listening thread - reads messages in a separate thread so the application does not get blocked. 
    */ 
private class ListeningThread extends Thread { 
    public void run() { 
    running = true; 
    try { 
    while(!close) listen(); 
    messenger.close(); 
    } 
    catch(ConnectException ce) { 
    doNotifyConnectionFailed(ce); 
    } 
    catch(Exception e) { 
// e.printStackTrace(); 
    messenger.close(); 
    } 
    running = false; 
    } 
} 

/** 
    * Connects to host and port. 
    * @param host Host to connect to. 
    * @param port Port of the host machine to connect to. 
    */ 
public void connect(String host, int port) { 
    try { 
    SocketChannel socket = SocketChannel.open(); 
    socket.configureBlocking(false); 
    socket.register(this.selector, SelectionKey.OP_CONNECT); 
    socket.connect(new InetSocketAddress(host, port)); 
    } 
    catch(IOException e) { 
    this.doNotifyConnectionFailed(e); 
    } 
} 

/** 
    * Waits for an event to happen, processes it and then returns. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    iter.remove(); 
    // check validity 
    if(key.isValid()) { 
    // if connectable... 
    if(key.isConnectable()) { 
    // ...establish connection, make messenger, and notify everyone 
    SocketChannel client = (SocketChannel)key.channel(); 
    // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast 
    if(client!=null && client.finishConnect()) { 
     client.register(this.selector, SelectionKey.OP_READ); 
    } 
    } 
    // if readable, tell messenger to read bytes 
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { 
    // read message here 
    } 
    } 
    } 
} 

/** 
    * Starts the client. 
    */ 
public void start() { 
    // start a reading thread 
    if(!this.running) { 
    this.readingThread = new ListeningThread(); 
    this.readingThread.start(); 
    } 
} 

/** 
    * Tells the client to close at nearest possible moment. 
    */ 
public void close() { 
    this.close = true; 
} 

而对于服务器:

/** 
    * Constructs a server. 
    * @param port Port to listen to. 
    * @param protocol Protocol of messages. 
    * @throws IOException when something goes wrong. 
    */ 
public ChannelMessageServer(int port) throws IOException { 
    this.server = ServerSocketChannel.open(); 
    this.server.configureBlocking(false); 
    this.server.socket().bind(new InetSocketAddress(port)); 
    this.server.register(this.selector, SelectionKey.OP_ACCEPT); 
} 

/** 
    * Waits for event, then exits. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    // do something with the connected socket 
    iter.remove(); 
    if(key.isValid()) this.process(key); 
    } 
} 

/** 
    * Processes a selection key. 
    * @param key SelectionKey. 
    * @throws IOException when something is wrong. 
    */ 
protected void process(SelectionKey key) throws IOException { 
    // if incoming connection 
    if(key.isAcceptable()) { 
    // get client 
    SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); 
    try { 
    client.configureBlocking(false); 
    client.register(this.selector, SelectionKey.OP_READ); 
    } 
    catch(Exception e) { 
    // catch 
    } 
    } 
    // if readable, tell messenger to read 
    else if(key.isReadable()) { 
    // read 
    } 
} 

希望这有助于。

+0

我不明白。你不需要一个单独的线程。非阻塞套接字不会按照定义进行阻塞。只要正确使用OP_READ,并且在读取时停止的正确读取循环返回零。 – EJP

+0

@EJP:不反对;不过在我看来,无论阻塞如何,即使没有任何可读的内容,仍然会阻止来自套接字的读取。但可能是我做错了。我建议提问者按照你的说法去尝试,如果它不起作用 - 尝试线程。 – Sorrow

+0

你几乎可以肯定的是循环,而read()返回零。这就是我提到它的原因。这不是阻塞,这是循环。 – EJP

0

available()只会告诉你,如果你可以读取数据而不去操作系统。它在这里不是很有用。

您可以根据自己的喜好进行封锁或非封锁阅读。当没有要读取的数据时,非阻塞读取会返回,这可能是您想要的。

+2

错误。 available()会告诉你BufferedInputStream/BufferedReader中的数据总和(如果你使用的是)以及套接字接收缓冲区,它是一个内核数据结构。如果数据只在套接字接收缓冲区中,那么您必须'进入操作系统'来获取它,但在这个过程中您不会*阻止*。正如Javadoc所说。但是,如果它是例如SSLSocket,则available()始终返回零。 – EJP