0
我希望异步写入春云卡夫卡流。例如异步写入春云卡夫卡流
Class SomeClass{
@StreamLister(Processor.INPUT)
public void receiveEvents(String e){
class ThreadExecutor implements Runnable {
private String message;
public ThreadExecutor(message){
this.message = message;
}
public void run(){
//after processing the string I will publish it
message = message + "done";
writeToStream(message);
}
}
Executors.newCachedThreadPool().execute(new ThreadExecutor(e));
}
@SendTo //not sure how to write it back
public Message<String> writeToStream(String message){
//this is what I want to know
}
}
所以在上面的例子中。我想知道如何调用writeToStream方法,以便它将回写到kafka。基本上我想写投票完成任务不像轮询。请帮忙。
感谢加里。我这样解决了它。我无法回答我的问题。我被阻止回答这个问题。将流监听器为其收到的每条消息创建线程。在我的情况下,它只是在返回输出后才接收消息。所以我创建了一个线程来异步发送它。 –
为什么你需要添加另一个答案,如果这一个回答你的问题?您可以增加活页夹中的“并发”以获得多个并发交付;你必须至少有这么多的分区; kafka将在线程之间分配分区。来自同一分区的消息将在同一个线程上传递。 –