
I'm doing some basic work with Spark using Scala. Why doesn't the count function work with mapValues in Spark?

I'd like to know why the count function doesn't work inside mapValues or map, while sum, min, and max do. Also, is there somewhere I can look up all the functions that can be applied to the Iterable[String] values of groupbykeyRDD?

My code:

scala> val records = List("CHN|2", "CHN|3" , "BNG|2","BNG|65") 
records: List[String] = List(CHN|2, CHN|3, BNG|2, BNG|65) 

scala> val recordsRDD = sc.parallelize(records) 
recordsRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[119] at parallelize at <console>:23 

scala> val mapRDD = recordsRDD.map(elem => elem.split("\\|")) 
mapRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[120] at map at <console>:25 

scala> val keyvalueRDD = mapRDD.map(elem => (elem(0),elem(1))) 
keyvalueRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[121] at map at <console>:27 

scala> val groupbykeyRDD = keyvalueRDD.groupByKey() 
groupbykeyRDD: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[122] at groupByKey at <console>:29 

scala> groupbykeyRDD.mapValues(elem => elem.count).collect 
<console>:32: error: missing arguments for method count in trait TraversableOnce; 
follow this method with `_' if you want to treat it as a partially applied function 
      groupbykeyRDD.mapValues(elem => elem.count).collect 
              ^

scala> groupbykeyRDD.map(elem => (elem._1 ,elem._2.count)).collect 
<console>:32: error: missing arguments for method count in trait TraversableOnce; 
follow this method with `_' if you want to treat it as a partially applied function 
      groupbykeyRDD.map(elem => (elem._1 ,elem._2.count)).collect 

Expected output:

Array((CHN,2) ,(BNG,2)) 
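
For reference, the sum, min, and max calls mentioned above do compile on the grouped values, because Iterable picks them up through implicit Ordering and Numeric instances. A minimal sketch against the groupbykeyRDD defined above (sum needs the strings converted to numbers first, and the ordering of the collected result may vary):

scala> groupbykeyRDD.mapValues(elem => elem.max).collect 
res0: Array[(String, String)] = Array((CHN,3), (BNG,65)) 

scala> groupbykeyRDD.mapValues(elem => elem.map(_.toInt).sum).collect 
res1: Array[(String, Int)] = Array((CHN,5), (BNG,67)) 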

Answers

Answer 1:

The error you're getting has nothing to do with Spark; it is a pure Scala compilation error.

You can try it in a plain Scala console (no Spark at all):

scala> val iterableTest: Iterable[String] = Iterable("test") 
iterableTest: Iterable[String] = List(test) 

scala> iterableTest.count 
<console>:29: error: missing argument list for method count in trait TraversableOnce 

That's because Iterable does not define a zero-argument count method. It does define a count method, but it takes a predicate function as an argument, which is why you get that specific error about a partially applied function.
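
For contrast, a minimal sketch showing that count compiles fine once you pass it a predicate, continuing from the iterableTest value above:

scala> iterableTest.count(_ == "test") 
res0: Int = 1 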

It does, however, have a size method, which you can swap into your example to make it work.
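
For example, a minimal sketch of the size-based version, assuming the groupbykeyRDD from the question (the ordering of the collected result may differ):

scala> groupbykeyRDD.mapValues(elem => elem.size).collect 
res0: Array[(String, Int)] = Array((CHN,2), (BNG,2)) 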


I applied the size method based on your answer and it works. –

Answer 2:

The elem you get is of type Iterable[String], so try the size method, since Iterable has no zero-argument count method. If that doesn't work, you can convert the Iterable[String] to a List and try the length method.

A zero-argument count method is available on RDDs.
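
A minimal sketch of both suggestions, assuming the recordsRDD and groupbykeyRDD from the question (the ordering of the collected result may differ):

scala> groupbykeyRDD.mapValues(elem => elem.toList.length).collect 
res0: Array[(String, Int)] = Array((CHN,2), (BNG,2)) 

scala> recordsRDD.count 
res1: Long = 4 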

Answer 3:

count counts the occurrences of values that satisfy the predicate supplied as an argument (a function returning Boolean).

count with your code: here it counts the number of occurrences of "2" and "3":

scala> groupbykeyRDD.collect().foreach(println) 
(CHN,CompactBuffer(2, 3)) 
(BNG,CompactBuffer(2, 65)) 

scala> groupbykeyRDD.map(elem => (elem._1 ,elem._2.count(_ == "2"))).collect 
res14: Array[(String, Int)] = Array((CHN,1), (BNG,1)) 

scala> groupbykeyRDD.map(elem => (elem._1 ,elem._2.count(_ == "3"))).collect 
res15: Array[(String, Int)] = Array((CHN,1), (BNG,0)) 

count with a small fix to your code: if you tweak your code like this, count should give you the expected result:

val keyvalueRDD = mapRDD.map(elem => (elem(0),1))

Test:

scala> val groupbykeyRDD = mapRDD.map(elem => (elem(0),1)).groupByKey() 
groupbykeyRDD: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[9] at groupByKey at <console>:18 

scala> groupbykeyRDD.collect().foreach(println) 
(CHN,CompactBuffer(1, 1)) 
(BNG,CompactBuffer(1, 1)) 

scala> groupbykeyRDD.map(elem => (elem._1 ,elem._2.count(_ == 1))).collect 
res18: Array[(String, Int)] = Array((CHN,2), (BNG,2)) 

Great explanation! From your answer I understand that we can use count by passing it a function that returns a Boolean. Thanks a lot. –


@SurenderRaja - cool - feel free to upvote! :) –
