0
我需要帮助在pyspark。我正在从kafka流式传输json数据,我需要在pyspark中将其转换为Dataframe。为了流,我使用了下面的代码。火花流在pyspark json文件中的数据帧
from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark
def convert_Json2DF(time,rdd):
nf = gspark.read.json(rdd)
nf.toDF().show()
# Convert RDD[String] to RDD[Row] to DataFrame
#rowRdd = rdd.map(lambda w: Row(word=w))
#wordsDataFrame = gspark.createDataFrame(rowRdd)
#pdf = wordsDataFrame.toDF()
#pdf.show()
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
gspark = SparkSession \
.builder \
.appName("SparkSteaming Kafka Receiver") \
.config("spark.some.config.option", "some-value") \
.config("spark.ui.port", 22300) \
.config("spark.executor.instances", 4) \
.config("spark.executor.cores", 4) \
.getOrCreate()
sc = gspark.sparkContext
SQLContext= SQLContext(sc)
ssc = StreamingContext(sc, 15)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda (key,value): json.loads(value))
lines.pprint()
lines.foreachRDD(Json2DF)
ssc.start()
ssc.awaitTermination()
对于上述代码,我无法将json数据转换为数据框。任何人都可以纠正我在哪里我需要做的变化,在Json2DF功能或主要功能。
感谢 巴拉