2017-10-18 99 views
0

我希望异步写入春云卡夫卡流。例如异步写入春云卡夫卡流

Class SomeClass{ 

@StreamLister(Processor.INPUT) 
public void receiveEvents(String e){ 

    class ThreadExecutor implements Runnable { 
    private String message; 

    public ThreadExecutor(message){ 
     this.message = message; 
    } 

    public void run(){ 
     //after processing the string I will publish it 
     message = message + "done"; 
     writeToStream(message); 
    } 
    } 

    Executors.newCachedThreadPool().execute(new ThreadExecutor(e)); 
} 

@SendTo //not sure how to write it back 
public Message<String> writeToStream(String message){ 
    //this is what I want to know 
    } 

} 

所以在上面的例子中。我想知道如何调用writeToStream方法,以便它将回写到kafka。基本上我想写投票完成任务不像轮询。请帮忙。

回答

0
@Autowired 
private MessageChannel output; 

... 

    output.send(MessageBuilder.withPayload(data).build()); 

但是,目前尚不清楚为什么你需要这个;默认情况下,kafka发送已经是异步的。

+0

感谢加里。我这样解决了它。我无法回答我的问题。我被阻止回答这个问题。将流监听器为其收到的每条消息创建线程。在我的情况下,它只是在返回输出后才接收消息。所以我创建了一个线程来异步发送它。 –

+0

为什么你需要添加另一个答案,如果这一个回答你的问题?您可以增加活页夹中的“并发”以获得多个并发交付;你必须至少有这么多的分区; kafka将在线程之间分配分区。来自同一分区的消息将在同一个线程上传递。 –