2015-04-29 52 views
0

我正在使用reactor在整个应用程序中发布事件,并让不同的消费者响应他们的事件。反应器消费者的多线程

这是我的反应器构造

@Configuration 
@EnableReactor 
public class ReactorConfiguration { 

    static { 
     Environment.initializeIfEmpty().assignErrorJournal(); 
    } 

    @Bean 
    public EventBus eventBus() { 
     return EventBus.config().env(Environment.get()).dispatcher(Environment.SHARED).get(); 
} 

我期待默认环形缓冲器基于调度被使用并且发送到单个消费者多个消息应该被并行处理。相反,它似乎是以同步方式处理事件。线程shared-1用于处理我的event1到consumer1,然后只有在完成event1的处理后,同一个线程才开始处理consumer1上的event2。

如何以一种方式实现并行处理,我应该能够将多个事件发送给多个使用者,并且所有事件都是并行处理的。

我将不胜感激任何建议。

这是我如何分派事件到事件总线

dispatch(ReactorEvents.REPORT_REQUEST_EVENT, "", event); 

protected <T> void dispatch(String selector, String info, T event) { 
    eventBus.notify(selector, Event.wrap(Tuple.of(info, event))); 
} 

,这里是消费者

@Consumer 
public class ReportRequestHandler { 
... 
    @Selector(ReactorEvents.REPORT_REQUEST_EVENT) 
    @Override 
    public void handleRequest(Tuple2<String, ReportRequestEvent> tuple) { 
    ReportRequestEvent event = tuple.getT2(); 
    log.debug("processing report request " + event.getId()); 
    .... 
    } 
} 
+0

好像通过简单地改变为'调度(Environment.WORK_QUEUE)'或'调度(Environment.THREAD_POOL)'不我想要什么,但我想我需要了解两者之间的差异多一点,其次,如果线程池将使用与应用程序的其余部分相同的线程池 – adeelmahmood

回答