2013-07-25 34 views
2

我有一个到服务器的TCP连接,由套接字和流实现。在会话期间,服务器可以发送任意数量的消息 - 我必须阅读并处理所有消息。从TCP套接字读取最有效的方法

我创建一个线程,检查和在一个无限循环中读取数据:

in = socket.getInputStream(); 
ByteArrayOutputStream baos = null; 
byte[] buf = new byte[4096]; 
while(!isInterrupted()) { 
    baos = new ByteArrayOutputStream(); 
    for(int s; (s = in.read(buf)) != -1;) { 
     baos.write(buf, 0, s); 
     if(in.available() <= 0) { 
      readChunk(baos.toByteArray()); 
     } 
    } 
} 

但实际上,它是没有效率的 - 它把CPU高负载下,并以某种方式字节与前粘在一起回答。

什么是解决这种情况最有效和最优雅的方法?

+0

如果不知道您收到的消息类型,很难给出答案。它是文本吗?,二进制数据?是固定长度吗?你也编程服务器?在大多数情况下需要一个协议。 –

+0

@ja_mesa它是二进制数据。前四个字节用于写入消息长度,后四个字节用于CRC32校验和。 – bvitaliyg

回答

6

TCP不是面向消息的,它是面向流的。这意味着如果您发送两条消息AA和BB,则可以在不同场合读取值AABB,A A B B,A ABB,AAB B,AA BB(空格表示不同的读取尝试)。

您将需要自己处理消息大小或消息分隔符,因此不再需要in.available()。此外,您的代码至少将相同的数据复制到不同的缓冲区3次,并考虑在socket.getInputStream()中使用BufferedInputStream。

+0

@bvitaliyg添加到此答案,如果效率至关重要尝试使用Java nio。见[这里](https://blogs.oracle.com/slc/entry/javanio_vs_javaio)。 – linski

+0

总消息长度以前四个字节传输。读取消息时可以使用它吗? – bvitaliyg

+0

当然。首先读取消息长度,分配该大小的缓冲区,将整个消息读入该缓冲区。如果需要并行执行,请使用阻塞I/O和线程。如果你真的希望它是高吞吐量的,使用现成的消息系统,它们很难做到正确。另外 - 你的目标是多大的吞吐量? –

2

删除available()调用。 InputStream.available()不是一个有效的流结束检查,它在文档中是这样说的。它也永远不会返回一个负值。另外readChunk()方法是应该进行阅读的方法。此外,TCP中没有消息,因此使用available()或任何其他技术来识别它们是无效的。

编辑

你说在其他赞扬你有一个计数前缀。使用它。与DataInputStream.readInt()读它,分配byte[]数组的大小,并与DataInputStream.readFully():

int len = din.readInt(); 
byte[] message = new byte[len]; 
din.readFully(message); 
+0

没有'available()'读周期('read()')永远不会结束,它返回零。\t 尽管嗅探器显示消息的这一端。 – bvitaliyg

+0

我重复一遍。 'available()'不是检测消息结束的有效技术。 – EJP

0

根据你所说的关于该消息的什么补它,这是一个办法做到这一点:

in = socket.getInputStream(); 
byte[] buff = new byte[4096]; 
int packLen=0; 
int ret=0; 
while(!isInterrupted()) { 
    int offset=0; 
    int bLeft=4; 
    // 99% of the times the read will return 4 bytes, 
    // but just in case, put it in a loop. 
    while (bLeft > 0) { 
     ret = in.read(buff, offset, bLeft); 
     if (ret > 0) { 
      bLeft-=ret; 
      offset+=ret; 
     } 
     else if (ret == 0) { 
      // socket has been closed 
     } 
     else { 
      // soket has an error 
     } 

    } 
    // convert the 4 bytes to an int, depends on the way it's was sent 
    // this method is used frecuently 
    packLen = (int)((buff[0] & 0xff) << 24) | 
       (int)((buff[1] & 0xff) << 16) | 
       (int)((buff[2] & 0xff) << 8) | 
       (int)(buff[3] & 0xff); 

    // if the 4 bytes of the CRC32 is not included in the length, 
    // increment the length 
    packLen+=4; 
    offset=4; 
       if (packLen > 4092) 
       { 
        // packet is too big, ignore it or do something else 
        packLen=4092; 
       } 
    bLeft=packLen; 
    // Noew loop until the whole mesage has been read 
    while (bLeft > 0) { 
     ret = in.read(buff, offset, bLeft); 
     if (ret > 0) { 
      bLeft-=ret; 
      offset+=ret; 
     } 
     else if (ret == 0) { 
      // socket has been closed 
     } 
     else { 
      // soket has an error 
     } 
    } 
    // the readChunk function must be change 
    // Need to pass the length of the message. 
    // Is not the buff.length anymore 
    readChunk(buff, packLen+4 /* +4 for the length of the message*/); 
} 

如果您需要一个Java CRC32类,我可以给它,它符合PKZIP和以太网标准。

编辑: 备注: 如果数据包长度大于4096,则此方法将不起作用。

+0

'DataInputStream.readInt()'和'readFully()'已经完成了所有这些。 – EJP