
I am trying to find an efficient way to create sparse vectors in PySpark, working from a DataFrame.

Say we are given the following transactional input:

df = spark.createDataFrame([ 
    (0, "a"), 
    (1, "a"), 
    (1, "b"), 
    (1, "c"), 
    (2, "a"), 
    (2, "b"), 
    (2, "b"), 
    (2, "b"), 
    (2, "c"), 
    (0, "a"), 
    (1, "b"), 
    (1, "b"), 
    (2, "cc"), 
    (3, "a"), 
    (4, "a"), 
    (5, "c") 
], ["id", "category"]) 
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       a|
|  1|       b|
|  1|       c|
|  2|       a|
|  2|       b|
|  2|       b|
|  2|       b|
|  2|       c|
|  0|       a|
|  1|       b|
|  1|       b|
|  2|      cc|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+

In summarized form:

df.groupBy(df["id"],df["category"]).count().show() 
+---+--------+-----+
| id|category|count|
+---+--------+-----+
|  1|       b|    3|
|  1|       a|    1|
|  1|       c|    1|
|  2|      cc|    1|
|  2|       c|    1|
|  2|       a|    1|
|  1|       a|    1|
|  0|       a|    2|
+---+--------+-----+

My goal is to get output like this, one row per id:

+---+-----------------------------------------------+
| id|                                        feature|
+---+-----------------------------------------------+
|  2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})|

Can you point me in the right direction? Doing it with MapReduce in Java seemed easier to me.
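
For concreteness, the desired value for id = 2 could be written out by hand as follows (just an illustration; the index assignment a→0, b→1, c→2, cc→3 is one hypothetical mapping of categories to vector positions):

from pyspark.ml.linalg import Vectors

# counts for id = 2 are a: 1, b: 3, c: 1, cc: 1 under the hypothetical mapping above
Vectors.sparse(4, {0: 1.0, 1: 3.0, 2: 1.0, 3: 1.0})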

Answers

Answer 1 (4 votes):

This can be done quite easily with pivot and VectorAssembler. First, replace the aggregation with a pivot:

pivoted = df.groupBy("id").pivot("category").count().na.fill(0)

Then assemble the pivoted count columns into a single vector column:

from pyspark.ml.feature import VectorAssembler

input_cols = [x for x in pivoted.columns if x != "id"]

result = (VectorAssembler(inputCols=input_cols, outputCol="features")
    .transform(pivoted)
    .select("id", "features"))

The result looks like this; Spark picks the more efficient representation (dense or sparse) for each row depending on how sparse it is:

+---+-----------------+
|id |features         |
+---+-----------------+
|0  |(4,[0],[2.0])    |
|5  |(4,[2],[1.0])    |
|1  |[1.0,3.0,1.0,0.0]|
|3  |(4,[0],[1.0])    |
|2  |[1.0,3.0,1.0,1.0]|
|4  |(4,[0],[1.0])    |
+---+-----------------+

Of course, you can still convert the whole column to a single sparse representation:

from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf
import numpy as np

def to_sparse(c):
    def to_sparse_(v):
        # pass sparse vectors through unchanged
        if isinstance(v, SparseVector):
            return v
        # dense vector: keep only the nonzero entries
        vs = v.toArray()
        nonzero = np.nonzero(vs)[0]
        return SparseVector(len(vs), nonzero, vs[nonzero])
    return udf(to_sparse_, VectorUDT())(c)
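
Applied to the result DataFrame from above, for example like this (a usage sketch; this call is not shown in the original answer), it produces the output that follows:

result.withColumn("features", to_sparse("features")).show(truncate=False)
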
+---+-------------------------------+
|id |features                       |
+---+-------------------------------+
|0  |(4,[0],[2.0])                  |
|5  |(4,[2],[1.0])                  |
|1  |(4,[0,1,2],[1.0,3.0,1.0])      |
|3  |(4,[0],[1.0])                  |
|2  |(4,[0,1,2,3],[1.0,3.0,1.0,1.0])|
|4  |(4,[0],[1.0])                  |
+---+-------------------------------+

Answer 2 (2 votes):

If you convert the DataFrame to an RDD, you can follow a MapReduce-like approach with reduceByKey. The only really tricky part here is formatting the data for Spark's SparseVector.
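
For reference, pyspark.ml.linalg.Vectors.sparse takes the vector size followed by either a list of (index, value) pairs or a dictionary of index: value entries, for example:

from pyspark.ml.linalg import Vectors

# a 4-dimensional sparse vector with 1.0 at index 1 and 5.5 at index 3
v = Vectors.sparse(4, [(1, 1.0), (3, 5.5)])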

Import packages and create the data

from pyspark.ml.feature import StringIndexer 
from pyspark.ml.linalg import Vectors 
df = sqlContext.createDataFrame([ 
    (0, "a"), 
    (1, "a"), 
    (1, "b"), 
    (1, "c"), 
    (2, "a"), 
    (2, "b"), 
    (2, "b"), 
    (2, "b"), 
    (2, "c"), 
    (0, "a"), 
    (1, "b"), 
    (1, "b"), 
    (2, "cc"), 
    (3, "a"), 
    (4, "a"), 
    (5, "c") 
], ["id", "category"]) 

Create a numeric representation of the categories (needed for the sparse vector)

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
df = indexer.fit(df).transform(df) 

Group by id and category index to get the counts

df = df.groupBy(df["id"],df["categoryIndex"]).count() 

Convert to an RDD and map the data to key/value pairs of id and [(categoryIndex, count)]

rdd = df.rdd.map(lambda x: (x.id, [(x.categoryIndex, x['count'])])) 

Reduce by key to collect, for each id, the list of all (categoryIndex, count) pairs belonging to that id

rdd = rdd.reduceByKey(lambda a, b: a + b) 

Map each id's list of (categoryIndex, count) pairs to a sparse vector

rdd = rdd.map(lambda x: (x[0], Vectors.sparse(len(x[1]), x[1]))) 

Convert back to a DataFrame

finalDf = sqlContext.createDataFrame(rdd, ['id', 'feature'])

Check the data

finalDf.take(5) 

[Row(id=0, feature=SparseVector(1, {1: 2.0})), 
    Row(id=1, feature=SparseVector(3, {0: 3.0, 1: 1.0, 2: 1.0})), 
    Row(id=2, feature=SparseVector(4, {0: 3.0, 1: 1.0, 2: 1.0, 3: 1.0})), 
    Row(id=3, feature=SparseVector(1, {1: 1.0})), 
    Row(id=4, feature=SparseVector(1, {1: 1.0}))] 
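
Note that with Vectors.sparse(len(x[1]), x[1]) each vector's size is just the number of categories seen for that particular id, so the rows above live in feature spaces of different sizes (1, 3, 4, ...). If every row should share the same feature space, one possible variation (a sketch, not part of the original answer; reduced_rdd stands for the (id, [(categoryIndex, count), ...]) RDD produced by the reduceByKey step, and df for the grouped count DataFrame) is to size all vectors by the total number of distinct categories:

# total number of distinct categories across the whole data set
num_categories = df.select("categoryIndex").distinct().count()

# reduced_rdd: (id, [(categoryIndex, count), ...]) pairs from the reduceByKey step above
fixed_rdd = reduced_rdd.map(lambda x: (x[0], Vectors.sparse(num_categories, x[1])))
finalDf = sqlContext.createDataFrame(fixed_rdd, ['id', 'feature'])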