2015-11-03 91 views
1

我打算使用netty客户端引导程序打开多个连接,以解析来自多个源的消息。这些消息都具有相同的格式,但是,由于需要处理的数据量很大,我必须在不同的线程上运行每个连接(这是假设netty为每个客户端通道创建一个线程,我找不到参考因为 - 如果情况并非如此,这将如何实现?)。Scala Netty有没有什么办法可以共享一个ReplayingDecoder

这是我用来连接到数据服务器的代码:其延伸ReplayingDecoder

var b = new Bootstrap() 
     .group(group) 
     .channel(classOf[NioSocketChannel]) 
     .handler(RawFeedChannelInitializer) 


var ch1 = b.clone().connect(host, port).sync().channel(); 
var ch2 = b.clone().connect(host, port).sync().channel(); 

初始化器调用RawPacketDecoder,并且被定义here。 当打开单个连接时,代码运行良好,没有@Sharable,但为了我的应用程序的目的,我必须多次连接到同一个服务器。

这会导致运行时错误@Sharable annotation is not allowed指向我的RawPacketDecoder类。

我不完全确定如何解决这个问题,但缺少在scala中实现类ReplayingDecoder作为我的解码器直接基于ByteToMessageDecoder

任何帮助将不胜感激。

注:我使用的网状4.0.32最终

回答

1

我发现this StockExchange answer解决方案。

我的问题是我使用的是基于对象的ChannelInitializer(单例),并且ReplayingDecoder以及ByteToMessageDecoder是不可共享的。

我的初始化程序是作为一个scala对象创建的,因此允许使用单个实例。将初始化程序更改为scala类并为每个引导程序克隆实例化解决了问题。我修改了引导代码上面如下:

var b = new Bootstrap() 
    .group(group) 
    .channel(classOf[NioSocketChannel]) 
    //.handler(RawFeedChannelInitializer) 

var ch1 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel(); 
var ch2 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel(); 

我不知道这是否可以确保多线程被通缉,但它允许分割数据访问到进给服务器的多个连接。

编辑更新:在对主题进行了额外的研究之后,我确定netty实际上确实为每个通道创建一个线程;这证实通过打印创建每个通道的后到控制台:

println("No. of active threads: " + Thread.activeCount()); 

输出显示一个递增的数字作为被创建并与它们各自的线程相关联的信道。

默认情况下NioEventLoopGroup使用2*Num_CPU_cores线程定义here

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
       "io.netty.eventLoopThreads", 
       Runtime.getRuntime().availableProcessors() * 2)); 

这个值可以通过设置

val group = new NioEventLoopGroup(16) 

,然后使用该组创建/设置引导被覆盖到别的东西。

+0

Netty不会为每个通道创建一个线程,它通过其自己的线程共享所有传入通道,并且如果存在比线程更多的通道,则1个线程可以具有多个通道 – Ferrybig

相关问题