2017-02-21 42 views
9

我想让消费者演员订阅Kafka主题并流数据,以便在消费者之外使用Spark Streaming进行进一步处理。为什么是演员?因为我读到它的主管策略是处理卡夫卡故障的好方法(例如,在发生故障时重新启动)。来自演员的Spark-Streaming

我发现了两个选项:

  • Java的KafkaConsumer类:其poll()方法返回一个Map[String, Object]。我想要返回一个DStream就像KafkaUtils.createDirectStream会,并且我不知道如何从演员外部获取流。
  • 扩展ActorHelper特征并使用actorStream(),如example所示。后一个选项不显示到主题的连接,而是显示到套接字的连接。

任何人都可以指向正确的方向吗?

回答

2

如需办理卡夫卡的失败,我使用了Apache馆长框架,并采取以下解决办法:

val client: CuratorFramework = ... // see docs 
val zk: CuratorZookeeperClient = client.getZookeeperClient 

/** 
    * This method returns false if kafka or zookeeper is down. 
    */ 
def isKafkaAvailable:Boolean = 
    Try { 
     if (zk.isConnected) { 
     val xs = client.getChildren.forPath("/brokers/ids") 
     xs.size() > 0 
     } 
     else false 
    }.getOrElse(false) 

对于消费卡夫卡的话题,我用了com.softwaremill.reactivekafka库。例如:

class KafkaConsumerActor extends Actor { 
    val kafka = new ReactiveKafka() 
    val config: ConsumerProperties[Array[Byte], Any] = ... // see docs 

    override def preStart(): Unit = { 
     super.preStart() 

     val publisher = kafka.consume(config) 
     Source.fromPublisher(publisher) 
      .map(handleKafkaRecord) 
      .to(Sink.ignore).run() 
    } 

    /** 
    * This method will be invoked when any kafka records will happen. 
    */ 
    def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = { 
     // handle record 
    } 
}