2015-12-15 36 views
2

我正在从基于MPI的系统转向Apache Spark。我需要在Spark中执行以下操作。在Spark中为每个Executor创建数组并将其合并为RDD

假设,我有n顶点。我想从这些n顶点创建一个边缘列表。边只是两个整数(u,v)的元组,不需要属性。

但是,我想在每个执行器中独立地并行创建它们。因此,我想要为P Spark Executors独立创建P边阵列。每个数组可能具有不同的大小,并且取决于顶点,因此,我还需要执行者ID从0n-1。接下来,我想要一个全局RDD阵列的边缘。

在MPI中,我会在每个使用处理器级别的处理器中创建一个数组。我如何在Spark中做到这一点,特别是使用GraphX库?

因此,我的主要目标是在每个执行器中创建一个边数组,并将它们合并为一个RDD。

我首先尝试了鄂尔多斯 - 仁义模型的一个修改版本。作为参数,我只有节点数n和概率p。

假设,执行者i必须处理从101200的节点。对于任何节点而言,节点101,它将以概率p创建从101102 -- n的边。在每个执行器创建分配的边后,我将实例化GraphX EdgeRDDVertexRDD。因此,我的计划是在每个执行器中独立创建边界列表,并将它们合并到RDD中。

+1

我不知道MPI什么,但是从你的描述,我可以告诉你想你的问题的办法是太“低级别”。在Spark中,您不用担心哪个执行程序正在存储哪个数组。只需创建RDD,Spark将自动处理数据的分发和处理。我还建议你阅读'GraphX'的文档,因为需要以某种方式定义Vertices和Edges以用于'GraphX'。 –

+0

感谢您的建议。我试图并行实现一个图形发生器。发生器必须以这种方式创建边缘,以使计算负载得到很好的平衡。 – max

回答

4

让我们开始与一些进口和变量,将需要的下游加工:

import org.apache.spark._ 
import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 
import scala.util.Random 
import org.apache.spark.HashPartitioner 

val nPartitions: Integer = ??? 
val n: Long = ??? 
val p: Double = ??? 

下一步我们需要可以用来生成边缘种子ID的RDD。处理这种幼稚的方法是简单地是这样的:

sc.parallelize(0L to n) 

由于产生的边缘的数量取决于节点ID这种方法会给出一个极不平衡的负载。我们可以做的更好一点与重新分区:

sc.parallelize(0L to n) 
    .map((_, None)) 
    .partitionBy(new HashPartitioner(nPartitions)) 
    .keys 

,但更好的方法是先从空RDD和生成到位的ID。我们需要一个小帮手:

def genNodeIds(nPartitions: Int, n: Long)(i: Int) = { 
    (0L until n).filter(_ % nPartitions == i).toIterator 
} 

可以使用如下:

val empty = sc.parallelize(Seq.empty[Int], nPartitions) 
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i)) 

只是快速完整性检查(这是相当昂贵,因此在生产中不使用它):

require(ids.distinct.count == n) 

,我们可以使用另一个帮手产生实际的边缘:

def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = { 
    (i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j,())) 
} 

def genEdgesForPartition(iter: Iterator[Long]) = { 
    // It could be an overkill but better safe than sorry 
    // Depending on your requirement it could worth to 
    // consider using commons-math 
    // https://commons.apache.org/proper/commons-math/userguide/random.html 
    val random = new Random(new java.security.SecureRandom()) 
    iter.flatMap(genEdgesForId(p, n, random)) 
} 

val edges = ids.mapPartitions(genEdgesForPartition) 

最后,我们可以创建一个图表:

val graph = Graph.fromEdges(edges,()) 
+0

非常感谢,这是非常彻底的指导。这对我对Spark的理解也很有帮助。 – max

+1

我简化了这一点。它还应该解决您在其他问题中提到的问题。 – zero323

相关问题