2016-03-14 46 views
2

我想转换边列表是按以下格式转换二分图以邻接矩阵火花斯卡拉

data = [('a', 'developer'), 
    ('b', 'tester'), 
    ('b', 'developer'), 
    ('c','developer'), 
    ('c', 'architect')] 

在邻接矩阵将在

 developer  tester architect 
a  1   0   0 
b  1   1   0 
c  1   0   1 

我形式想要以下列格式存储矩阵:

1 0 0 
1 1 0 
1 0 1 

我试过用GraphX

def pageHash(title:String) = title.toLowerCase.replace(" ","").hashCode.toLong 


val edges: RDD[Edge[String]] = sc.textFile("/user/query.csv").map { line => 
    val row = line.split(",") 
    Edge(pageHash(row(0)), pageHash(row(1)), "1") 
} 
val graph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 1) 

我能够创建图表但无法转换为相邻矩阵表示。

回答

1

的一种可行方式方法是什么这样的:

  1. 转换RDDDataFrame

    val rdd = sc.parallelize(Seq(
        ("a", "developer"), ("b", "tester"), ("b", "developer"), 
        ("c","developer"), ("c", "architect"))) 
    
    val df = rdd.toDF("row", "col") 
    
  2. 指数列:

    import org.apache.spark.ml.feature.StringIndexer 
    
    val indexers = Seq("row", "col").map(x => 
        new StringIndexer().setInputCol(x).setOutputCol(s"${x}_idx").fit(df) 
    ) 
    
  3. 变换数据并创建RDD[MatrixEntry]

    import org.apache.spark.functions.lit 
    import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix} 
    
    
    val entries = indexers.foldLeft(df)((df, idx) => idx.transform(df)) 
        .select($"row_idx", $"col_idx", lit(1.0)) 
        .as[MatrixEntry] // Spark 1.6. For < 1.5 map manually 
        .rdd 
    
  4. 创建矩阵

    new CoordinateMatrix(entries) 
    

该矩阵可以进一步转化为任何其它类型的分布式矩阵的包括RowMatrixIndexedRowMatrix

+0

我为spark 1.4手动映射,但邻接矩阵的排序是不同的,第一行和第三行是互换的,任何指针都是这个。 –

+0

它似乎不适用于spark 1.6.2。错误:'org.apache.spark.sql.AnalysisException:无法解析给定输入列的'i':[row_idx,col_idx,1]'。 –