让我们开始与一些进口和变量,将需要的下游加工:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner
val nPartitions: Integer = ???
val n: Long = ???
val p: Double = ???
下一步我们需要可以用来生成边缘种子ID的RDD。处理这种幼稚的方法是简单地是这样的:
sc.parallelize(0L to n)
由于产生的边缘的数量取决于节点ID这种方法会给出一个极不平衡的负载。我们可以做的更好一点与重新分区:
sc.parallelize(0L to n)
.map((_, None))
.partitionBy(new HashPartitioner(nPartitions))
.keys
,但更好的方法是先从空RDD和生成到位的ID。我们需要一个小帮手:
def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
(0L until n).filter(_ % nPartitions == i).toIterator
}
可以使用如下:
val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))
只是快速完整性检查(这是相当昂贵,因此在生产中不使用它):
require(ids.distinct.count == n)
,我们可以使用另一个帮手产生实际的边缘:
def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
(i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j,()))
}
def genEdgesForPartition(iter: Iterator[Long]) = {
// It could be an overkill but better safe than sorry
// Depending on your requirement it could worth to
// consider using commons-math
// https://commons.apache.org/proper/commons-math/userguide/random.html
val random = new Random(new java.security.SecureRandom())
iter.flatMap(genEdgesForId(p, n, random))
}
val edges = ids.mapPartitions(genEdgesForPartition)
最后,我们可以创建一个图表:
val graph = Graph.fromEdges(edges,())
我不知道MPI什么,但是从你的描述,我可以告诉你想你的问题的办法是太“低级别”。在Spark中,您不用担心哪个执行程序正在存储哪个数组。只需创建RDD,Spark将自动处理数据的分发和处理。我还建议你阅读'GraphX'的文档,因为需要以某种方式定义Vertices和Edges以用于'GraphX'。 –
感谢您的建议。我试图并行实现一个图形发生器。发生器必须以这种方式创建边缘,以使计算负载得到很好的平衡。 – max