2
我有一个与Spark JavaStreamingContext
一起使用的程序。我已经了解到,使用DStreams时只有几个输出操作,如print()
。 这是一段代码在JavaSparkStreamingContext中执行查询
private static void analyzeHashtags() throws InterruptedException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
jssc.start();
jssc.awaitTermination();
}
现在我想查询操作添加到这个代码,如下图所示:
private static void analyzeHashtags() throws InterruptedException, SQLException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
String hashtag = "#dummy"; int frequencies = 59;
String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})";
st.executeUpdate(cql);
jssc.start();
jssc.awaitTermination();
}
但这代码只是执行查询一次。我希望它在每次循环时执行它。 怎么可能做到这一点?提前致谢。
感谢完整和有用的答案。 我只是不知道如何在Java(idk Scala)中实现'foreachRDD'部分。使用lambda表达式,我应该写'lines.foreachRDD(rdd - >(...'用函数代替点吗? – sirdan
我个人推荐使用Scala和Spark Streaming。对于'foreachRDD' lambda的Java转换,我想你可以在Spark Streaming示例包中找到一个例子,例如:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg
非常感谢,这有助于很多 – sirdan