I have a question: is it possible to get a custom object back out of a spark.sql.Row, and if so, how?
My current code can push the data into a Spark Row, but I cannot extract it back out.
First, there is a simple POJO:
import java.io.Serializable;
import java.util.Map;

public class Event implements Serializable {

    private Map<String, Object> fields;

    public Event() {
    }

    public Event(Map<String, Object> fields) {
        this.fields = fields;
    }

    public Map<String, Object> getFields() {
        return fields;
    }

    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }
}
The next step is that we create a JavaDStream<Tuple2<String, Event>> using the Spark Streaming API. After that, we convert each RDD into a Dataset:
JavaDStream<Tuple2<String, Event>> events = ...

events.foreachRDD(tuple2JavaRDD -> {
    SparkSession sparkSession = SparkSession.builder()
            .config(tuple2JavaRDD.context().conf())
            .getOrCreate();

    Dataset<Row> dataSet = sparkSession.createDataset(tuple2JavaRDD.rdd(),
            Encoders.tuple(Encoders.STRING(), Encoders.bean(Event.class)))
            .toDF("EventType", "Event");

    // try to get the data back
    Dataset<Event> eventsSet = dataSet.map(
            (MapFunction<Row, Event>) row -> row.<Event>getAs(1),
            Encoders.bean(Event.class));

    // getting an exception when trying to consume an element from the stream
    eventsSet.show();
});
This is the error I get:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Event
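A likely cause: Encoders.bean stores the Event as a nested struct column, so once the data is in an untyped DataFrame, row.getAs(1) hands back that struct as a GenericRowWithSchema, never an Event instance; the cast in the MapFunction then fails. One way to avoid the cast entirely is to keep the Dataset typed with the tuple encoder and skip the toDF step. The sketch below assumes the Event bean from above and the standard Spark Java API; EventRoundTrip and extractEvents are illustrative names, not anything from the original code:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class EventRoundTrip {

    // Keeps the tuple encoder all the way through, so the second field
    // deserializes directly back into the Event bean instead of into a Row.
    public static Dataset<Event> extractEvents(SparkSession spark,
                                               JavaRDD<Tuple2<String, Event>> rdd) {
        Dataset<Tuple2<String, Event>> typed = spark.createDataset(
                rdd.rdd(),
                Encoders.tuple(Encoders.STRING(), Encoders.bean(Event.class)));
        // Tuple2::_2 selects the Event half of each record; the bean encoder
        // turns the struct back into an Event on deserialization.
        return typed.map(
                (MapFunction<Tuple2<String, Event>, Event>) Tuple2::_2,
                Encoders.bean(Event.class));
    }
}
```

If the untyped DataFrame really is needed, the bean would have to be rebuilt by hand from the nested struct (e.g. via row.getStruct(1) and the Event setters), since a Row cannot be cast to the bean class directly.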