2016-12-02 29 views
1

同样,但这个问题略有不同:KStream batch process windows,我想从KStream批量消息,然后再推送给消费者。如何将KStream聚合到固定大小的列表?

但是,这种下推不应该安排在固定的时间窗口上,而应该在每个密钥的固定消息计数阈值上。

对于初学者两个问题浮现在脑海中:

1)是一个自定义AbstractProcessor方式这应该如何处理?沿着线的东西:

@Override 
public void punctuate(long streamTime) { 
    KeyValueIterator<String, Message[]> it = messageStore.all(); 
    while (it.hasNext()) 
     KeyValue<String, Message[]> entry = it.next(); 
     if (entry.value.length > 10) { 
      this.context.forward(entry.key, entry.value); 
      entry.value = new Message[10](); 
     } 
    } 
} 

2)由于StateStore将有可能发生爆炸(如果条目值永远不会到达转发,以便该阈值),什么是最好的方式“垃圾收集”这?我可以做一个基于时间的计划,并删除太旧的密钥......但这看起来非常DIY和容易出错。

回答

2

我想这会工作。应用基于时间的“垃圾收集”听起来也很合理。是的,使用处理器API而不是DSL具有DIY的风格 - 首先不是PAPI的目的(授权用户根据需要做任何事情)。

虽然几点意见:

  • 您将需要一个更复杂的数据结构:因为punctuate()是基于流的时间进度调用时,它可能发生,你有超过10个记录两者之间的一个关键调用。因此,您需要像KeyValueIterator<String, List<Message[]>> it = messageStore.all();这样的产品才能够为每个密钥存储多个批次。
  • 我会假设你需要微调时间表来标点会很棘手 - 如果你的日程安排太紧,很多批次可能还没有完成,你会浪费CPU - 如果你的日程安排太松,您将需要大量内存,并且您的下游操作员会一次释放大量内容,从而获得大量数据。向下游发送突发数据可能会成为问题。
  • 扫描整个商店非常昂贵 - 尝试根据批次大小对您的键值对进行“排序”似乎是个好主意。这应该使您只能触摸已完成批次的键而不是所有键。也许你可以保留内存列表中已批量处理的密钥,并只对这些内容进行查找(如果失败,则需要对存储中的所有密钥执行一次传输以重新创建此内存列表)。
+0

感谢这些宝贵的意见。毫无疑问,更多具体问题将随着实施而展开。 – Raf