2017-05-27 49 views
0

我的项目包括ZooKeeper,Kafka和Spark Streaming。问题是zkClient当我尝试使用Spark Streaming向ZooKeeper写入Kafka偏移量时,无法序列化。我见过几个GitHub的项目,如:https://github.com/ippontech/spark-kafka-sourcezkClient不能被Serializabled,sparkstreaming写kafka偏移量到zookeeper

//save the offsets 

kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd)) 

def saveOffsets(topic: String, rdd: RDD[_]): Unit = { 

    logger.info("Saving offsets to ZooKeeper") 
    val stopwatch = new Stopwatch() 

    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}")) 

    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}").mkString(",") 
    logger.debug(s"Writing offsets to ZooKeeper: ${offsetsRangesStr}") 
    **ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)** 

    logger.info("Done updating offsets in ZooKeeper. Took " + stopwatch) 

} 

正如代码:kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(rdd))将司机private val zkClient = new ZkClient(zkHosts, 30000, 30000,ZKStringSerializer)在对象offsetStore执行,但zkClient无法序列,它是如何工作的?

+0

kafkaStream.foreachRDD(RDD => { rdd.foreachPartition(X => offsetsStore.saveOffsets(RDD)) })这会是好的,但foreachpartition将作为分区号执行severl次数 –

回答

0

可以定义zkClient@transient lazy val,这意味着它不会驱动器和执行器(这是@transient一部分)之间被序列化,而是将被重新初始化时,在每一个和所述类的每个实例,其包含上述代码(这是lazy部分)。

你可以阅读更多关于这个模式在这里: http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/

+0

ty :)我已经尝试过这种方式,但失败了,现在它的工作,所以这意味着初始化zkClient只是在驱动程序中执行? –

+0

这绝对意味着zkClient不会被序列化。 我并不是100%确定遍历'offsetsetsRanges'的代码在哪里运行。 在任何情况下,运行此代码的每个位置都将初始化一个单独的zkClient实例。 希望这有助于:-) – Alexey

+0

只有在驱动程序它可以得到的RDD不RDD分区,所以offsetsetsRanges只能在驱动程序中执行,我认为这就是为什么zkClient不需要序列化。 –

相关问题