2
我是新来的spark和mqtt。我试图使用我得到了网上一个名为wordcount.py使用mqtt与pyspark streaming
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)
sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)
brokerUrl = sys.argv[1]
topic = sys.argv[2]
lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
,我跟着指示安装mosquitto代理(它的工作)MQTTUtils的代码,下载火花流-MQTT-assembly_2。 11-1.6.2.jar并运行此命令的python脚本: 〜$火花提交--jars火花流-MQTT组装_ *罐子wordcount.py
,但显示的错误:
from pyspark.streaming.mqtt import MQTTUtils
ImportError:No module named mqtt
是我错过了什么从这里? 谢谢
如何创建[MCVE。另外Spark 2.0+不再提供MQTT后端。它已被转移到Spark包。 – zero323
我有同样的问题,但我正在使用2.0版本,现在我正在使用1.6.2版本,脚本正在运行。 – 2016-10-11 20:38:20