2011-05-02 52 views
4

在传统的阻塞线程的服务器,我会做这样的事情爪哇 - 的readObject与NIO

class ServerSideThread { 

    ObjectInputStream in; 
    ObjectOutputStream out; 
    Engine engine; 

    public ServerSideThread(Socket socket, Engine engine) { 
     in = new ObjectInputStream(socket.getInputStream()); 
     out = new ObjectOutputStream(socket.getOutputStream()); 
     this.engine = engine; 
    } 

    public void sendMessage(Message m) { 
     out.writeObject(m); 
    } 

    public void run() { 
     while(true) { 
      Message m = (Message)in.readObject(); 
      engine.queueMessage(m,this); // give the engine a message with this as a callback 
     } 
    } 
} 

现在,对象可以被预期是相当大的。在我的nio循环中,我不能简单地等待对象通过,所有其他连接(工作量更小)都会等待我。

如何才可以通知连接具有整个对象,然后它会告诉我的nio通道它已准备好?

回答

7

您可以将对象写入ByteArrayOutputStream,以便在发送对象之前给出长度。在接收端,尝试解码之前需要读取所需的数据量。

然而,你可能会发现它更简单,更有效地使用与对象*流阻塞的IO(而不是NIO)


编辑这样的事情

public static void send(SocketChannel socket, Serializable serializable) throws IOException { 
    ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
    for(int i=0;i<4;i++) baos.write(0); 
    ObjectOutputStream oos = new ObjectOutputStream(baos); 
    oos.writeObject(serializable); 
    oos.close(); 
    final ByteBuffer wrap = ByteBuffer.wrap(baos.toByteArray()); 
    wrap.putInt(0, baos.size()-4); 
    socket.write(wrap); 
} 

private final ByteBuffer lengthByteBuffer = ByteBuffer.wrap(new byte[4]); 
private ByteBuffer dataByteBuffer = null; 
private boolean readLength = true; 

public Serializable recv(SocketChannel socket) throws IOException, ClassNotFoundException { 
    if (readLength) { 
     socket.read(lengthByteBuffer); 
     if (lengthByteBuffer.remaining() == 0) { 
      readLength = false; 
      dataByteBuffer = ByteBuffer.allocate(lengthByteBuffer.getInt(0)); 
      lengthByteBuffer.clear(); 
     } 
    } else { 
     socket.read(dataByteBuffer); 
     if (dataByteBuffer.remaining() == 0) { 
      ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array())); 
      final Serializable ret = (Serializable) ois.readObject(); 
      // clean up 
      dataByteBuffer = null; 
      readLength = true; 
      return ret; 
     } 
    } 
    return null; 
} 
+0

阻塞IO不会缩放。由于堆栈线程大小在整个JVM中都是不变的,因此每个连接线程将与我的引擎线程(这将需要相当大)相同的堆栈大小。因此,即使只有3000个连接,这也使我处于384Mb只是为了连接。这不会为1Gb盒子上的系统,堆和数据库留下太多空间。 cpu的需求将会足够小,以至于我可以轻松处理超过10000个连接,从而限制内存使用量。 – corsiKa 2011-05-02 22:18:14

+0

因此,在我发送大量数据首先读取的情况下,我怎么知道我排队了多少数据直到...我读了它? – corsiKa 2011-05-02 22:19:41

+1

对每个对象(或对象组)使用一个新的ObjectOutputStream(新的ByteArrayOutputStream)这样你就没有数据排队了,你把字节数组的长度发送为一个int,接着是字节数组。 ,你需要读取4字节的最小值,这将给你对于该对象的数据的其余部分的长度 – 2011-05-02 22:25:23

2

的灵感上面的代码我创建了一个(GoogleCode project

它包括一个简单的单元测试:

SeriServer server = new SeriServer(6001, nthreads); 
final SeriClient client[] = new SeriClient[nclients]; 

//write the data with multiple threads to flood the server 

for (int cnt = 0; cnt < nclients; cnt++) { 
    final int counterVal = cnt; 
    client[cnt] = new SeriClient("localhost", 6001); 
    Thread t = new Thread(new Runnable() { 
     public void run() { 
      try { 
       for (int cnt2 = 0; cnt2 < nsends; cnt2++) { 
        String msg = "[" + counterVal + "]";      
        client[counterVal].send(msg); 
       } 
      } catch (IOException e) { 
       e.printStackTrace(); 
       fail(); 
      } 
     } 
     }); 
    t.start(); 
} 

HashMap<String, Integer> counts = new HashMap<String, Integer>(); 
    int nullCounts = 0; 
    for (int cnt = 0; cnt < nsends * nclients;) { 
     //read the data from a vector (that the server pool automatically fills 
     SeriDataPackage data = server.read(); 
     if (data == null) { 
       nullCounts++; 
       System.out.println("NULL"); 
       continue; 
     } 

     if (counts.containsKey(data.getObject())) { 
       Integer c = counts.get(data.getObject()); 
       counts.put((String) data.getObject(), c + 1); 
     } else { 
       counts.put((String) data.getObject(), 1); 
     } 
     cnt++; 
     System.out.println("Received: " + data.getObject()); 
    } 

    // asserts the results 
    Collection<Integer> values = counts.values(); 
    for (Integer value : values) { 
     int ivalue = value; 
     assertEquals(nsends, ivalue); 
     System.out.println(value); 
    } 
    assertEquals(counts.size(), nclients); 
    System.out.println(counts.size()); 
    System.out.println("Finishing"); 
    server.shutdown();