0
我想要使用由卡夫卡发布的火花数据,但我无法这样做。我正在使用Spark 2.2。使用卡夫卡与火花使用pyspark问题
- 我想使用由卡夫卡使用Spark发送的数据,处理它并存储在本地文件或HDFS中。
- 我想打印出运行spark工作后在控制台中由kafka发送的数据(由spark消耗)。
对于卡夫卡,我下面这个教程:https://kafka.apache.org/quickstart
[[email protected] kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>message 1
>message 2
>message 3
>message 4
运行星火python脚本file.py:
./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py
Pyspark代码:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("stream").getOrCreate()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","localhost:9092")\
.option("subscribe","test")\
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
#Trying to save result in a file
df.writeStream\
.format("text")\
.option("checkpointLocation", "file:///home/cloudera/file.txt")\
.option("path","file:///home/cloudera/file.txt")\
.start()
# Does not write to a file
#Trying to print result in console
df.writeStream()\
.outputMode("append")\
.format("console")\
.start()
# Does not print to console and gives error: TypeError: 'DataStreamWriter' object is not callable
任何帮助?
只是为了确保,你开始引发** ** THEN产生的数据权? – LuckyGuess
@Falan是的,我首先开始了kafka。我想知道如何从火花流中将数据存储到HDFS中。 – Rio