8

我正试图使用​​Spark's GraphX库实现topological sort在Spark GraphX中实现拓扑排序

这是我到目前为止已经编写的代码:

MyObject.scala

import java.util.ArrayList 

import scala.collection.mutable.Queue 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.graphx.Edge 
import org.apache.spark.graphx.EdgeDirection 
import org.apache.spark.graphx.Graph 
import org.apache.spark.graphx.Graph.graphToGraphOps 
import org.apache.spark.graphx.VertexId 
import org.apache.spark.rdd.RDD 
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions 

object MyObject { 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Spark-App").setMaster("local[2]") 
    val sc = new SparkContext(conf) 

    val resources: RDD[Resource] = makeResources(sc) 
    val relations: RDD[Relation] = makeRelations(sc) 

    println("Building graph ...") 
    var graph = buildGraph(resources, relations, sc) 
    println("Graph built!!") 

    println("Testing topo sort ...") 
    val topoSortResult = topoSort(graph, sc); 
    println("topoSortResult = " + topoSortResult) 
    println("Testing topo sort done!") 
    } 

    def buildGraph(resources: RDD[Resource], relations: RDD[Relation], sc: SparkContext): Graph[Resource, Relation] = 
    { 
     val vertices: RDD[(Long, Resource)] = resources.map(resource => (resource.id, resource)) 
     val edges: RDD[Edge[Relation]] = relations.map(relation => Edge(relation.srcId, relation.dstId, relation)) 
     var graph = Graph[Resource, Relation](vertices, edges) 
     graph 
    } 

    def makeResources(sc: SparkContext): RDD[Resource] = 
    { 
     var list: List[Resource] = List() 
     list = list :+ new Resource(1L) 
     list = list :+ new Resource(2L) 
     list = list :+ new Resource(3L) 
     list = list :+ new Resource(4L) 
     list = list :+ new Resource(5L) 
     sc.parallelize(list) 
    } 

    def makeRelations(sc: SparkContext): RDD[Relation] = 
    { 
     var list: List[Relation] = List() 
     list = list :+ new Relation(1L, "depends_on", 2L) 
     list = list :+ new Relation(3L, "depends_on", 2L) 
     list = list :+ new Relation(4L, "depends_on", 2L) 
     list = list :+ new Relation(5L, "depends_on", 2L) 
     sc.parallelize(list) 

    } 

    def topoSort(graph: Graph[Resource, Relation], sc: SparkContext): java.util.List[(VertexId, Resource)] = 
    { 
     // Will contain the result 
     val sortedResources: java.util.List[(VertexId, Resource)] = new ArrayList() 

     // Contains all the vertices 
     val vertices = graph.vertices 

     // Contains all the vertices whose in-degree > 0 
     val inDegrees = graph.inDegrees; 
     val inDegreesKeys_array = inDegrees.keys.collect(); 

     // Contains all the vertices whose in-degree == 0 
     val inDegreeZeroList = vertices.filter(vertex => !inDegreesKeys_array.contains(vertex._1)) 

     // A map of vertexID vs its in-degree 
     val inDegreeMapRDD = inDegreeZeroList.map(vertex => (vertex._1, 0)).union(inDegrees); 

     // Insert all the resources whose in-degree == 0 into a queue 
     val queue = new Queue[(VertexId, Resource)] 
     for (vertex <- inDegreeZeroList.toLocalIterator) { queue.enqueue(vertex) } 

     // Get an RDD containing the outgoing edges of every vertex 
     val neighbours = graph.collectNeighbors(EdgeDirection.Out) 

     // Initiate the algorithm 
     while (!queue.isEmpty) { 
     val vertex_top = queue.dequeue() 
     // Add the topmost element of the queue to the result 
     sortedResources.add(vertex_top) 

     // Get the neigbours (from outgoing edges) of this vertex 
     // This will be an RDD containing just 1 element which will be an array of neighbour vertices 
     val vertex_neighbours = neighbours.filter(vertex => vertex._1.equals(vertex_top._1)) 

     // For each vertex, decrease its in-degree by 1 
     vertex_neighbours.foreach(arr => { 
      val neighbour_array = arr._2 
      neighbour_array.foreach(vertex => { 
      val oldInDegree = inDegreeMapRDD.filter(vertex_iter => (vertex_iter._1 == vertex._1)).first()._2 
      val newInDegree = oldInDegree - 1 
      // Reflect the new in-degree in the in-degree map RDD 
      inDegreeMapRDD.map(vertex_iter => { 
       if (vertex_iter._1 == vertex._1) { 
       (vertex._1, newInDegree) 
       } 
       else{ 
       vertex_iter 
       } 
      }); 
      // Add this vertex to the result if its in-degree has become zero 
      if (newInDegree == 0) { 
       queue.enqueue(vertex) 
      } 
      }) 
     }) 
     } 

     return sortedResources 
    } 

} 

Resource.scala

class Resource(val id: Long) extends Serializable { 
    override def toString(): String = { 
    "id = " + id 
    } 
} 

Relation.scala

class Relation(val srcId: Long, val name: String, val dstId: Long) extends Serializable { 
    override def toString(): String = { 
    srcId + " " + name + " " + dstId 
    } 
} 

我收到错误:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 

为线val oldInDegree = inDegreeMapRDD.filter(vertex_iter => (vertex_iter._1 == vertex._1)).first()._2

我想这是因为修改某个其他RDD的for-each循环内的RDD是非法的。

此外,我担心queue.enqueue(vertex)将无法​​正常工作,因为it is not possible to modify a local collection inside a for-each loop

如何正确实现这种拓扑排序算法?

异常的完整堆栈跟踪被上传here(已有从外部将其上传到防止超过的StackOverflow的身体尺寸限制)

回答

0
vertex_neighbours.foreach(arr => { 
     val neighbour_array = arr._2 
     neighbour_array.foreach(vertex => { 
     . . . 

外部foreach可以替换为for循环。

+0

您能否以您提到的更改显示代码的最终工作版本? –

0
val vertex_neighbours = neighbours.filter(vertex => vertex._1.equals(vertex_top._1)).collect() 

您需要在做循环之前获取RDD。