2017-05-31 41 views
1

在Apache Camel 2.19.0中,我想要在并发的seda队列中异步地生成消息并使用结果,同时阻塞seda队列上的执行程序是否已满。 它背后的用例:我需要处理大量文件,并且需要为它创建批次,因为每条单独一行的单个消息的开销太大,而我无法将整个文件放入堆中。但最后,我需要知道我所触发的所有批次是否已成功完成。 如此有效,我需要一个反压机制来发送队列,同时又想利用多线程处理。Apache Camel:异步操作和背压

下面是骆驼和春季的一个简单例子。我配置的路由:

package com.test; 

import org.apache.camel.builder.RouteBuilder; 
import org.springframework.stereotype.Component; 

@Component 
public class AsyncCamelRoute extends RouteBuilder { 

    public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true"; 

    @Override 
    public void configure() throws Exception { 
     from(ENDPOINT) 
       .process(exchange -> { 
        System.out.println("Processing message " + (String)exchange.getIn().getBody()); 
        Thread.sleep(10_000); 
       }); 
    } 
} 

制片人看起来是这样的:

package com.test; 

import org.apache.camel.ProducerTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.event.ContextRefreshedEvent; 
import org.springframework.context.event.EventListener; 
import org.springframework.stereotype.Component; 

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.CompletableFuture; 

@Component 
public class AsyncProducer { 

    public static final int MAX_MESSAGES = 100; 

    @Autowired 
    private ProducerTemplate producerTemplate; 

    @EventListener 
    public void handleContextRefresh(ContextRefreshedEvent event) throws Exception { 
     new Thread(() -> { 
      // Just wait a bit so everything is initialized 
      try { 
       Thread.sleep(5_000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      List<CompletableFuture> futures = new ArrayList<>(); 

      System.out.println("Producing messages"); 
      for (int i = 0; i < MAX_MESSAGES; i++) { 
       CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i)); 
       futures.add(future); 
      } 
      System.out.println("All messages produced"); 

      System.out.println("Waiting for subtasks to finish"); 
      futures.forEach(CompletableFuture::join); 
      System.out.println("Subtasks finished"); 
     }).start(); 

    } 
} 

这段代码的输出如下所示:

Producing messages 
All messages produced 
Waiting for subtasks to finish 
Processing message 6 
Processing message 1 
Processing message 2 
Processing message 5 
Processing message 8 
Processing message 7 
Processing message 9 
... 
Subtasks finished 

如此看来,blockIfFull被忽略,所有的消息在处理之前被创建并放入队列中。

有没有什么办法可以创建消息,以便我可以在骆驼中使用异步处理,同时确保将元素放入队列会阻塞是否存在太多未处理的元素?

+1

你可以尝试'requestBody(..)'而不是'asyncRequestBody(..)'?它可能会导致用于执行异步消息发送的池中有大量阻塞的线程。而不是阻止你的客户端线程。 – Ralf

+0

嗨@Ralf,我不太了解你的方法 - requestBody让客户端(制片人)阻塞,直到消费者完成。虽然我想阻止客户端,如果它是垃圾邮件的消费者,它应该创建消息,只要有消费者。然而,我用不同的方法解决了这个问题。 –

+1

这是正确的。但是如果你做任何异步,那么另一个线程正在完成向seda提交并等待响应的工作。除非处理异步任务的线程池耗尽,否则不会阻止您运行循环并调用'asyncRequestBody(..)'的线程。但是如果线程根据需要在池中创建,那么您将永远不会看到您的循环线程被阻塞。 – Ralf

回答

0

我使用流媒体和自定义分离器解决了这个问题。通过这样做,我可以使用迭代器将源代码行分割为块,该迭代器仅返回行列表而不是单行。有了这个,在我看来,我可以根据需要使用骆驼。

所以路线包含以下部分:

.split().method(new SplitterBean(), "splitBody").streaming().parallelProcessing().executorService(customExecutorService) 

与如上述的行为定制的分离器。