2012-05-09 56 views
4

我正在分析的代码使用Netty NioDatagramChannelFactory创建UDP服务器。 它创建了一个线程池:线程在Netty UDP服务器中不同时执行

ExecutorService threadPool = Executors.newCachedThreadPool(); 

则数据报通道,pipelineFactory &自举:

int workerCount = 10; 
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount); 
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(); 

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory); 
bootStrap.setPipelineFactory(pipelineFactory); 
bootStrap.bind(new InetSocketAddress(host, port)); 

在pipelineFactory中,getPipeline()增加了对定制处理器。

就像有人说的: Multi-threaded Handling of UDP Messages

只有一个线程处理接收到的消息。在日志中,线程名称显示为新的I/O数据报工作者#1像:

2012-04-20 09:20:51,853新的I/O数据报工作者#1' - '1 INFO [cemrsh SNMPTrapsRequestHandler:42] messageReceived |处理:V1TRAP [reqestID = 0,...]

我阅读文档和该条目:Lot of UDP requests lost in UDP server with Netty

然后我根据这些条目改变一个位的代码。

int corePoolSize = 5; 
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576); 

并拥有和ExecutionHandler的pipelineFactory:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool); 
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler); 

而且getPipeline()将处理程序等记载: 现在线程池与创建

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory { 

    private ExecutionHandler executionHandler = null; 

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
     this.executionHandler = executionHandler; 
    } 

    @Override 
    public ChannelPipeline getPipeline() throws Exception { 

     ChannelPipeline pipeline = Channels.pipeline(); 
     pipeline.addFirst("ExecutorHandler", executionHandler); 

     // Here the custom handlers are added 
     pipeline.addLast(...) 
    } 

现在,我在日志中获得4个不同的线程名称。他们将显示为池2线程1池2线程2,等等

例如:

2012-05-09 09:12:19589 pool- 2-thread-1 INFO [cemrshSNMPTrapsRequestHandler:46] messageReceived |正在处理:V1TRAP [reqestID = 0,...]

但它们不会同时处理。 messageReceived()下的处理必须在一个线程上完成,以处理下一个消息。 我从不同的客户端发送了一堆消息到服务器,我得到的日志不是交错的。我也试着在messageReceived()里面使用Thread.sleep(),并确认了之前的内容。

我错过了什么吗? 有没有办法用Netty实现一个REAL多线程UDP服务器? 如何获得不同的线程同时执行messageReceived()?

+0

如果我没有弄错,那么OrderedMemoryAwareThreadPoolExecutor执行来自同一个线程中同一客户端的请求。 – kofemann

回答

0

跳到我身上的一件事是,你把你的执行处理程序第一个在流水线中。我相信目的是整个管道直到“应用程序”处理程序都应该由执行IO解码的IO线程执行。

因此,我会断言你会首先添加所有的SNMPTrap解码处理程序,然后,当你有一个真正的SNMPTrap时,它会被传递给执行处理程序,然后该处理程序将陷阱传递给实际消费者的陷阱做一些有用的事情。

@Override 
public ChannelPipeline getPipeline() throws Exception { 

    ChannelPipeline pipeline = Channels.pipeline(
     new SomethingSomethingDecoder(), 
     new SNMPTrapDecoder(), 
     executionHandler. 
     snmpTrapConsumerHandler 
    ); 
} 

至少,这是它是如何在ExecutionHandler javadoc中所示,和上面的是我对它的解释。

+0

是的。我尝试了这种方法,这是文档中描述的方法,并没有奏效。然后我试着用addFirst()添加executionHandler,就像在问题的回答中所描述的那样:在Netty的UDP服务器中丢失了大量的UDP请求。但他们都没有工作。 – nephewtom

0

根据我的经验和我对UDP的Netty的理解,通常只有一个线程处理解码的UDP消息。由于UDP是无会话的,只有一个线程可以在一个UDP端口上接收数据并对其进行解码。一旦将数据解码并将其包装到缓冲区或特定的java对象中,然后可以将该对象放入将处理它的线程池(执行处理程序 - >您的业务处理程序)中。然后,一旦将以前解码的数据释放到执行处理程序中,就可以对UDP端口上即将发布的新数据进行解码。

创建NioDatagramChannelFactory时可以指定的池的线程仅在侦听多个端口上的数据时使用。每个端口只有一个线程是有意义的。即使您在该构造函数中指定了100个工作者,如果您配置了一个UDP端口,也只会使用一个。

+0

看来你是对的,每个端口只有一个线程。 但对我来说,它没有任何意义... 看来这只是netty的设计方式。 我不得不添加我的“线程代码”来表现这种行为,当我使用一个框架来处理具有数万亿选项的线程池时... 我不敢相信它...... – nephewtom

+0

你是否想关注那么你的代码作为答案呢?我同意你的看法,即使只有一个接收端口,也可能有很多来源,所以多线程不仅有意义,而且对于为大量客户提供服务似乎也是必要的。 – gsimard