2016-09-19 95 views
0

数据将是一个线后产生火花数据帧分隔JSON字符串象下面如何从卡夫卡队列直接从卡夫卡队列读取数据

{“头”:{“平台”:“大气压”,“MSGTYPE “:” 1" , “版本”: “1.0”}, “详细信息”:[{ “BCC”: “5814”, “DSRC”: “A”, “辅助”: “5678”},{ “BCC” :“5814”,“dsrc”:“A”,“mid”:“0003”},{“bcc”:“5812”,“dsrc”:“A”,“mid”:“0006”}]}

{“header”:{“platform”:“atm”,“msgtype”:“1”,“version”:“1.0”},“details”:[{“bcc”:“5814”,“dsrc “:” A”, “辅助”: “1234”},{ “BCC”: “5814”, “DSRC”: “A”, “中间”: “0004”},{ “BCC”: “5812”, “dsrc”:“A”,“mid”:“0009”}]}

{“header”:{“platform”:“atm ”, “信息类型”: “1”, “版本”: “1.0”}, “详细信息”:[{ “BCC”: “5814”, “DSRC”: “A”, “辅助”: “1234”}, { “BCC”: “5814”, “DSRC”: “A”, “中间”: “0004”},{ “BCC”: “5812”, “DSRC”: “A”, “中间”: “0009” }]}

我们如何在python中为上述输入创建一个数据框?我有很多列访问上面只是一个示例,数据总共有23列。任何帮助,将不胜感激。

回答

0

您正在寻找pyspark.sql.SQLContext.jsonRDD。由于Spark流是批量生成的,因此您的流对象将返回一系列RDD,每个RDD都可以通过jsonRDD生成DF。

+0

@佩德罗席尔瓦谢谢!我尝试运行下面的代码,但得到错误sc = SparkContext() sqlContext = SQLContext(sc) df = sqlContext.jsonRDD(“/ nro0/app/spark/examples/src/main/resources/input.json “) df.show()AttributeError:'str'对象没有属性'mapPartitions' –

+0

'jsonRDD'需要一个'RDD'。如果你想读一个文件,你需要'sqlContext.read.json'。 –