我在databricks的网站上发现了这个博客。它展示了如何利用Spark SQL的API来消费和转换来自Apache Kafka的复杂数据流。
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
有解释UDF如何被用来解串器行的部分:
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
我用java,所以只好写下面的示例UDF,以检查它怎么能在Java中被称为:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
现在我可以使用这个UDF在结构化流字数举例如下:
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select(callUDF("mode", col("value")))
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
对我来说,下一步是为节俭反序列化写一个UDF。我会尽快发布。