我正在玩风暴,我想知道Storm在聚合时指定(如果可能的话)(滚动/滑动)窗口大小。例如。如果我们想要在Twitter上查找前一小时的热门话题。我们如何指定螺栓应每小时返回结果?这是否以编程方式在每个螺栓内完成?或者是有些方法来指定一个“窗口”?(推特)风暴的聚合窗口
7
A
回答
17
免责声明:我使用暴风雨文章撰写了趋势专题文章gakhov in his answer above。
我想说的最好的做法是使用所谓的tick tuples in Storm 0.8+。通过这些,您可以配置自己的喷嘴/螺栓,以便在特定时间间隔(例如,每十秒或每分钟)发出通知。
这里有一个简单的例子,配置组件的问题得到每十秒钟滴答元组:
// in your spout/bolt
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
int tickFrequencyInSeconds = 10;
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
return conf;
}
然后,您可以使用您的嘴/螺栓的方法的条件开关来区分“正常”来袭元组来自特殊的tick元组。例如:
// in your spout/bolt
@Override
public void execute(Tuple tuple) {
if (isTickTuple(tuple)) {
// now you can trigger e.g. a periodic activity
}
else {
// do something with the normal tuple
}
}
private static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
再次,我写了一个漂亮的detailed blog post约几天前在风暴这样的gakhov指出(无耻插件!)。
1
添加一个并行度为1的新喷口,让它发出一个空信号,然后Utils.sleep直到下次(全部在nextTuple中完成)。然后,使用所有分组将所有相关螺栓连接到该喷口,以便它们的所有实例都将接收到相同的信号。
相关问题
- 1. 风暴:不同大小的几个滑动窗口的最小/最大聚合
- 2. 风暴三叉戟的平均聚合
- 3. 风暴三叉戟窗口加入
- 4. 窗口上的SQL聚合
- 5. 推式风暴水龙头
- 6. 仅暴露来自DBContext的聚合根
- 7. 窗口功能/聚合函数/中断窗口
- 8. 浏览器窗口弹出窗口 - 风险和特殊功能
- 9. 在聚合物中暴露API
- 10. 卡夫卡风暴喷口lein或Mvn
- 11. 状态保存风暴螺栓,可以进行定期聚合并将聚合结果保存到db
- 12. 通知窗口 - 防止窗口聚焦
- 13. MongoDb聚合(SQL UNION风格)
- 14. 风暴localcluster无法加载博尔特
- 15. 如何整合风暴和卡夫卡
- 16. 腓风暴,导航功能类合同
- 17. 如何整合hadoop和风暴?
- 18. 风暴不适合批量学习吗?
- 19. 风暴InvalidTopologyException
- 20. 好用风暴?
- 21. 暴露窗口对象的文档
- 22. SQL窗功能无法在其他窗口函数或聚合
- 23. 在一个简单的聚集风暴拓扑分组
- 24. 保留一个变量的值/状态的喷口的特定ID。在风暴
- 25. Linux上的端口聚合
- 26. 推特风格更新
- 27. 需要关于风暴喷口合适的消息队列的建议
- 28. 的Apache 9.2风暴中缺少风暴起动
- 29. Apache风暴 - 带风暴集群的地图拓扑
- 30. 的Java JNA聚焦于特定的窗口
我们最终使用tick元组来“触发”聚合函数(螺栓)。谢谢一堆=) – gronnbeck
不客气。 :) –
嗨迈克尔,我想知道这件事:当风暴正在运行,我可以以某种方式改变滴答元组的频率?如果可以的话,我们可以改变风暴记录趋势结果日志的频率,或者它可以改变风暴计算主题趋势的窗口大小。谢谢! – user2895478