我有一组关键字(超过600),我想使用流API来跟踪推文与他们。 Twitter api将您可以跟踪的关键字数量限制为200个。因此,我决定使用多个线程来完成此操作,并为此使用多个OAuth令牌。这是我该怎么做的:使用多线程从twitter使用twitter4j获取数据
String[] dbKeywords = KeywordImpl.listKeywords();
List<String[]> keywords = ditributeKeywords(dbKeywords);
for (String[] subList : keywords) {
StreamCrawler streamCrawler = new StreamCrawler();
streamCrawler.setKeywords(subList);
Thread crawlerThread = new Thread(streamCrawler);
crawlerThread.start();
}
这是如何在线程间分配字。每个线程收到不超过200个单词。 这是StreamCrawler执行:
public class StreamCrawler extends Crawler implements Runnable {
...
private String[] keywords;
public void setKeywords(String[] keywords) {
this.keywords = keywords;
}
@Override
public void run() {
TwitterStream twitterStream = getTwitterInstance();
StatusListener listener = new StatusListener() {
ArrayDeque<Tweet> tweetbuffer = new ArrayDeque<Tweet>();
ArrayDeque<TwitterUser> userbuffer = new ArrayDeque<TwitterUser>();
@Override
public void onException(Exception arg0) {
System.out.println(arg0);
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
System.out.println(arg0);
}
@Override
public void onScrubGeo(long arg0, long arg1) {
System.out.println(arg1);
}
@Override
public void onStatus(Status status) {
...Doing something with message
}
@Override
public void onTrackLimitationNotice(int arg0) {
System.out.println(arg0);
try {
Thread.sleep(5 * 60 * 1000);
System.out.println("Will sleep for 5 minutes!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onStallWarning(StallWarning arg0) {
System.out.println(arg0);
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = getKeywords();
System.out.println(keywords.length);
System.out.println("Listening for " + Arrays.toString(keywords));
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
}
private long getCurrentThreadId() {
return Thread.currentThread().getId();
}
private TwitterStream getTwitterInstance() {
TwitterConfiguration configuration = null;
TwitterStream twitterStream = null;
while (configuration == null) {
configuration = TokenFactory.getAvailableToken();
if (configuration != null) {
System.out
.println("Token was obtained " + getCurrentThreadId());
System.out.println(configuration.getTwitterAccount());
setToken(configuration);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey(configuration.getConsumerKey());
cb.setOAuthConsumerSecret(configuration.getConsumerSecret());
cb.setOAuthAccessToken(configuration.getAccessToken());
cb.setOAuthAccessTokenSecret(configuration.getAccessSecret());
twitterStream = new TwitterStreamFactory(cb.build())
.getInstance();
} else {
// If there is no available configuration, wait for 2 minutes
// and try again
try {
System.out
.println("There were no available tokens, sleeping for 2 minutes.");
Thread.sleep(2 * 60 * 1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return twitterStream;
}
}
所以我的问题是,当我开始为例如2个线程我得到他们两人都开流,得到它的通知。但实际上只有第一个真正获得流并分别调用OnStatus方法。在第二个线程中使用的数组不是空的; Twitter配置也是有效和唯一的。所以我不明白什么可能是这种行为的原因。为什么唯一的第一个线程返回tweets?
这很奇怪,因为一切似乎都是独一无二的。为什么不运行多个应用程序而不是在同一应用程序中运行线程流? – mgokhanbakal
我没有想到这种方法,因为关键字的数量会增加,所以我将不得不运行超过5个实例。此外,我会说这不是明智的,因为我看不出有任何障碍可以通过不同的线索来完成。 – Tonven
我同意它应该以某种方式工作。我只是提出了一个想法,如果你的键盘很小。 :D – mgokhanbakal