这是我们一直在使用它成功地过滤鸣叫:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
queue = new LinkedBlockingQueue<Status>(1000);
_collector = collector;
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
queue.offer(status);
}
public void onDeletionNotice(StatusDeletionNotice sdn) {
}
public void onTrackLimitationNotice(int i) {
}
public void onScrubGeo(long l, long l1) {
}
public void onException(Exception e) {
}
};
TwitterStreamFactory fact = new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());
_twitterStream = fact.getInstance();
_twitterStream.addListener(listener);
_twitterStream.filter(new FilterQuery().track(TERMS_TO_TRACK).setIncludeEntities(true));
}
我们不使用流式API,因为我们根据日期范围和其他条件的主机只可用过滤器鸣叫在查询API接口上。因此,我的原始问题如何集成,如果我没有访问状态对象 – Joe
你可以有状态对象只是解析Tuple对象的螺栓execute()方法是这样的:'Status status =(Status)tuple.getValue (0);' – Orbita