4
我在一个类似用途的情况下使用transform
方法在描述变换运算部分的Transformations on DStreams:如何使用变换操作和外部RDD过滤dstream?
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
我的代码如下:
sc = SparkContext("local[4]", "myapp")
ssc = StreamingContext(sc, 5)
ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
filter_rdd = sc.parallelize([(u'A', 1), (u'B', 1)], 2)
filtered_count = counts.transform(
lambda rdd: rdd.join(filter_rdd).filter(lambda k, (v1, v2): v1 and not v2)
)
filtered_count.pprint()
ssc.start()
ssc.awaitTermination()
但我得到以下错误
看来您正试图广播RDD或从ac引用RDD重刑或转型。 RDD转换和操作只能由驱动程序调用,而不能在其他转换中调用;例如,rdd1.map(lambda x:rdd2.values.count()* x)无效,因为值转换和计数操作不能在rdd1.map转换中执行。有关更多信息,请参阅SPARK-5063。
我该如何使用外部RDD过滤dstream中的元素?
你得到了这个答案 – Bg1850