2014-02-18 75 views
-1

我创建将处理> 1000所连接的服务器。我决定在服务器中使用非阻塞IO。我在互联网上发现了一些代码,它基本上是一个回显服务器。我认为一切都很好,但我不明白服务器中的一些概念。非阻塞IO Java中与逻辑

import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.net.Socket; 
import java.net.SocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.*; 

public class EchoServer { 
    private InetAddress addr; 
    private int port; 
    private Selector selector; 
    private Map<SocketChannel,List<byte[]>> dataMap; 

public EchoServer(InetAddress addr, int port) throws IOException { 
    this.addr = addr; 
    this.port = port; 
    dataMap = new HashMap<SocketChannel,List<byte[]>>(); 
    startServer(); 
} 

private void startServer() throws IOException { 
    // create selector and channel 
    this.selector = Selector.open(); 
    ServerSocketChannel serverChannel = ServerSocketChannel.open(); 
    serverChannel.configureBlocking(false); 

    // bind to port 
    InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port); 
    serverChannel.socket().bind(listenAddr); 
    serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); 

    log("Echo server ready. Ctrl-C to stop."); 

    // processing 
    while (true) { 
     // wait for events 
     this.selector.select(); 

     // wakeup to work on selected keys 
     Iterator keys = this.selector.selectedKeys().iterator(); 
     while (keys.hasNext()) { 
      SelectionKey key = (SelectionKey) keys.next(); 

      // this is necessary to prevent the same key from coming up 
      // again the next time around. 
      keys.remove(); 

      if (! key.isValid()) { 
       continue; 
      } 

      if (key.isAcceptable()) { 
       this.accept(key); 
      } 
      else if (key.isReadable()) { 
       this.read(key); 
      } 
      else if (key.isWritable()) { 
       this.write(key); 
      } 
     } 
    } 
} 

private void accept(SelectionKey key) throws IOException { 
    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); 
    SocketChannel channel = serverChannel.accept(); 
    channel.configureBlocking(false); 

    // write welcome message 
    channel.write(ByteBuffer.wrap("Welcome, this is the echo server\r\n".getBytes("US-  ASCII"))); 

    Socket socket = channel.socket(); 
    SocketAddress remoteAddr = socket.getRemoteSocketAddress(); 
    log("Connected to: " + remoteAddr); 

    // register channel with selector for further IO 
    dataMap.put(channel, new ArrayList<byte[]>()); 
    channel.register(this.selector, SelectionKey.OP_READ); 
} 

private void read(SelectionKey key) throws IOException { 
    SocketChannel channel = (SocketChannel) key.channel(); 

    ByteBuffer buffer = ByteBuffer.allocate(8192); 
    int numRead = -1; 
    try { 
     numRead = channel.read(buffer); 
    } 
    catch (IOException e) { 
     e.printStackTrace(); 
    } 

    if (numRead == -1) { 
     this.dataMap.remove(channel); 
     Socket socket = channel.socket(); 
     SocketAddress remoteAddr = socket.getRemoteSocketAddress(); 
     log("Connection closed by client: " + remoteAddr); 
     channel.close(); 
     key.cancel(); 
     return; 
    } 

    byte[] data = new byte[numRead]; 
    System.arraycopy(buffer.array(), 0, data, 0, numRead); 
    log("Got: " + new String(data, "US-ASCII")); 

    // write back to client 
    doEcho(key, data); 
} 

private void write(SelectionKey key) throws IOException { 
    SocketChannel channel = (SocketChannel) key.channel(); 
    List<byte[]> pendingData = this.dataMap.get(channel); 
    Iterator<byte[]> items = pendingData.iterator(); 
    while (items.hasNext()) { 
     byte[] item = items.next(); 
     items.remove(); 
     channel.write(ByteBuffer.wrap(item)); 
    } 
    key.interestOps(SelectionKey.OP_READ); 
} 

private void doEcho(SelectionKey key, byte[] data) { 
    SocketChannel channel = (SocketChannel) key.channel(); 
    List<byte[]> pendingData = this.dataMap.get(channel); 
    pendingData.add(data); 
    key.interestOps(SelectionKey.OP_WRITE); 
} 

private static void log(String s) { 
    System.out.println(s); 
} 

    public static void main(String[] args) throws Exception { 
     new EchoServer(null, 8989); 
    } 
} 

所以对于这段代码,我有几个问题。一,如果我读了10个字节,但我不想做任何事情,直到我读了100个字节,我该如何实现?另外,如果我只想写一个计数器达到一定数量时,我将如何实现该非阻塞?关于这个代码的事情是,无论字节缓冲区有多大,它都会回显。我该如何改变它,只有当它有100个字节时才会回显?我只有在计数器达到一定大小时才能写入数据?谢谢!

将投入read方法工作的,如果(numRead < 100){做休息}其他{}回报的第一个问题? 此外,将投入一个if(计数器> 100){做休息}否则将在第二写入方法工作{}的回报?

回答

0

你必须代码部分,基本上你需要跟踪的字节读取,不断增加读取临时缓冲区的字节,一旦你达到了你的要求限制,您可以通过该缓冲区您的工作线程。

我会建议你使用netty它提供了所有你正在寻找的开箱即用的东西。 看this link

希望这有助于

0

就与非阻塞模式返回-1通道,而读书?对于你的问题,你可以设置ByteBuffer的限制: -

例如: -

ByteBuffer buff = ByteBuffer.allocate(1024); 
buff.clear(); 

buff.limit(your_limit);//is this what you want?? 

while(buff.remaining>0&&channel.read(buff)); // if will reach till your limit only. 

System.out.println(new String(buff.array())); 

希望这有助于