5

我使用的Spark 2.2,我想读卡夫卡的JSON消息行,它们变换为DataFrame并将它们作为一个:这个我jsontostructs在火花结构流

spark 
    .readStream() 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .option("subscribe", "topic") 
    .load() 
    .select(col("value").cast(StringType).as("col")) 
    .writeStream() 
    .format("console") 
    .start(); 

可以实现:

+--------------------+ 
|     col| 
+--------------------+ 
|{"myField":"somet...| 
+--------------------+ 

我想要更多的东西是这样的:

+--------------------+ 
|    myField| 
+--------------------+ 
|"something"   | 
+--------------------+ 

我试着用struct使用from_json功能:

DataTypes.createStructType(
    new StructField[] { 
      DataTypes.createStructField("myField", DataTypes.StringType) 
    } 
) 

,但我只得到:

+--------------------+ 
| jsontostructs(col)| 
+--------------------+ 
|[something]   | 
+--------------------+ 

然后我试图使用explode但我只拿到了异常说:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(... 

任何想法如何使这项工作?

回答

4

你几乎在那里,只要选择正确的事情。 from_json返回与该模式匹配的struct列。如果架构(JSON表示)是这样的:

{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]} 

你会得到嵌套的对象等同于:

root 
|-- jsontostructs(col): struct (nullable = true) 
| |-- myField: string (nullable = false) 

可以使用getField(或getItem)方法来选择特定领域

df.select(from_json(col("col"), schema).getField("myField").alias("myField")); 

.*选择struct中的所有顶级字段:

df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*"); 

虽然单stringget_json_object应该是绰绰有余:

df.select(get_json_object(col("col"), "$.myField"));