2016-09-19 81 views
1

是否可以结合GraphX和DataFrames?我希望Graph中的每个节点都有自己的DataFrame。我知道GraphX和DataFrame扩展了RDD,并且嵌套的RDD不可能,而SparkContext不是可序列化的。但在Spark 2.0.0中,我看到SparkSession是可序列化的。我试过了,但仍然无法使用。 我也尝试将DataFrame全局存储在数组中。但我无法访问工作节点中的数组。忽略sendMsg和merge方法:火花结合DataFrames和GraphX

object Main{ 
    def main(args: Array[String]) : Unit = {  
    val spark = SparkSession 
     .builder 
     .appName("ScalaGraphX_SQL") 
     .master("spark://home:7077") 
     .enableHiveSupport() 
     .getOrCreate() 

    val sc = spark.sparkContext 

    val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex() 

    //set array size 
    Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt) 

    //insert dataframe inside array tables 
    node_pair.collect().foreach{ case (arr,l) => { 
     val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true)) 
     val schema = StructType(fields) 
     val rows = new util.ArrayList[Row] 
     Tables.tables{l.toInt} = spark.createDataFrame(rows, schema) 
     //val f = 
     } 
    } 

    //create vertices 
    val vertices : RDD[(VertexId,TreeNode)]= node_pair.map{ case (arr,l) => { 
     (l,new TreeNode(l,false)) 
    } 
    } 

    //create edges 
    val edges : RDD[Edge[Boolean]] = node_pair 
     .filter{ case (arr,l) => arr(0).toLong != -1} 
     .map{ case (arr,l) => Edge(l,arr(0).toLong,true) 
     } 

    var init_node : TreeNode = new TreeNode(-1,false) 
    val graph = Graph(vertices,edges,init_node) 
    val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge) 

    graph_pregel.vertices.collect().foreach(v => println(v._2.index)) 
    } 

    def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode = { 
    println(Tables.tables{act.index.toInt}) 
    act 
    } 

    def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] = { 

    if(et.srcAttr.v){ 
     println(et.srcId + "--->" + et.dstId) 
     Iterator((et.dstId,et.srcAttr)) 
    }else{ 
     //println(et.srcId + "-/->" + et.dstId) 
     Iterator.empty 
    } 
    } 

    def merge(n1:TreeNode, n2:TreeNode): TreeNode = { 
    n1 
    } 
} 

object Tables extends Serializable{ 
    var tables : scala.Array[Dataset[Row]] = null 
} 

class TreeNode(val index:Long, var v: Boolean) extends Serializable { 
} 

也许有可能使用RDD访问全局数组?或者有人有这个问题的其他解决方案?

+0

问题不是,也从来没有序列化过。不可串行化仅仅是一个提示,它指出了Spark体系结构不适合嵌套处理而不明显限制编程模型的主要问题。所以,只是因为你可以序列化'SparkSession'(你可以用同样的方式在1.x中序列化'SQLContext'),这并不意味着任何改变。 – zero323

回答

1

请看看GraphFrames - 这是一个为GraphX提供DataFrame API的软件包。一旦GraphFrames提供诸如在GraphX中重要的分区功能以及更详尽地测试API时,GraphFrames将被考虑包含到Spark中。

对于在下面的评论中描述的问题,你有一个数据帧与节点,即机场:

val airports = sqlContext.createDataFrame(List(
    ("A1", "Wrocław"), 
    ("A2", "London"), 
    ("A3", "NYC") 
)).toDF("id", "name") 

ID是唯一的。您可以创建其他DataFrame,即detailsDF,其结构如下: ID | AirPortID | other data。然后你有一对多和一个机场(所以GraphFrame verticle),你在detailsDF中有很多条目。现在您可以查询: spark.sql("select a.name, d.id as detailID from airports a join detailsDF d on a.id = d.airportID");。如果你想在那里存储更多的信息,你也可以在机场数据框中有很多列

+0

谢谢,但没有GraphFrames,结构为DataFrames的图?我需要一个Graph和DataFrames在节点内。这就像每个节点的表格。或者我误解了GraphFrames? –

+0

是的,而不是:)在GraphFrames中,每个节点都有一个表格(DataFrame)。然而,这个节点可以有一些ID,然后可以有另一个DataFrame,即NodeDetails将有列“baseNodeId”。那么你可以有一个节点的许多行 –

+0

对不起,但我不明白。那不是嵌套的DataFrame?你能举个简单的例子吗?非常感谢你! –