2014-12-02 40 views
2

我正在使用火花流式传输来消耗kafka消息。我想从kafka获取一些消息,而不是阅读所有消息。所以我想读一批消息,将它们返回给调用者并停止火花流。目前我正在传递batchInterval时间在spark流上下文方法的awaitTermination方法中。我现在不知道如何将处理后的数据从火花流传回给调用者。这里是我的代码,我使用目前在读取第一批数据后停止火花流式传输

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = { 
    if (params.contains("zookeeperQourum")) 
     zkQuorum = params.get("zookeeperQourum").get 
    if (params.contains("userGroup")) 
     group = params.get("userGroup").get 
    if (params.contains("topics")) 
     topics = params.get("topics").get 
    if (params.contains("numberOfThreads")) 
     numThreads = params.get("numberOfThreads").get 
    if (params.contains("sink")) 
     sink = params.get("sink").get 
    if (params.contains("batchInterval")) 
     interval = params.get("batchInterval").get.toInt 
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077") 
    val ssc = new StreamingContext(sparkConf, Seconds(interval)) 
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    var consumerConfig = scala.collection.immutable.Map.empty[String, String] 
    consumerConfig += ("auto.offset.reset" -> "smallest") 
    consumerConfig += ("zookeeper.connect" -> zkQuorum) 
    consumerConfig += ("group.id" -> group) 
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2) 
    val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x)) 
    streams.foreach(rdd => rdd.foreachPartition(itr => { 
     while (itr.hasNext && size >= 0) { 
     var msg=itr.next 
     println(msg) 
     sample.append(msg) 
     sample.append("\n") 
     size -= 1 
     } 
    })) 
    ssc.start() 
    ssc.awaitTermination(5000) 
    ssc.stop(true) 
    } 

因此,而不是在所谓的“样品”的字符串生成器保存邮件的我想返回到调用者。

回答

3

您可以实现StreamingListener,然后里面,onBatchCompleted你可以调用ssc.stop()

private class MyJobListener(ssc: StreamingContext) extends StreamingListener { 

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { 

    ssc.stop(true) 

    } 

} 

这是你如何附上您SparkStreaming到JobListener:

val listen = new MyJobListener(ssc) 
ssc.addStreamingListener(listen) 

ssc.start() 
ssc.awaitTermination() 
+1

火花1.6 .1,尝试使用解决方案时出现以下异常:'org.apache.spark.SparkException:无法停止AsynchronousListenerBus'侦听器线程中的StreamingContext。任何想法如何解决这一问题? – pederpansen 2016-11-29 12:57:05

0

我们可以使用下面的代码段

var sampleMessages=streams.repartition(1).mapPartitions(x=>x.take(10)) 

获得样本消息,如果我们想要第一批后停止那么我们就应该执行我们自己的StreamingListener接口,并应停止在onBatchCompleted方法流。