2017-02-15 68 views
0

我正在学习Netty并原型化一个通过TCP发送对象的简单应用程序。我的问题是,当我从服务器端以我的消息呼叫Channel.write时,似乎无法到达处理程序中。当我从客户端发送消息到服务器时,它按预期工作。Netty通道写不到处理程序

这是代码。

服务器:

public class Main {  
    private int serverPort; 

    private EventLoopGroup bossGroup; 
    private EventLoopGroup workerGroup; 

    private ServerBootstrap boot; 
    private ChannelFuture future; 

    private SomeDataChannelDuplexHandler duplex; 

    private Channel ch; 

    public Main(int serverPort) { 
     this.serverPort = serverPort; 
    } 

    public void initialise() {  
     boot = new ServerBootstrap();  
     bossGroup = new NioEventLoopGroup(); 
     workerGroup = new NioEventLoopGroup(); 

     boot.group(bossGroup, workerGroup) 
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 2)); 

        // Inbound 
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0)); 
        ch.pipeline().addLast(new SomeDataDecoder()); 

        // Outbound 
        ch.pipeline().addLast(new LengthFieldPrepender(2)); 
        ch.pipeline().addLast(new SomeDataEncoder()); 

        // In-Out 
        ch.pipeline().addLast(new SomeDataChannelDuplexHandler()); 
       } 
      })  
      .option(ChannelOption.SO_BACKLOG, 128) 
      .childOption(ChannelOption.SO_KEEPALIVE, true); 
    } 

    public void sendMessage() { 
     SomeData fd = new SomeData("hello", "localhost", 1234);  
     ChannelFuture future = ch.writeAndFlush(fd);   
     future.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       if (!future.isSuccess()) { 
        System.out.println("send error: " + future.cause().toString()); 
       } else { 
        System.out.println("send message ok"); 
       } 
      } 
     }); 
    } 

    public void startServer(){ 
     try { 
      future = boot.bind(serverPort) 
        .sync() 
        .addListener(new ChannelFutureListener() { 
         @Override 
         public void operationComplete(ChannelFuture future) throws Exception { 
          ch = future.channel(); 
         } 
      }); 
     } catch (InterruptedException e) { 
      // log failure 
     } 
    } 

    public void stopServer() { 
     workerGroup.shutdownGracefully() 
      .addListener(e -> System.out.println("workerGroup shutdown")); 

     bossGroup.shutdownGracefully() 
      .addListener(e -> System.out.println("bossGroup shutdown")); 
    } 

    public static void main(String[] args) throws InterruptedException { 

     Main m = new Main(5000); 

     m.initialise(); 
     m.startServer(); 

     final Scanner scanner = new Scanner(System.in); 

     System.out.println("running."); 

     while (true) { 

      final String input = scanner.nextLine(); 

      if ("q".equals(input.trim())) { 
       break; 
      } else { 
       m.sendMessage(); 
      } 
     } 

     scanner.close(); 
     m.stopServer(); 
    } 
} 

的双工信道的处理程序:

public class SomeDataChannelDuplexHandler extends ChannelDuplexHandler { 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     System.out.println("duplex channel active"); 
     ctx.fireChannelActive(); 
    } 

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     System.out.println("duplex channelRead"); 
     if (msg instanceof SomeData) { 
      SomeData sd = (SomeData) msg; 
      System.out.println("received: " + sd); 
     } else { 
      System.out.println("some other object"); 
     } 
     ctx.fireChannelRead(msg); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     cause.printStackTrace(); 
     ctx.close(); 
    } 

    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
     if (evt instanceof IdleStateEvent) { 
      IdleStateEvent event = (IdleStateEvent) evt; 
      if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write 
       System.out.println("idle: " + event.state()); 
      } 
     } 
    } 
} 

最后的编码器(解码器是相似的):

public class SomeDataEncoder extends MessageToByteEncoder<SomeData> { 

    @Override 
    protected void encode(ChannelHandlerContext ctx, SomeData msg, ByteBuf out) throws Exception { 

     System.out.println("in encoder, msg = " + msg); 
     ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
     ObjectOutputStream oos = new ObjectOutputStream(bos); 

     oos.writeObject(msg.getName()); 
     oos.writeObject(msg.getIp()); 
     oos.writeInt(msg.getPort()); 
     oos.close(); 

     byte[] serialized = bos.toByteArray(); 
     int size = serialized.length; 

     ByteBuf encoded = ctx.alloc().buffer(size); 
     encoded.writeBytes(bos.toByteArray()); 

     out.writeBytes(encoded); 
    } 
} 

客户端:

public class Client { 

    String host = "10.188.36.66"; 
    int port = 5000; 

    EventLoopGroup workerGroup = new NioEventLoopGroup(); 
    ChannelFuture f; 
    private Channel ch; 

    public Client() { 
    } 

    public void startClient() throws InterruptedException { 
     Bootstrap boot = new Bootstrap(); 
     boot.group(workerGroup); 
     boot.channel(NioSocketChannel.class); 
     boot.option(ChannelOption.SO_KEEPALIVE, true); 
     boot.handler(new ChannelInitializer<SocketChannel>() { 
      @Override 
      public void initChannel(SocketChannel ch) throws Exception {    
       // Inbound 
       ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0)); 
       ch.pipeline().addLast(new SomeDataDecoder()); 

       // Outbound 
       ch.pipeline().addLast(new LengthFieldPrepender(2)); 
       ch.pipeline().addLast(new SomeDataEncoder()); 

       // Handler 
       ch.pipeline().addLast(new SomeDataHandler()); 
      } 
     }); 

     // Start the client 
     f = boot.connect(host, port).sync(); 
     f.addListener(new ChannelFutureListener() { 
      public void operationComplete(ChannelFuture future) throws Exception { 
       System.out.println("connected to server"); 
       ch = f.channel(); 
      } 
     }); 
    } 

    public void stopClient() {  
     workerGroup.shutdownGracefully(); 
    } 

    private void writeMessage(String input) { 
     SomeData data = new SomeData("client", "localhost", 3333); 
     ChannelFuture fut = ch.writeAndFlush(data); 
     fut.addListener(new ChannelFutureListener() { 
      @Override 
      public void operationComplete(ChannelFuture future) throws Exception { 
       System.out.println("send message"); 
      } 
     }); 
    } 

    public static void main(String[] args) throws InterruptedException { 
     Client client = new Client(); 
     client.startClient();   

     System.out.println("running.\n\n"); 
     final Scanner scanner = new Scanner(System.in); 

     while (true) { 

      final String input = scanner.nextLine(); 

      if ("q".equals(input.trim())) { 
       break; 
      } else { 
       client.writeMessage(input); 
      } 
     } 

     scanner.close(); 
     client.stopClient(); //call this at some point to shutdown the client 
    } 

} 

和处理程序:

public class SomeDataHandler extends SimpleChannelInboundHandler<SomeData> { 

    private ChannelHandlerContext ctx; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) { 
     System.out.println("connected"); 
     this.ctx = ctx; 
    } 

    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, SomeData msg) throws Exception { 
     System.out.println("got message: " + msg); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     System.out.println("caught exception: " + cause.getMessage()); 
     ctx.close(); 
    } 
} 

当我通过在服务器端控制台发送一条消息,我得到的输出:

running. 
duplex channel active 
duplex read 
idle: ALL_IDLE 
idle: ALL_IDLE 

send message ok 

所以它看起来好像被发送的消息,但没有在客户端收到。

当我从客户端做,我获得(在服务器控制台上):

in decoder, numBytes in message = 31 
duplex channelRead 
received: SomeData [name=client, ip=localhost, port=3333] 

这是我的期望。

那么问题在哪里呢?这与在服务器端使用ChannelDuplexHandler以及在客户端使用SimpleChannelInboundHandler有关吗?有什么我需要打电话来消除管道中的消息?

UPDATE 我已经在服务器中的sendMessage方法添加的支票future.isSuccess()和我得到的控制台上 send error: java.lang.UnsupportedOperationException

回答

1

(发表于OP)

对于任何有兴趣的人,问题是我试图在服务器通道上发送消息,而不是正常通道。 This post指出我在正确的方向。

相关问题