1
同样,但这个问题略有不同:KStream batch process windows,我想从KStream
批量消息,然后再推送给消费者。如何将KStream聚合到固定大小的列表?
但是,这种下推不应该安排在固定的时间窗口上,而应该在每个密钥的固定消息计数阈值上。
对于初学者两个问题浮现在脑海中:
1)是一个自定义AbstractProcessor
方式这应该如何处理?沿着线的东西:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2)由于StateStore
将有可能发生爆炸(如果条目值永远不会到达转发,以便该阈值),什么是最好的方式“垃圾收集”这?我可以做一个基于时间的计划,并删除太旧的密钥......但这看起来非常DIY和容易出错。
感谢这些宝贵的意见。毫无疑问,更多具体问题将随着实施而展开。 – Raf