2015-12-19 140 views
0

我是Spark和Scala的新手。我使用Spark Streaming编写了一个在Twitter上获取hashtag或tweet的程序。我的代码是这样的:Spark Twitter Streaming

val conf = new SparkConf().setMaster("local[2]").setAppName("SparkTwitterHelloWorldExample"); 
    val jssc = new StreamingContext(conf, new Duration(1000)); 
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey); 
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret); 
    System.setProperty("twitter4j.oauth.accessToken", accessToken); 
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret); 

    val twitterStream=TwitterUtils.createStream(jssc, None, Array("#Spark")) 

    // Without filter: Output text of all tweets 
    val statuses = twitterStream.map{ status => status.getText() } 
    val hashTags = statuses.filter(word => word.startsWith("#Spark")) 
    val tagCounts = hashTags.window(Seconds(100), Seconds(10)).countByValue() 
    hashTags.count().print(); 
    tagCounts.count().print(); 
    jssc.start(); 

此代码总是打印0,我不知道为什么?如果有人知道,你能帮我吗,谢谢。

回答

0

我认为现在,这段代码只会查找状态以#Spark开头的推文。除此之外,我会建议降低文字,所以你可以找到#Spark,#spark,#SPARK等。你可以试试这个吗?

val hashTags = statuses.filter(word => word.toLowerCase.contains("#Spark")) 

另一种选择是首先获取状态中的所有井号标签,然后从井号标签列表中继续。你可以在火花示例中找到这样的例子:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala 
+0

谢谢你的答案。现在,当我打印hashTags时,我得到的时间是Time:1450611281000毫秒。你能告诉我如何获取状态吗? – Licky

+0

对于这个例子,你给了我,我再次得到0 ..我不知道什么是问题 – Licky