2016-10-27 61 views
3

的范围内有2个非常大的RDD(每个都有超过畅想记录),第一个是:比较2火花RDD确保值从第一个是在第二RDD

rdd1.txt(name,value):  
chr1 10016 
chr1 10017 
chr1 10018 
chr1 20026 
chr1 20036 
chr1 25016 
chr1 26026 
chr2 40016 
chr2 40116 
chr2 50016 
chr3 70016 

rdd2.txt(name,min,max): 
chr1  10000 20000 
chr1  20000 30000 
chr2  40000 50000 
chr2  50000 60000 
chr3  70000 80000 
chr3 810001 910000 
chr3 860001 960000 
chr3 910001 1010000 

值是有效的,只有当它在最小值和第二RDD,名称的的计数的最大值之间的范围内发生的意志加1,如果其有效

取上述作为一个例子,CHR1的发生7.

我怎么能得到火花斯卡拉的结果?

千恩万谢

+0

为什么第二个rdd中的列不是唯一的?这是否意味着我们可以在第一个rdd的价值适合第二个? – jtitusj

+0

第二个RDD定义了RDD1中值的范围 –

回答

2

尝试:

val rdd1 = sc.parallelize(Seq(
    ("chr1", 10016), ("chr1", 10017), ("chr1", 10018))) 
val rdd2 = sc.parallelize(Seq(
    ("chr1", 10000, 20000), ("chr1",20000, 30000))) 

rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")) 
.where($"value".between($"min", $"max")) 
+0

scala> r1.toDF(“name”,“value”)。join(r2.toDF(“name”,“min”,“max”),Seq(“ ($“min”,$“max”)) java.lang.IllegalArgumentException:需求失败:列数不匹配。 旧列名称(1):_1 新列名称(2):名称,值 –

+0

如果我理解正确,您还可以'rdd1.toDF(“name”,“min”,“max”)。groupBy('名称).agg(min('min).as(“min”),max('max).as(“max”))''在加入之前让它更有效率(我敢肯定这是一个优化.. ..) – Wilmerton

+0

我会尝试它,并检查它是否更有效后,我有一个完整的答案我的问题 –

0

据我了解,你想从RDD1集在RDD2 MIN和MAX之间下降值。请看看下面的工作

val rdd1 = sc.parallelize(Seq(("chr1", 10016), ("chr1", 10017), ("chr1", 10018))) 
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000))) 
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 


scala> val rdd1=sc.parallelize(Seq(("chr1", 10016),("chr1", 10017),("chr1", 10018),("chr1", 20026),("chr1", 20036),("chr1", 25016),("chr1", 26026),("chr2", 40016),("chr2", 40116),("chr2", 50016),("chr3", 70016))) 
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24 

scala> val rdd2=sc.parallelize(Seq(("chr1",  10000, 20000),("chr1",  20000 , 30000),("chr2",  40000 ,50000),("chr2",  50000 ,60000),("chr3",  70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000))) 
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24 


scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 
+----+-----+ 
|name|count| 
+----+-----+ 
|chr3| 1| 
|chr1| 7| 
|chr2| 3| 
+----+-----+ 

编辑 如果从文件中读取,我会用以下

import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}; 

val sqlContext = new SQLContext(sc) 
val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true))) 
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true))) 
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv") 
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv") 
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 

这将在所有节点上运行,也没有必要并行化呼叫。引用在这里documentation

DEF并行[T](SEQ ID NO:SEQ [T],numSlices:INT = defaultParallelism)(隐式为arg0:ClassTag [T]):RDD [T]永久 分发的本地的Scala收集以形成RDD。

+0

非常感谢完整的指令和结果是预期的,但rdd1.txt和rdd2.txt是非常非常大,如何在没有硬编码的情况下实现并行化? –

+0

不知道我是否理解没有硬编码的并行化,可以详细说明一下吗? –

+0

硬编码@WuFei是什么意思? – eliasah