我试图使用Jboss Netty创建长轮询Comet。Netty Comet异步请求超时
如何配置30秒的时间?继documentaton:
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new HTTPRequestHandler());
Timer timer = new HashedWheelTimer();
pipeline.addLast("timeout", new IdleStateHandler(timer, 30, 30, 0));
return pipeline;
,但它不工作,我的要求是永恒的。这怎么解决?
是否意味着我需要执行Callable<T>
,然后调用Future.get
带有超时参数和终止请求,如果TimeOutException
发生?那我应该用Future<ChannelFuture>
?
有没有其他方法?
代码:
FutureExecutor executor = FutureExecutor.getInstance();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
ChannelFuture channelFuture = null;
Callable<ChannelFuture> myRunnable = new MyCallable(e);
Future<ChannelFuture> future = executor.fireEvent(myRunnable);
try{
channelFuture = future.get(40,TimeUnit.SECONDS);
}catch (TimeoutException ex) {
channelFuture = e.getChannel(Response timeOutResponse);
// handle the timeout
} catch (InterruptedException ex) {
channelFuture = e.getChannel(Response interaptedResponse);
} catch (ExecutionException ex) {
channelFuture = e.getChannel(Response errorResponse);
}
finally{
future.cancel(true);
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}
和内部可赎回的,我只是监控BlockedQueue:
你@Override
public ChannelFuture call() {
final BlockingQueue<String> queue =.....
while (true){
Message message = queue.take();
ChannelBuffer partialresponse = ChannelBuffers.buffer(message.toJson());
ChannelFuture future = e.getChannel().write(partialresponse);
return future;
}
}
这很好谢谢。我正在使用Async Comet,并且每个监视BlockedQueue的通道都有Thread,我的问题是IdleStateAwareChannelUpstreamHandler不会停止线程工作。 –
2012-01-16 16:33:37
对不起,我没有得到它......即使你使用每个通道的一个线程,它不是一个好主意“不”共享HashedWheelTimer,因为它带有一些开销。如果你需要从空闲处理中取消你的线程,你将需要提供一个方法待办事项。但那更多的东西在你的代码范围内然后在netty中。我仍然认为你应该避免每个通道使用一个线程.... – 2012-01-17 10:40:46
你能告诉我你是什么意思的“每个通道线程”?如果我正在运行Callable = new Callable(Channel),并且正在监视BlockedQueue ,我是否在每个通道使用线程? –
2012-01-17 10:57:30