2016-01-04 34 views
0

我通过火花流应用实时异常检测系统。 在每个流式传输间隔中,如果数据点是异常的,AWS SNS会发送电子邮件来订阅帐户。 但AWS SNS java sdk就像在火花流中不工作。下面是错误消息AWS SNS SDK不适用于火花流传输


错误的StreamingContext:错误启动的背景下,将其标记为停止 java.io.NotSerializableException:DSTREAM检查点已启用,但它们的功能DStreams不是序列 com.amazonaws。 services.sns.AmazonSNSClient 序列化堆栈: - 对象不可序列化(class:com.amazonaws.services.sns.AmazonSNSClient,value:[email protected]) - field(class:wordCount $$ anonfun $ main $ 2,name:snsClient $ 1,type:class com.amazonaws.services.sns.AmazonSNSClient) - object(class classCount $$ anonfun $ main $ 2,) - field(class:org.apache.spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3,name:clearedF $ 1,type:interface scala.Function1) - 对象(类org.apache.spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3) - writeObject数据(类:org.apache.spark.streaming.dstream。对象(类org.apache.spark.streaming.dstream.ForEachDStream,[email protected]) - writeObject数据(类:org.apache.spark.streaming.dstream。 DStreamCheckpointData) - object(class org.apache.spark.streaming.dstream.DStreamCheckpointData,[ 0 checkpoint files


有没有人有任何想法解决这个问题..或者有一些其他的解决方案,以发送电子邮件的火花流

非常感谢

回答

0

的错误是AmazonSNSClient实例不是序列化。这可能意味着你已经在转换之外实例化它并在转换中使用它。这将导致火花串行它。

对于非流式火花,您可以尝试在RDD中的mapPartitions函数中实例化AmazonSNSClient,或者使用火花流等效函数。快速浏览流媒体文档有一个可能对你有用的部分,其中似乎涵盖了围绕efficiently creating connections to databases,外部系统等的类似地面。

重点是你需要在worker上实例化你的客户端,而不是然后将它发送给工作者(这要求实例是可序列化的)。

+0

感谢您的帮助和参考。我的问题像你说的那样,把AmazonSNSClient实例放在转换之外(foreachRDD),然后发布sns。 –