2017-10-15 100 views
0

我有一个数据集,其中包含工人与他们的人口统计信息,如年龄性别,地址等及其工作地点。我从数据集创建了一个RDD并将其转换为DataFrame。计算pyspark中数据框的所有行之间的余弦相似度

每个ID有多个条目。因此,我创建了一个DataFrame,其中只包含工作人员的ID和他/她工作的各个办公地点。

|----------|----------------| 
    | **ID** **Office_Loc** | 
    |----------|----------------| 
    | 1  |Delhi, Mumbai, | 
    |   | Gandhinagar | 
    |---------------------------| 
    | 2  | Delhi, Mandi | 
    |---------------------------| 
    | 3  |Hyderbad, Jaipur| 
    ----------------------------- 

我想根据他们的办公地点计算每个工人与其他工人之间的余弦相似度。

所以,我通过数据帧的行迭代中,从数据帧中检索的单个行:

myIndex = 1 
values = (ID_place_df.rdd.zipWithIndex() 
      .filter(lambda ((l, v), i): i == myIndex) 
      .map(lambda ((l,v), i): (l, v)) 
      .collect()) 

,然后使用地图

cos_weight = ID_place_df.select("ID","office_location").rdd\ 
    .map(lambda x: get_cosine(values,x[0],x[1])) 

到计算的余弦相似性所提取的行之间和整个DataFrame。

我不认为我的方法是一个很好的方法,因为我遍历DataFrame的行,它打败了使用spark的全部目的。 在pyspark有更好的方法吗? 请提醒。

+0

我想了一会儿问题。通常最好的做法是用最简单的案例来问问你是否得到同样的问题。 – ChaosPredictor

回答

1

您可以使用mllib包来计算每行TF-IDF的L2范数。然后乘以表本身由两个L2规范,以获得余弦相似性的两个点积:

1 RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]]) 
  • 计算TF-IDF

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(",")) 
    
    from pyspark.mllib.feature import HashingTF, IDF 
    hashingTF = HashingTF() 
    tf = hashingTF.transform(documents) 
    

Y您可以指定HashingTF中的特征数量以使特征矩阵更小(更少的列)。

tf.cache() 
    idf = IDF().fit(tf) 
    tfidf = idf.transform(tf) 
  • 计算L2规范:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix 
    mat = IndexedRowMatrix(data).toBlockMatrix() 
    dot = mat.multiply(mat.transpose()) 
    dot.toLocalMatrix().toArray() 
    
        array([[ 0.  , 0.  , 0.  , 0.  ], 
          [ 0.  , 1.  , 0.10794634, 0.  ], 
          [ 0.  , 0.10794634, 1.  , 0.  ], 
          [ 0.  , 0.  , 0.  , 1.  ]]) 
    

    OR::使用笛卡尔积和功能

    from pyspark.mllib.feature import Normalizer 
    labels = rdd.map(lambda l: l[0]) 
    features = tfidf 
    
    normalizer = Normalizer() 
    data = labels.zip(normalizer.transform(features)) 
    
  • 计算通过使基质与自身相乘的余弦相似度dot在numpy阵列:

    data.cartesian(data)\ 
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\ 
        .sortByKey()\ 
        .collect() 
    
        [((1, 1), 1.0), 
        ((1, 2), 0.10794633570596117), 
        ((1, 3), 0.0), 
        ((2, 1), 0.10794633570596117), 
        ((2, 2), 1.0), 
        ((2, 3), 0.0), 
        ((3, 1), 0.0), 
        ((3, 2), 0.0), 
        ((3, 3), 1.0)] 
    

2。据帧

既然你似乎可以用dataframes,你可以使用spark ml包代替:

import pyspark.sql.functions as psf 
df = rdd.toDF(["ID", "Office_Loc"])\ 
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ',')) 
  • 计算TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF 
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf") 
    tf = hashingTF.transform(df) 
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf) 
    tfidf = idf.transform(tf) 
    
  • 计算L2规范:

    from pyspark.ml.feature import Normalizer 
    normalizer = Normalizer(inputCol="feature", outputCol="norm") 
    data = normalizer.transform(tfidf) 
    
  • 计算矩阵乘积:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix 
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\ 
         .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix() 
    dot = mat.multiply(mat.transpose()) 
    dot.toLocalMatrix().toArray() 
    

    OR:使用连接和用于功能dot一个UDF

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType()) 
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\ 
        .select(
         psf.col("i.ID").alias("i"), 
         psf.col("j.ID").alias("j"), 
         dot_udf("i.norm", "j.norm").alias("dot"))\ 
        .sort("i", "j")\ 
        .show() 
    
        +---+---+-------------------+ 
        | i| j|    dot| 
        +---+---+-------------------+ 
        | 1| 2|0.10794633570596117| 
        | 1| 3|    0.0| 
        | 2| 3|    0.0| 
        +---+---+-------------------+ 
    

本教程列出不同的方法来繁殖大型矩阵: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e

+0

谢谢你的回答。我非常感谢帮助。但是代码给了我一个错误'要求失败:输入列必须是ArrayType,但是得到了StringType.''。在使用数据帧时进行hashingTF转换期间。 –

+0

您必须首先将字符串列表拆分为单词列表。我添加了关于如何创建'df' – MaFF

+0

的部分嗨,它在我使用'data.cartesian(data)\ .map(lambda l:((l [0] [0],l [1] [0 ]),l [0] [1] .dot(l [1] [1])))\ .sortByKey()\ 。take(5)'。但是当我使用mllib代码并将blockMatrix转换为LocalMatrix时,它给了'u'requirement失败:值数组的长度必须小于Int.MaxValue。目前numRows * numCols:1006095879729669481''我不明白,因为我正在采取一小部分数据(约10个ID),所以numRows * numCols:100。 –