是否可以结合GraphX和DataFrames?我希望Graph中的每个节点都有自己的DataFrame。我知道GraphX和DataFrame扩展了RDD,并且嵌套的RDD不可能,而SparkContext不是可序列化的。但在Spark 2.0.0中,我看到SparkSession是可序列化的。我试过了,但仍然无法使用。 我也尝试将DataFrame全局存储在数组中。但我无法访问工作节点中的数组。忽略sendMsg和merge方法:火花结合DataFrames和GraphX
object Main{
def main(args: Array[String]) : Unit = {
val spark = SparkSession
.builder
.appName("ScalaGraphX_SQL")
.master("spark://home:7077")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex()
//set array size
Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt)
//insert dataframe inside array tables
node_pair.collect().foreach{ case (arr,l) => {
val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true))
val schema = StructType(fields)
val rows = new util.ArrayList[Row]
Tables.tables{l.toInt} = spark.createDataFrame(rows, schema)
//val f =
}
}
//create vertices
val vertices : RDD[(VertexId,TreeNode)]= node_pair.map{ case (arr,l) => {
(l,new TreeNode(l,false))
}
}
//create edges
val edges : RDD[Edge[Boolean]] = node_pair
.filter{ case (arr,l) => arr(0).toLong != -1}
.map{ case (arr,l) => Edge(l,arr(0).toLong,true)
}
var init_node : TreeNode = new TreeNode(-1,false)
val graph = Graph(vertices,edges,init_node)
val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge)
graph_pregel.vertices.collect().foreach(v => println(v._2.index))
}
def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode = {
println(Tables.tables{act.index.toInt})
act
}
def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] = {
if(et.srcAttr.v){
println(et.srcId + "--->" + et.dstId)
Iterator((et.dstId,et.srcAttr))
}else{
//println(et.srcId + "-/->" + et.dstId)
Iterator.empty
}
}
def merge(n1:TreeNode, n2:TreeNode): TreeNode = {
n1
}
}
object Tables extends Serializable{
var tables : scala.Array[Dataset[Row]] = null
}
class TreeNode(val index:Long, var v: Boolean) extends Serializable {
}
也许有可能使用RDD访问全局数组?或者有人有这个问题的其他解决方案?
问题不是,也从来没有序列化过。不可串行化仅仅是一个提示,它指出了Spark体系结构不适合嵌套处理而不明显限制编程模型的主要问题。所以,只是因为你可以序列化'SparkSession'(你可以用同样的方式在1.x中序列化'SQLContext'),这并不意味着任何改变。 – zero323