5
我使用的Spark 2.2,我想读卡夫卡的JSON消息行,它们变换为DataFrame
并将它们作为一个:这个我jsontostructs在火花结构流
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();
可以实现:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+
我想要更多的东西是这样的:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+
我试着用struct
使用from_json
功能:
DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)
,但我只得到:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+
然后我试图使用explode
但我只拿到了异常说:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...
任何想法如何使这项工作?