2012-09-26 49 views
7

我正在玩风暴,我想知道Storm在聚合时指定(如果可能的话)(滚动/滑动)窗口大小。例如。如果我们想要在Twitter上查找前一小时的热门话题。我们如何指定螺栓应每小时返回结果?这是否以编程方式在每个螺栓内完成?或者是有些方法来指定一个“窗口”?(推特)风暴的聚合窗口

回答

17

免责声明:我使用暴风雨文章撰写了趋势专题文章gakhov in his answer above

我想说的最好的做法是使用所谓的tick tuples in Storm 0.8+。通过这些,您可以配置自己的喷嘴/螺栓,以便在特定时间间隔(例如,每十秒或每分钟)发出通知。

这里有一个简单的例子,配置组件的问题得到每十秒钟滴答元组:

// in your spout/bolt 
@Override 
public Map<String, Object> getComponentConfiguration() { 
    Config conf = new Config(); 
    int tickFrequencyInSeconds = 10; 
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds); 
    return conf; 
} 

然后,您可以使用您的嘴/螺栓的​​方法的条件开关来区分“正常”来袭元组来自特殊的tick元组。例如:

// in your spout/bolt 
@Override 
public void execute(Tuple tuple) { 
    if (isTickTuple(tuple)) { 
     // now you can trigger e.g. a periodic activity 
    } 
    else { 
     // do something with the normal tuple 
    } 
} 

private static boolean isTickTuple(Tuple tuple) { 
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) 
     && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); 
} 

再次,我写了一个漂亮的detailed blog post约几天前在风暴这样的gakhov指出(无耻插件!)。

+0

我们最终使用tick元组来“触发”聚合函数(螺栓)。谢谢一堆=) – gronnbeck

+0

不客气。 :) –

+0

嗨迈克尔,我想知道这件事:当风暴正在运行,我可以以某种方式改变滴答元组的频率?如果可以的话,我们可以改变风暴记录趋势结果日志的频率,或者它可以改变风暴计算主题趋势的窗口大小。谢谢! – user2895478

1

添加一个并行度为1的新喷口,让它发出一个空信号,然后Utils.sleep直到下次(全部在nextTuple中完成)。然后,使用所有分组将所有相关螺栓连接到该喷口,以便它们的所有实例都将接收到相同的信号。