1
什么是“推荐”方式来处理每条消息,因为它通过结构化流式传输管道(我在spark 2.1.1上,源代码为kafka 0.10.2.1)?结构化流式传输 - 消费每个消息
到目前为止,我在看dataframe.mapPartitions
(因为我需要连接到hbase,其客户端连接类不是serizalable,因此mapPartitions
)。
想法?
什么是“推荐”方式来处理每条消息,因为它通过结构化流式传输管道(我在spark 2.1.1上,源代码为kafka 0.10.2.1)?结构化流式传输 - 消费每个消息
到目前为止,我在看dataframe.mapPartitions
(因为我需要连接到hbase,其客户端连接类不是serizalable,因此mapPartitions
)。
想法?
您应该能够使用foreach
输出水槽:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks和https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
即使客户端不是序列化的,你没有在你的ForeachWriter
构造函数将其打开。只需将它保留为None/null,并将其初始化为open
方法,该方法在序列化后称为,但每个任务只能执行一次。
在排序的伪码:
class HBaseForeachWriter extends ForeachWriter[MyType] {
var client: Option[HBaseClient] = None
def open(partitionId: Long, version: Long): Boolean = {
client = Some(... open a client ...)
}
def process(record: MyType) = {
client match {
case None => throw Exception("shouldn't happen")
case Some(cl) => {
... use cl to write record ...
}
}
}
def close(errorOrNull: Throwable): Unit = {
client.foreach(cl => cl.close())
}
}