2013-04-03 31 views
0

我在下面的代码中遇到了java.nio.channels.NotYetConnectedException,因为我试图写入尚未打开的通道。正确的方式来净化netty中的客户端通道?

本质上,我拥有的是一个通道池,在该通道池中,如果某个通道闲置,我可以通过该通道池进行写入,并且如果无法使用,则会创建一个新通道。我的问题是,当我创建一个新频道时,当我调用连接时,频道没有准备好写入,并且我不想在返回之前等待连接打开,因为我不想阻止该线程。什么是最好的方法来做到这一点?另外,我的检索/返回渠道的逻辑是否有效?见下面的代码。

我有一个简单的连接池类似如下:

private static class ChannelPool { 
    private final ClientBootstrap cb; 
    private Set<Channel> activeChannels = new HashSet<Channel>(); 
    private Deque<Channel> freeChannels = new ArrayDeque<Channel>(); 
    public ChannelPool() { 
     ChannelFactory clientFactory = 
       new NioClientSocketChannelFactory(
         Executors.newCachedThreadPool(), 
         Executors.newCachedThreadPool()); 
     cb = new ClientBootstrap(clientFactory); 
     cb.setPipelineFactory(new ChannelPipelineFactory() { 
      public ChannelPipeline getPipeline() { 
       return Channels.pipeline(
         new HttpRequestEncoder(), 
         new HttpResponseDecoder(), 
         new ResponseHandler()); 
      } 
     }); 
    } 

    private Channel newChannel() { 
     ChannelFuture cf; 
     synchronized (cb) { 
      cf = cb.connect(new InetSocketAddress("localhost", 18080)); 
     } 
     final Channel ret = cf.getChannel(); 
     ret.getCloseFuture().addListener(new ChannelFutureListener() { 

      @Override 
      public void operationComplete(ChannelFuture arg0) throws Exception { 
       System.out.println("channel closed?"); 
       synchronized (activeChannels) { 
        activeChannels.remove(ret); 
       } 
      } 
     }); 
     synchronized (activeChannels) { 
      activeChannels.add(ret); 
     } 
     System.out.println("returning new channel"); 
     return ret; 
    } 

    public Channel getFreeChannel() { 
     synchronized (freeChannels) { 
      while (!freeChannels.isEmpty()) { 
       Channel ch = freeChannels.pollFirst(); 
       if (ch.isOpen()) { 
        return ch; 
       } 
      } 
     } 
     return newChannel(); 
    } 

    public void returnChannel(Channel ch) { 
     synchronized (freeChannels) { 
      freeChannels.addLast(ch); 
     } 
    } 
} 

我试图用这个处理程序中,如下所示:

private static class RequestHandler extends SimpleChannelHandler { 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { 
     final HttpRequest request = (HttpRequest) e.getMessage(); 
     Channel proxyChannel = pool.getFreeChannel(); 
     proxyToClient.put(proxyChannel, e.getChannel()); 
     proxyChannel.write(request); 
    } 
} 
+0

看看http://stackoverflow.com/questions/30748424/how-to-create-channel-pool-in-netty – Peeyush 2015-06-11 14:29:41

回答

1

不会立即增加新的信道activeChannelsbootstrap.connect(..)之后,您必须将侦听器添加到bootstrap.connect(..)返回的ChannelFuture,并将该频道添加到添加的侦听器中的activeChannels。这样,getFreeChannel()将永远不会获得尚未连接的频道。

因为很可能activeChannels是空的,即使你叫newChannel()newChannel()将返回建立连接甚至更早),你必须决定如何在这种情况下做的。如果我是你,我会将getFreeChannel()的返回类型从Channel更改为ChannelFuture,以便在空闲频道准备就绪时收到通知。

+0

我不认为你有任何想法,为什么我仍然得到'连接重置由同行'(见http://stackoverflow.com/questions/15818767/what-events-do-i-need-to-listen-to-in-order-to-reuse-a-client-connection-in-净价)。 – jonderry 2013-04-04 22:29:14