2014-07-01 73 views
9

我想用pyspark解决以下问题。 我有一个hdfs格式的文件,它是查找表的转储。在pyspark中创建一个大字典

key1, value1 
key2, value2 
... 

我想加载到python字典中的pyspark并将其用于其他目的。所以我试图做到:

table = {} 
def populateDict(line): 
    (k,v) = line.split(",", 1) 
    table[k] = v 

kvfile = sc.textFile("pathtofile") 
kvfile.foreach(populateDict) 

我发现表变量没有被修改。那么,有没有办法在spark中创建一个大的内存哈希表?

回答

4

foreach是一个分布式计算,所以你不能指望它修改只在驱动程序中可见的数据结构。你想要的是。

kv.map(line => { line.split(" ") match { 
    case Array(k,v) => (k,v) 
    case _ => ("","") 
}.collectAsMap() 

这是Scala,但你的想法,最重要的功能是collectAsMap()返回地图给司机。

如果您的数据非常大,您可以使用PairRDD作为地图。第一地图中对

kv.map(line => { line.split(" ") match { 
     case Array(k,v) => (k,v) 
     case _ => ("","") 
    } 

那么你就可以rdd.lookup("key")返回与该键关联的值的顺序访问,虽然这肯定不会像其他分布式KV存储一样高效,火花是不是真的为建那。

+0

很感谢。这是否意味着地图必须适合驱动程序的内存?还是它仍然分布? – Kamal

+0

@Kamal是啊它必须适应内存。 U可以使用pair rdd作为查找表。也想到一个可积累的解决方案,将很快发布 – aaronman

+0

好吧。我正在寻找火花中的分布式地图。看起来不可能! – Kamal

1

为了提高效率,请参见:sortByKey() and lookup()

查找(键):

返回值的列表中RDD的关键钥匙。如果RDD具有已知的分区程序,则只需搜索该键映射到的分区即可高效地执行此操作。

的RDD将由sortByKey()(see: OrderedRDD)重新分配,并且在lookup()呼叫高效地搜索。在代码中,类似的,

kvfile = sc.textFile("pathtofile") 
sorted_kv = kvfile.flatMap(lambda x: x.split("," , 1)).sortByKey() 

sorted_kv.lookup('key1').take(10) 

将作为一个RDD和有效的伎俩。