2015-10-15 31 views
1

我至今是:打开RDD到广播词典查找

lookup = sc.textFile("/user/myuser/lookup.asv") 
lookup.map(lambda r: r.split(chr(1))) 

而且现在我有一个RDD看起来像

[ 
    [filename1, category1], 
    [filename2, category2], 
    ... 
    [filenamen, categoryn] 
] 

我怎样才能把这一RDD成广播字典一样:

{filename1: category1, filename2: category2, ...} 

这是我试过,但没有工作:

>>> broadcastVar = sc.broadcast({}) 
>>> data = sc.parallelize([[1,1], [2,2], [3,3], [4,4]]) 
>>> def myfunc(x): 
...  broadcastVar[str(x[0])] = x[1] 
... 
>>> result = data.map(myfunc) 
>>> broadcastVar 
<pyspark.broadcast.Broadcast object at 0x7f776555e710> 
>>> broadcastVar.value 
{} 
>>> result.collect() 
... 
ERROR: TypeError: 'Broadcast' object does not support item assignment 
... 
>>> broadcastVar.value 
{} 

有关为什么我建立这个巨大的查找变量的更多信息,请阅读本:

这是本one的后续问题。

我有两个表,其中

表1:其中各列包含该像素信息和第一列中的非常宽(25K列和150K行)表是输入图象文件的文件名。

表2:TSV(制表符分隔文件)文件,有300万行,每行包含图像文件名称和图像的产品类别。

在SQL中,我需要在文件名的这两个表上做一个内部连接,这样我就可以为图像数据添加标签,以便稍后进行机器学习。

在任何类型的SQL中执行它是不现实的,因为您必须为table1创建一个具有25K列的表,而create table语法将会很荒谬。

然后我想创建一个使用table2的查找变量,也许使它成为一个广播变量,其中的关键是文件名,值是产品类别。

回答

0

广播变量对工作人员是只读的。 Spark提供了只写的累加器,但是这些专用于计数器等。在这里,你可以简单地收集并创建一个Python字典:

lookup_bd = sc.broadcast({ 
    k: v for (k, v) in lookup.map(lambda r: r.split(chr(1))).collect() 
}) 

,因为你必须创建一个表table1的具有25K列,创建表的语法是不现实做到在任何类型的SQL的将是可笑的漫长。

创建不应该是一个问题。只要你知道的名字您可以轻松创建表像这样编程:

from pyspark.sql import Row 

colnames = ["x{0}".format(i) for i in range(25000)] # Replace with actual names 

df = sc.parallelize([ 
    row(*[randint(0, 100) for _ in range(25000)]) for x in range(10) 
]).toDF() 

## len(df.columns) 
## 25000 

这里有一个问题,即使你使用纯RDDS这是更严重的。一般来说,非常宽的行很难在任何行式格式中处理。

你可以做的一件事是使用像SparseVectorSparseMatrix这样的稀疏表示。另一个例子是使用RLE编码像素信息。