1

我想从spark-streaming保存到几个弹性搜索索引。 我创建了<key(index), value>对,当我执行groupByKey时,结果是的Tuple,但为了使用elasticsearch-spark插件保存到elasticsearch,我需要的值为JavaRDD<value>将iterable转换为RDD

我知道有一个sparkContext.parallelize(list)选项可以从列表创建JavaRDD,但是这只能在驱动程序上执行。

是否有另一种选择来创建可以在执行程序上执行的JavaRDD?或者我可以通过另一种方式实现对执行程序有效的Tuple2<key(index), JavaRDD<value>>? 如果不是,我怎么才能使Iterator只能切换到驱动程序上的JavaRDD,以及插件是否在执行程序中写入elasticsearch?

感谢,

丹妮拉

+0

埃姆,AFAIK,'groupByKey'产生'JavaPairRDD >',它仍然是'rdd' 。 rdd的任何进一步处理都在执行程序上执行,而不是在驱动程序上执行。 –

回答

1

我会说,它必须是能够有像水木清华以下

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2()); 
JavaRDD<Value> onlyValues = values.flatMap(it -> it); 

替代办法是

JavaPairRDD<Key, Iterable<Value>> pair = ...; 
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1); 
JavaRDD<Value> values = keyValues.map(t2 -> t2._2()); 
+0

感谢叶甫,因为我需要从JavaPairRDD <钥匙,可迭代>去JavaRDD 里面foreachRDD JavaRDD 值的结果= rdd.flatMap((FlatMapFunction >,字符串>)tuple2 - > { final列表 l = Lists.newArrayList(); tuple2._2()。forEach(l :: add); return l; });与相同的密钥有关? – Daniela

+0

我可能误解了你的问题。我会编辑我的答案,希望这次会更好。 – evgenii

相关问题