2017-03-07 16 views
1

我已经2个RDDS它需要连接加入和映射2个RDDS有条件

val rdd1 = RDD[(v_id, inputObject1)] 

其中v_id是唯一的ID

和inputObject1具有以下字段

g_id, p_id, timestamp=t1 

现在我有另一RDD

val rdd2 = RDD[(g_id, inputObject2)] 

个这里inputObject2有以下领域

p_id, timestamp=t2, e_id 

现在我想加入这2个RDDS下面条件

  • 如果G_ID和p_id的是相同的,| T1-T2 | < 30分钟
  • 否则,如果g_id相同并且| t1-t2 | < 30分钟

所以第二个条件是后备,如果第一个条件不满足。我最后的输出应该是这个

val resuldRDD = RDD[(v_id, inputObject11)] 

其中inputObject11 = inputObject1 +增加从第二RDD E_ID如果条件得到满足。

所以字段将

g_id, p_id, e_id, timestamp=t1 
+0

也不苏你可以加入条件。你可以通过ID加入,然后根据任何条件进行过滤。 – Hlib

回答