2015-10-18 146 views
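PySpark - Create a pair RDD from two keys that share the same value

I have a key-value pair RDD where the key is an actor and the value is a movie that actor appeared in, in the form: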

["actor 1", "movie 1"] 
["actor 1", "movie 2"] 
["actor 1", "movie 3"] 
... 
["actor n", "movie 2"] 

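I want to map it to another key-value pair RDD in which each pair consists of two actors who appeared in a common movie.

In the example above, this means the new RDD would contain the pair ["actor 1", "actor n"], since both appeared in "movie 2".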

Answers

2

A simple swap and join should do the trick. First, let's create some dummy data and a small helper function:

actor_movie = sc.parallelize([ 
    ("actor 1", "movie 1"), 
    ("actor 1", "movie 2"), 
    ("actor 1", "movie 3"), 
    ("actor n", "movie 2") 
]) 

swap = lambda x: (x[1], x[0]) 

Next, swap the order so that the movie becomes the key:

movie_actor = (actor_movie.map(swap) 
    .partitionBy(actor_movie.getNumPartitions()) 
    .cache()) 

And join the RDD with itself:

(movie_actor 
    .join(movie_actor) # Join by movie 
    .values() # Extract values (actors) 
    .filter(lambda x: x[0] != x[1])) # Drop pairs of an actor with themselves
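
Note that a self-join followed by a `!=` filter yields each pair in both orders, and repeats a pair once per shared movie. As a rough illustration of the same logic without a Spark cluster, here is a plain-Python sketch that keeps each unordered pair once. The names `co_actor_pairs` and `sample` are made up for this example; in PySpark, something like `.filter(lambda x: x[0] < x[1]).distinct()` should have a similar effect.

```python
from collections import defaultdict


def co_actor_pairs(actor_movie):
    """Plain-Python stand-in for the swap + self-join (hypothetical helper)."""
    # Swap (actor, movie) and group actors by movie, the key the join uses.
    actors_by_movie = defaultdict(set)
    for actor, movie in actor_movie:
        actors_by_movie[movie].add(actor)

    pairs = set()
    for actors in actors_by_movie.values():
        for a in actors:
            for b in actors:
                # `a < b` drops self-pairs and keeps one ordering per pair;
                # a `!=` test would keep both (a, b) and (b, a).
                if a < b:
                    pairs.add((a, b))
    return sorted(pairs)


# Sample data mirroring the example in the question.
sample = [
    ("actor 1", "movie 1"),
    ("actor 1", "movie 2"),
    ("actor 1", "movie 3"),
    ("actor n", "movie 2"),
]
print(co_actor_pairs(sample))  # [('actor 1', 'actor n')]
```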

Exactly what I needed, thanks! – nikos

1

This isn't exactly what you asked for, but I think it is good enough:

import itertools 

# (actor, movie) pairs 
movies = sc.parallelize([("P", "SW4"), ("P", "SW5"), ("P", "SW6"), 
         ("A", "SW4"), ("A", "SW5"), 
         ("B", "SW5"), ("B", "SW6"), 
         ("W", "SW4"), 
         ("X", "SW1"), ("X", "SW7"), ("X", "SW2"), ("X", "SW3"), 
         ("Y", "SW1"), ("Y", "SW7"), ("Y", "SW2"), ("Y", "SW3")]) 

# Swap (actor, movie) to (movie, actor) so that grouping happens by movie 
swap_tuple = lambda kv: (kv[1], kv[0]) 

movies = movies.map(swap_tuple).groupByKey().mapValues(list) 

# For each movie, emit every unordered pair of its actors 
all_pairs = movies.flatMap( 
    lambda kv: [(kv[0], pair) for pair in itertools.combinations(kv[1], 2)]) 

print(all_pairs.collect()) 

""" 
    >> [('SW1', ('X', 'Y')), 
     ('SW3', ('X', 'Y')), 
     ('SW5', ('P', 'A')), 
     ('SW5', ('P', 'B')), 
     ('SW5', ('A', 'B')), 
     ('SW7', ('X', 'Y')), 
     ('SW2', ('X', 'Y')), 
     ('SW4', ('P', 'A')), 
     ('SW4', ('P', 'W')), 
     ('SW4', ('A', 'W')), 
     ('SW6', ('P', 'B'))] 
""" 

Here it is running in an .ipynb notebook.
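
To make the group-then-combine step easier to follow, here is a small plain-Python sketch of the same idea, assuming the input is a list of (actor, movie) tuples. The function name `pairs_per_movie` and the single-actor movie "SW9" are invented for illustration and do not come from the answer above.

```python
from collections import defaultdict
from itertools import combinations


def pairs_per_movie(actor_movie):
    """Group actors by movie, then list every 2-actor combination per movie."""
    actors_by_movie = defaultdict(list)
    for actor, movie in actor_movie:
        actors_by_movie[movie].append(actor)
    return {
        movie: list(combinations(actors, 2))
        for movie, actors in actors_by_movie.items()
    }


records = [
    ("P", "SW4"), ("A", "SW4"), ("W", "SW4"),
    ("P", "SW6"), ("B", "SW6"),
    ("X", "SW9"),  # hypothetical movie with a single actor
]
result = pairs_per_movie(records)
print(result["SW4"])  # [('P', 'A'), ('P', 'W'), ('A', 'W')]
print(result["SW9"])  # []

# Dropping the movie key leaves only the distinct actor pairs.
unique_pairs = {pair for pairs in result.values() for pair in pairs}
```

A movie with only one actor contributes no pairs, since `combinations` of a single item taken two at a time is empty.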