2013-05-08 43 views
0

我正在写一个简单的路由应用程序。这个想法是,我有服务器或源节点接收持续x时间的瞬态客户端连接。接收到的消息被解码,然后根据消息的细节发送到相应的接收节点或已经打开的客户端。路由器类注册所有通道并尝试将它们保存在地图中,以便它可以过滤并排除消息的目的地。一旦我到达目的地,我应该能够选择实际的汇聚节点(根据配置可以是持久性的瞬态),并将数据发送到该通道等待响应,然后将其发送回始发者。我想知道,如果我的实施使用netty是在正确的方向?以及如何传递从任何服务器收到的消息并将其发送给任何客户端并回应到始发源节点?如何在netty中的通道之间传递数据?

下面是我的源代码:它会/应该给你一个我最喜欢的概念:请在你的解释中使用代码示例。

import java.net.InetSocketAddress; 
    import java.util.ArrayList; 
    import java.util.HashMap; 
    import java.util.List; 
    import java.util.Map; 
    import java.util.concurrent.Executors; 
    import org.jboss.netty.bootstrap.ClientBootstrap; 
    import org.jboss.netty.bootstrap.ServerBootstrap; 
    import org.jboss.netty.channel.ChannelFactory; 
    import org.jboss.netty.channel.ChannelHandlerContext; 
    import org.jboss.netty.channel.ChannelPipeline; 
    import org.jboss.netty.channel.ChannelPipelineFactory; 
    import org.jboss.netty.channel.ChannelStateEvent; 
    import org.jboss.netty.channel.Channels; 
    import org.jboss.netty.channel.ChildChannelStateEvent; 
    import org.jboss.netty.channel.ExceptionEvent; 
    import org.jboss.netty.channel.MessageEvent; 
    import org.jboss.netty.channel.SimpleChannelHandler; 
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 

    /* 
    * @author Kimathi 
    */ 

    public class Service { 

     private Nodes nodes; 

     public void start(){ 

      nodes = new Nodes(); 
      nodes.addSourceNodes(new SourceNodes()). 
        addSinkNodes(new SinkNodes()). 
        addConfigurations(new Configurations()). 
        boot(); 
     } 

     public void stop(){ 

      nodes.stop(); 
     } 

     public static void main(String [] args){ 

      new Service().start(); 
     } 

    } 

    class Nodes { 

     private SourceNodes sourcenodes; 

     private SinkNodes sinknodes ; 

     private Configurations configurations; 

     public Nodes addConfigurations(Configurations configurations){ 

      this.configurations = configurations; 

      return this; 
     } 

     public Nodes addSourceNodes(SourceNodes sourcenodes){ 

      this.sourcenodes = sourcenodes; 

      return this; 
     } 

     public Nodes addSinkNodes(SinkNodes sinknodes){ 

      this.sinknodes = sinknodes; 

      return this; 
     } 

     public void boot(){ 

      Router router = new Router(configurations); 

      sourcenodes.addPort(8000). 
         addPort(8001). 
         addPort(8002); 
      sourcenodes.addRouter(router); 
      sourcenodes.boot() ; 

      sinknodes.addRemoteAddress("127.0.0.1", 6000). 
        addRemoteAddress("127.0.0.1", 6001). 
        addRemoteAddress("127.0.0.1", 6002); 
      sinknodes.addRouter(router); 
      sinknodes.boot(); 

     } 

     public void stop(){ 

      sourcenodes.stop(); 

      sinknodes.stop(); 
     } 

    } 

    final class SourceNodes implements Bootable , Routable { 

     private List <Integer> ports = new ArrayList(); 

     private ServerBootstrap serverbootstrap; 

     private Router router; 

     @Override 
     public void addRouter(final Router router){ 

      this.router = router; 
     } 

     public SourceNodes addPort(int port){ 

      this.ports.add(port); 

      return this; 
     } 

     @Override 
     public void boot(){ 

      this.initBootStrap(); 

      this.serverbootstrap.setOption("child.tcpNoDelay", true); 
      this.serverbootstrap.setOption("child.keepAlive", true); 
      this.serverbootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

       @Override 
       public ChannelPipeline getPipeline() throws Exception { 

        return Channels.pipeline(new SourceHandler(router)); 
       } 
      }); 



      for(int port:this.ports){ 
       this.serverbootstrap.bind(new InetSocketAddress(port)); 
      } 
     } 

     @Override 
     public void stop(){ 

      this.serverbootstrap.releaseExternalResources(); 

     } 

     private void initBootStrap(){ 

      ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 

      this.serverbootstrap = new ServerBootstrap(factory); 
     } 
    } 

    final class SinkNodes implements Bootable , Routable { 

     private List<SinkAddress> addresses= new ArrayList(); 

     private ClientBootstrap clientbootstrap; 

     private Router router; 

     @Override 
     public void addRouter(final Router router){ 

      this.router = router; 

     } 

     public SinkNodes addRemoteAddress(String hostAddress,int port){ 

      this.addresses.add(new SinkAddress(hostAddress,port)); 

      return this; 
     } 

     @Override 
     public void boot(){ 

      this.initBootStrap(); 

      this.clientbootstrap.setOption("tcpNoDelay", true); 
      this.clientbootstrap.setOption("keepAlive", true); 
      this.clientbootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

       @Override 
       public ChannelPipeline getPipeline() throws Exception { 

        return Channels.pipeline(new SinkHandler(router)); 
       } 
      }); 

      for(SinkAddress address:this.addresses){ 

       this.clientbootstrap.connect(new InetSocketAddress(address.hostAddress(),address.port())); 
      } 
     } 

     @Override 
     public void stop(){ 

      this.clientbootstrap.releaseExternalResources(); 
     } 

     private void initBootStrap(){ 

      ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 

      this.clientbootstrap = new ClientBootstrap(factory); 
     } 

     private class SinkAddress { 

      private final String hostAddress; 
      private final int port; 

      public SinkAddress(String hostAddress, int port) { 
       this.hostAddress = hostAddress; 
       this.port = port; 
      } 

      public String hostAddress() { return this.hostAddress; } 
      public int port() { return this.port; } 
     } 
    } 

    class SourceHandler extends SimpleChannelHandler { 

     private Router router; 

     public SourceHandler(Router router){ 

      this.router = router; 
     } 

     @Override 
     public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { 

      System.out.println("child is opened"); 
     } 

     @Override 
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      System.out.println("child is closed"); 
     } 

     @Override 
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 


       System.out.println("Server is opened"); 

     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 

      System.out.println(e.getCause()); 
     } 

     @Override 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 


      System.out.println("channel received message"); 

     } 
    } 

    class SinkHandler extends SimpleChannelHandler { 

     private Router router; 

     public SinkHandler(Router router){ 

      this.router = router; 
     } 

     @Override 
     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      System.out.println("Channel is connected"); 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 

      System.out.println(e.getCause()); 
     } 

     @Override 
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 

      System.out.println("channel received message"); 

     } 
    } 

    final class Router { 

     private Configurations configurations; 

     private Map sourcenodes = new HashMap(); 

     private Map Sinknodes = new HashMap(); 

     public Router(){} 

     public Router(Configurations configurations){ 

      this.configurations = configurations; 
     } 

     public synchronized boolean submitSource(ChannelHandlerContext ctx , MessageEvent e){ 

      boolean responded = false; 

      return responded; 
     } 

     public synchronized boolean submitSink(ChannelHandlerContext ctx , MessageEvent e){ 

      boolean responded = false; 

      return responded; 
     } 
    } 

    final class Configurations { 

     public Configurations(){} 
    } 

    interface Bootable { 

     public abstract void boot(); 

     public abstract void stop(); 
    } 

    interface Routable { 

     public abstract void addRouter(Router router); 
    } 

回答

0

这个想法似乎是合理的。

源通道处理程序只能使用Channel#write(...)写入相应的接收通道,反之亦然。

当然,您还需要一种将源通道与回复关联的方式,以及如何最好地完成取决于协议的性质。如果可能的话,最好的选择是以某种方式将消息中的源通道ID编码到宿信道(当然也包括在回复中)。

如果这是不可能的,你将不知何故必须保持相关性。如果确保答复与发送的请求配对,则每个接收器通道的FIFO队列可能是合适的。

相关问题