2017-08-21 65 views
0

我想要使用由卡夫卡发布的火花数据,但我无法这样做。我正在使用Spark 2.2。使用卡夫卡与火花使用pyspark问题

  1. 我想使用由卡夫卡使用Spark发送的数据,处理它并存储在本地文件或HDFS中。
  2. 我想打印出运行spark工作后在控制台中由kafka发送的数据(由spark消耗)。

对于卡夫卡,我下面这个教程:https://kafka.apache.org/quickstart

[[email protected] kafka]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    >message 1 
    >message 2 
    >message 3 
    >message 4 

运行星火python脚本file.py:

./spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 file.py 

Pyspark代码:

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("stream").getOrCreate() 

df = spark\ 
.readStream\ 
.format("kafka")\ 
.option("kafka.bootstrap.servers","localhost:9092")\ 
.option("subscribe","test")\ 
.load() 

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic") 


#Trying to save result in a file 
df.writeStream\ 
.format("text")\ 
.option("checkpointLocation", "file:///home/cloudera/file.txt")\ 
.option("path","file:///home/cloudera/file.txt")\ 
.start() 
# Does not write to a file 

#Trying to print result in console 
df.writeStream()\ 
.outputMode("append")\ 
.format("console")\ 
.start() 
# Does not print to console and gives error: TypeError: 'DataStreamWriter' object is not callable 

任何帮助?

+0

只是为了确保,你开始引发** ** THEN产生的数据权? – LuckyGuess

+0

@Falan是的,我首先开始了kafka。我想知道如何从火花流中将数据存储到HDFS中。 – Rio

回答

0

的问题很可能是这一行:

df.writeStream()\ 

从行中删除()像这样:

df.writeStream\ 
+1

你好@antoine,欢迎来到StackOverflow!请花一分钟时间[参观](https://stackoverflow.com/tour)。当他们解释OP所做的错误以及为什么时,答案更有用。请编辑你的答案来描述你的代码为什么会起作用,而不是OP的作用。 –