我有使用java nio的问题,并希望有很多java nio知识的人可以帮助我澄清一些错误概念。如何让选择器在Java中的socketchannel键更改nio
我正在使用java nio套接字。使用socketchannel.write()可能会填充写入缓冲区。在这种情况下,剩余的缓冲区排队并且密钥更改为OP_WRITE。我的一个场景是队列长度很长。每次调用selector.select()之前,我都会从另一个名为pendingRequest的队列中将密钥更改为OP_WRITE。但是我发现读取过程非常缓慢,在发送处理完成后,有许多消息未写入,并且仍然在队列中。如何处理这个问题?
在我的代码中,我有两个写作的地方。一个来自发生器:当它有消息要发布时,它直接写入频道。如果缓冲区已满,数据将被排队。第二个地方是调度员:当密钥可写入时,它会调用write()来写入排队数据。我猜这两部分可以竞争写作。我只是觉得我的代码缺少一些处理来配合两次写入。
有没有解决上述问题的解决方案?我发现在我的代码中有很多排队的数据不能写出来。当密钥可写时,生成器可能会再次写入数据,这会导致排队的数据变化较少而被写出。如何使这部分正确?感谢
//在WriteListener()中,写入代码被以下三个部分
public synchronized int writeData(EventObject source) {
int n = 0;
int count = 0;
SocketChannel socket = (SocketChannel)source.getSource();
ByteBuffer buffer = ((WriteEvent)source).getBuffer();
try {
write(socket);
} catch (IOException e1) {
e1.printStackTrace();
}
while (buffer.position()>0) {
try {
buffer.flip();
n = socket.write(buffer);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE); synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(socket, queue);
}
queue.add(buffer);
logger.logInfo("queue length:" + queue.size());
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
buffer.compact();
}
}
if(buffer.position()==0) {
key.interestOps(SelectionKey.OP_READ);
}
return count;
}
// ====该写入方法是用来写排队缓冲器
public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {
int n = 0;
int count = 0;
SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());
while (wbuf.position()>0) {
try {
wbuf.flip();
n = sc.write(wbuf);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE);
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(sc, queue);
}
queue.add(wbuf);
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
wbuf.compact();
}
}
if(wbuf.position()==0) {
wbuf.clear();
key.interestOps(SelectionKey.OP_READ);
}
return n;
}
// ====当key.isWritable()为真时,此方法是Dispatch的回调
public void write(SocketChannel socketChannel) throws IOException {
SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);
if(queue == null || queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for data.
try {
if (key!=null)
key.interestOps(SelectionKey.OP_READ);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
// Write until there's not more data ...
int n = 0;
while (queue != null && !queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
// zero length write, break the loop and wait for next writable time
n = write(socketChannel, buf);
logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");
if(n==0) {
break;
}
queue.remove(0);
}
}
请张贴一些代码(最好是[SSCCE](http://sscce.org/)),证明您遇到的问题。 –
最后的方法,即写入队列的方法,总是从队列中删除缓冲区,即使是“if(n == 0)”。你应该在删除缓冲区之前做这个测试并打破*,实际上你应该只在缓冲区为空的时候删除它。目前你正在丢失数据。 – EJP
在我当前的代码中,在测试写入的字节数后删除(0)。目前的代码工作,但性能不如它应该做的那样好。在缓冲区满后的下一个可写时间需要多长时间? – susan