2017-07-04 41 views
1

我试图访问像在解决这个问题上取得过滤DStreams集合的访问收集:Spark Streaming - Best way to Split Input Stream based on filter ParamDStreams

我创建集合如下:

val statuCodes = Set("200","500", "404") 
    spanTagStream.cache() 
    val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key))) 

我尝试访问statusCodeStreams以下列方式:

for(streamTuple <- statusCodeStreams){ 
     streamTuple._2.foreachRDD(rdd => 
    rdd.foreachPartition(
     partitionOfRecords => 
     { 
      val props = new HashMap[String, Object]() 
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers) 
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.StringSerializer") 
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.StringSerializer") 
      val producer = new KafkaProducer[String,String](props) 

      partitionOfRecords.foreach 
      { 
       x=>{ 
       /* Code Writing to Kafka using streamTuple._1 as the topic-String */ 
       } 
      } 
     }) 
    ) 
} 

当执行这个我收到以下错误: java.io.NotSerializab leException:Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

如何访问数据流写入到卡夫卡在一个序列化的方式?

回答

1

作为异常指示,所述DStream定义正在被闭合捕获。 一个简单的选择是声明此DStream瞬态:

@transient val spamTagStream = //KafkaUtils.create... 

@transient标志某些对象,以从某个对象的对象图的Java序列被移除。此方案的关键是,在相同的范围内(在这种情况下statusCodeStreams)的DStream宣布一些val被封闭内使用。该val从封闭中的实际参考是outer.statusCodeStreams,引起序列化过程中“拉”的outer所有上下文到关闭。使用@transient我们将DStream(以及StreamingContext)声明标记为不可序列化,并且我们避免了序列化问题。根据不同的代码结构(如果它在一个main功能(不好的做法,所有的线性,顺便说一句),它可能是必要的,以纪念ALL DSTREAM声明+中的StreamingContext实例作为@transient

如果初始过滤的唯一目的是“路线”的内容分开卡夫卡的话题,它可能是值得移动foreachRDD内的过滤。这将使一个简单的程序结构。

spamTagStream.foreachRDD{ rdd => 
    rdd.cache() 
    statuCodes.map{code => 
     val matchingCodes = rdd.filter(...) 
     matchingCodes.foreachPartition{write to kafka} 
    } 
    rdd.unpersist(true) 
} 
+0

什么是使用的优势/劣势'@ transient' ?当我进行转化时,必须在流的初始化之前进行,直到我达到最后'spanTagStream'?关于这种情况:这只是我已经拥有的代码的一些缩减,可以立即利用收藏。其他用例将使用原样使用Stream或者在一段时间内写入RDD以训练一些机器学习算法,或者提供数据点以便与训练好的模型进行比较。 – LST

+0

@LST添加上'transient'解释的问题(太长的评论,我认为这将是在未来的问题,参考值无论如何) - 该@Transient标志必须在每个DSTREAM赋值。通过在单独的类/对象中构造代码并小心封闭的序列化范围,可以避免很多问题。 – maasg

+0

你有一个resoruce(也许是一个你知道的git项目或类似的东西),良好的做法是如何构建最好的应用程序以避免上下文问题,而不需要标记瞬态?我还是一开始,主要从Spark主页上的教程中学习,然后从那里开始学习。所以我基本上仍然在主要方法中做所有事情。 – LST

相关问题