2015-08-24 21 views
0

我有两个星火1.4.1 PipelineRDD(我不知道什么样的对象是:-s:星火 - 在特定领域加入JSON RDDS(键 - 值)

1)名单IDS(ids_alsaciens RDD)

2)的列表personne(personnes RDD)

在 'Personnes' RDD有4个字段,在JSON格式,关键是 “ID”。 我可能在这张表中有同一人的几条线(id是相同的)

我想获取'personnes'RDD上的'alsacien'表中包含的所有行。

我怎么能这样做在火花?

>type(ids_alsaciens) 
pyspark.rdd.PipelinedRDD 
>type(personnes) 
pyspark.rdd.PipelinedRDD 

>ids_alsaciens.take(10) 
    [u'1933992', 
    u'2705919', 
    u'2914684', 
    u'2915444', 
    u'11602833', 
    u'11801394', 
    u'10707371', 
    u'2018422', 
    u'2312432', 
    u'233375'] 
    >personnes.take(3) 
    [{'date': '2013-06-03 00:00', 
     'field': 'WAID_INDIVIDU_WC_NUMNNI', 
     'id': '10000149', 
     'value': '2770278'}, 
    {'date': '2013-05-15 00:00', 
     'field': 'WAID_INDIVIDU_WC_NUMNNI', 
     'id': '10009910', 
     'value': '2570631'}, 
    {'date': '2013-03-01 00:00', 
     'field': 'WAID_INDIVIDU_WC_NUMNNI', 
     'id': '10014405', 
     'value': '1840288'}] 

编辑

尝试: personnes.filter(拉姆达X:X在ids_alsaciens)

了异常: 例外:看来您正在尝试播放的RDD或引用RDD从行动或转变。 RDD转换和操作只能由驱动程序调用,而不能在其他转换中调用;例如,rdd1.map(lambda x:rdd2.values.count()* x)无效,因为值转换和计数操作不能在rdd1.map转换中执行。有关更多信息,请参阅SPARK-5063。

回答

0

出现SPARK-5063错误是因为不允许在地图内调用RDD函数,因为运行地图任务的火花工作人员无法自行完成工作。

使用星火RDD.join:

documentation

join(otherDataset, [numTasks])  

当呼吁(K, V)类型和(K, W)的数据集,返回(K, (V, W))对所有元素对每个关键

数据集

秘诀就是知道Spark对待所有2作为元组(key,value)对,您可以使用RDD.map(),使自己对:

kv_ids_alsaciens = ids_alsaciens.map(lambda id: (id, 0)) 

使得(k,v)双从ids_alsaciens其中k=idv=0。这有点浪费,但我还没有测试过是否可以消除v

与personnes

然后:

kv_personnes = personnes.map(lambda p: (p['id'],p)) 

现在我们可以使用加入成为这样

joined_kv_ids_alsaciens_personnes = kv_ids_alsaciens.join(kv_personnes) 

,同时将与条目进行RDD像

(10000149, (0, {'date': '2013-06-03 00:00', 'field': 'WAID_INDIVIDU_WC_NUMNNI', 'id': '10000149', 'value': '2770278'}))

,其中第一项是一个匹配的ID,并且第二个是 项目是一对(match1,match2)其中 match1总是0因为设置我们的第一个数据始终 有0在对中的值,并且match2是一个字典的personnes数据 。

这不完全是需要的。更好的格式可能是只发布字典。我们可以用另一张地图做到这一点。

match_personnes = joined_kv_ids_alsaciens_personnes.map(lambda (k,(v1,v2)): v2) 

总之,在内存缓存()的最终结果:

match_personnes = (ids_alsaciens 
        .map(lambda id: (id, 0)) 
        .join(personnes.map(lambda p: (p['id'],p))) 
        .map(lambda (k,(v1,v2)): v2) 
        .cache() 
        ) 

测试:

match_personnes.take(10) 
+0

我没有测试你的代码,当我达到一个相当类似的方案谢谢:-) –

+0

不客气。 – Paul