2017-08-29 51 views

How to get a custom object from spark.sql.Row

I have a question: is it possible to get a custom object back from a spark.sql.Row?

The current code is able to push the data into a Spark Row, but I can't extract it back.

First, there is a simple POJO:

import java.io.Serializable;
import java.util.Map;

public class Event implements Serializable {

    private Map<String, Object> fields;

    public Event() {
    }

    public Event(Map<String, Object> fields) {
        this.fields = fields;
    }

    public Map<String, Object> getFields() {
        return fields;
    }

    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }
}

In the next step we create a JavaDStream of Tuple2<String, Event> using the Spark Streaming API. After that, we convert each RDD to a Dataset.

JavaDStream<Tuple2<String, Event>> events = ...

events.foreachRDD(tuple2JavaRDD -> {

    SparkSession sparkSession = SparkSession.builder().config(tuple2JavaRDD.context().conf()).getOrCreate();

    Dataset<Row> dataSet = sparkSession.createDataset(tuple2JavaRDD.rdd(),
            Encoders.tuple(Encoders.STRING(), Encoders.bean(Event.class))).toDF("EventType", "Event");

    // try to get the data back
    Dataset<Event> eventsSet = dataSet.map((MapFunction<Row, Event>) row -> row.<Event>getAs(1), Encoders.bean(Event.class));

    // ...and this throws an exception when the elements are shown
    eventsSet.show();
});

Here is the error I get:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Event
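To see where the ClassCastException comes from, here is a minimal local-mode sketch that builds the same (EventType, Event) layout from one sample record and inspects it. The class names EventColumnType and StringEvent are hypothetical, and the sketch uses Map<String, String> instead of Map<String, Object>, on the assumption that the bean encoder needs a concrete value type to derive a schema. The point it shows: the "Event" column is a struct in the schema, so row.get(1) yields a nested Row rather than an Event instance.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import scala.Tuple2;

public class EventColumnType {

    // Hypothetical stand-in for Event: String values so the bean encoder can infer a schema.
    public static class StringEvent implements Serializable {
        private Map<String, String> fields;

        public StringEvent() {
        }

        public StringEvent(Map<String, String> fields) {
            this.fields = fields;
        }

        public Map<String, String> getFields() {
            return fields;
        }

        public void setFields(Map<String, String> fields) {
            this.fields = fields;
        }
    }

    // Builds a DataFrame with the same (EventType, Event) shape as in the question.
    public static Dataset<Row> sampleFrame(SparkSession spark) {
        Map<String, String> fields = new HashMap<>();
        fields.put("user", "alice");
        Tuple2<String, StringEvent> record = new Tuple2<>("login", new StringEvent(fields));
        List<Tuple2<String, StringEvent>> data = Collections.singletonList(record);
        return spark.createDataset(data,
                        Encoders.tuple(Encoders.STRING(), Encoders.bean(StringEvent.class)))
                .toDF("EventType", "Event");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("event-column-type")
                .getOrCreate();
        Dataset<Row> df = sampleFrame(spark);

        // The schema describes "Event" as a struct type, not as a Java class.
        DataType eventType = df.schema().apply("Event").dataType();
        System.out.println(eventType.simpleString());

        // So the value at index 1 is a nested Row; casting it to the bean is what fails.
        Object value = df.first().get(1);
        System.out.println(value instanceof Row);

        spark.stop();
    }
}
```

In other words, getAs(1) is only an unchecked cast; it does not run the bean decoder on the nested value.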

Answer


How about:

dataSet.select("Event.*").as(Encoders.bean(Event.class));
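Here is a local-mode sketch of the select-based approach, under the same assumptions as above (hypothetical EventViaSelect and StringEvent names, String map values instead of Object). It selects "Event.*" so the struct is flattened into its inner column, fields, which the bean encoder can then match to the fields property by name. Selecting only "Event" would likely leave a single struct column named Event that does not line up with any bean property.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class EventViaSelect {

    // Hypothetical stand-in for Event with String map values.
    public static class StringEvent implements Serializable {
        private Map<String, String> fields;

        public StringEvent() {
        }

        public StringEvent(Map<String, String> fields) {
            this.fields = fields;
        }

        public Map<String, String> getFields() {
            return fields;
        }

        public void setFields(Map<String, String> fields) {
            this.fields = fields;
        }
    }

    public static List<StringEvent> extractEvents(SparkSession spark) {
        Map<String, String> fields = new HashMap<>();
        fields.put("user", "alice");
        Tuple2<String, StringEvent> record = new Tuple2<>("login", new StringEvent(fields));
        Dataset<Row> dataSet = spark.createDataset(Collections.singletonList(record),
                        Encoders.tuple(Encoders.STRING(), Encoders.bean(StringEvent.class)))
                .toDF("EventType", "Event");

        // "Event.*" expands the struct into its inner columns (here only "fields"),
        // which the bean encoder then maps onto StringEvent properties by name.
        Dataset<StringEvent> events = dataSet.select("Event.*")
                .as(Encoders.bean(StringEvent.class));
        return events.collectAsList();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("event-via-select")
                .getOrCreate();
        for (StringEvent event : extractEvents(spark)) {
            System.out.println(event.getFields());
        }
        spark.stop();
    }
}
```

This keeps the conversion inside Spark's encoders, so no per-row Java code is needed.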

For your code, you should construct the object step by step:

...
Event event = new Event();
event.setFields(row.getAs(...));
return event;
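A local-mode sketch of the step-by-step construction, again with hypothetical names (EventViaMap, StringEvent, toEvent) and String map values. It reads the nested struct with getStruct and converts the map with getJavaMap, because a Row exposes map columns as Scala maps (scala.collection.Map), so passing getAs(...) straight to a setter that takes java.util.Map would likely fail with another ClassCastException.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class EventViaMap {

    // Hypothetical stand-in for Event with String map values.
    public static class StringEvent implements Serializable {
        private Map<String, String> fields;

        public StringEvent() {
        }

        public StringEvent(Map<String, String> fields) {
            this.fields = fields;
        }

        public Map<String, String> getFields() {
            return fields;
        }

        public void setFields(Map<String, String> fields) {
            this.fields = fields;
        }
    }

    // Rebuilds one StringEvent from an (EventType, Event) row, field by field.
    public static StringEvent toEvent(Row row) {
        // The "Event" column holds a nested Row (struct), not a StringEvent.
        Row nested = row.getStruct(row.fieldIndex("Event"));
        // Map columns come back as Scala maps; getJavaMap wraps them as java.util.Map.
        Map<String, String> fields = nested.getJavaMap(nested.fieldIndex("fields"));
        StringEvent event = new StringEvent();
        // Copy into a HashMap so the bean holds a plain Java map.
        event.setFields(new HashMap<>(fields));
        return event;
    }

    public static List<StringEvent> extractEvents(SparkSession spark) {
        Map<String, String> fields = new HashMap<>();
        fields.put("user", "alice");
        Tuple2<String, StringEvent> record = new Tuple2<>("login", new StringEvent(fields));
        Dataset<Row> dataSet = spark.createDataset(Collections.singletonList(record),
                        Encoders.tuple(Encoders.STRING(), Encoders.bean(StringEvent.class)))
                .toDF("EventType", "Event");

        Dataset<StringEvent> events = dataSet.map(
                (MapFunction<Row, StringEvent>) EventViaMap::toEvent,
                Encoders.bean(StringEvent.class));
        return events.collectAsList();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("event-via-map")
                .getOrCreate();
        for (StringEvent event : extractEvents(spark)) {
            System.out.println(event.getFields());
        }
        spark.stop();
    }
}
```

Looking fields up with fieldIndex rather than hard-coded positions keeps toEvent working if column order changes.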