我试图访问像在解决这个问题上取得过滤DStreams集合的访问收集:Spark Streaming - Best way to Split Input Stream based on filter ParamDStreams
我创建集合如下:
val statuCodes = Set("200","500", "404")
spanTagStream.cache()
val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
我尝试访问statusCodeStreams
以下列方式:
for(streamTuple <- statusCodeStreams){
streamTuple._2.foreachRDD(rdd =>
rdd.foreachPartition(
partitionOfRecords =>
{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
partitionOfRecords.foreach
{
x=>{
/* Code Writing to Kafka using streamTuple._1 as the topic-String */
}
}
})
)
}
当执行这个我收到以下错误: java.io.NotSerializab leException:Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects
如何访问数据流写入到卡夫卡在一个序列化的方式?
什么是使用的优势/劣势'@ transient' ?当我进行转化时,必须在流的初始化之前进行,直到我达到最后'spanTagStream'?关于这种情况:这只是我已经拥有的代码的一些缩减,可以立即利用收藏。其他用例将使用原样使用Stream或者在一段时间内写入RDD以训练一些机器学习算法,或者提供数据点以便与训练好的模型进行比较。 – LST
@LST添加上'transient'解释的问题(太长的评论,我认为这将是在未来的问题,参考值无论如何) - 该@Transient标志必须在每个DSTREAM赋值。通过在单独的类/对象中构造代码并小心封闭的序列化范围,可以避免很多问题。 – maasg
你有一个resoruce(也许是一个你知道的git项目或类似的东西),良好的做法是如何构建最好的应用程序以避免上下文问题,而不需要标记瞬态?我还是一开始,主要从Spark主页上的教程中学习,然后从那里开始学习。所以我基本上仍然在主要方法中做所有事情。 – LST