2017-01-30 101 views
2

我有一个弹簧应用程序,应该处理和存储套接字传入的数据,因为瓶颈问题,这应该与多线程完成。春季并发处理多个队列与单线程池

传入数据属于许多实体和每个实体的任务应该处理顺序,但我认为分配一个线程每个实体是不是一个很好的解决方案(千单一线程来处理每个实体的队列)

那么我怎样才能定义一个公共ThreadPool来处理所有实体的队列与票价算法?

回答

0

您可以使用Project reactorRxJava按组分组传入消息,并按顺序处理每个组中的事件。

随着工程反应堆你可以的代码如下所示:

Scheduler groupScheduler = Schedulers.newParallel("groupByPool", 16); 
    Flux.fromStream(incomingMessages()) // stream of new data from socket 
      .groupBy(Message::getEntityId) // split incoming messages by groups, which should be processed serially 
      .map(g -> g.publishOn(groupScheduler)) //create new publisher for groups of messages 
      .subscribe(//create consumer for main stream 
        stream -> 
          stream.subscribe(this::processMessage) // create consumer for group stream and process messagaes 
      ); 
+0

在您的示例代码中,您正在为每个组创建新的并行线程池,并将其指向为我的问题中的一个错误解决方案。使用单个线程池处理所有带有票价算法的任务是我的目标 – Mojtabye

+0

@Mojtabye同意,对不起这个。更新了答案 –

2

您已经描述了使用消息驱动体系结构解决的完美问题。

Spring Integration是为您提供此功能的模块。

您可以构建您的任务服务并使用@ServiceActivator进行注释,并使用Channels创建您的链。

通道可以选择在不同的线程池上执行,并且可以通过您的通道上的队列设置来克服由于尖峰负载而导致的瓶颈。

绝对值得一试的查看Spring Integration的文档。

+0

请提高通过添加源代码的答案。 – Mojtabye