2015-10-14 73 views
1

我有一个值列表以及它们作为一个数组出现的所有事件的聚合长度。Spark中的平均单词长度

例如:如果我的一句话就是

"I have a cat. The cat looks very cute" 

我的阵列看起来像

Array((I,1), (have,4), (a,1), (cat,6), (The, 3), (looks, 5), (very ,4), (cute,4)) 

现在我要计算每个单词的平均长度。即发生的长度/次数。

我试图做斯卡拉使用的编码如下:

val avglen = arr.reduceByKey((x,y) => (x, y.toDouble/x.size.toDouble)) 

我得到一个错误,因为在x.size如下 ^ 错误:值的大小是不是int

成员

请帮我在哪里我错了。

问候 VRK

+0

我在寻找每个单词的平均长度(而不是在整个文本的水平),即如果一个单词出现的次数越多,我需要得到更多的单词的平均长度。例如,在我的段落中的单词猫出现了两次,从而,该单词的平均长度为6/3 = 2换句话说,如“该”,平均长度为3/3 = 1 – VRK

回答

0

您的评论后,我想我明白了:

val words = sc.parallelize(Array(("i", 1), ("have", 4), 
           ("a", 1), ("cat", 6), 
           ("the", 3), ("looks", 5), 
           ("very", 4), ("cute", 4))) 

val avgs = words.map { case (word, count) => (word, count/word.length.toDouble) } 

println("My averages are: ") 
avgs.take(100).foreach(println) 

enter image description here

假如你有这些词的一段,你要计算的一段话的平均尺寸。

在两个步骤中,用map-reduce方法和在spark-1.5.1

val words = sc.parallelize(Array(("i", 1), ("have", 4), 
           ("a", 1), ("cat", 6), 
           ("the", 3), ("looks", 5), 
           ("very", 4), ("cute", 4))) 

val wordCount = words.map { case (word, count) => count}.reduce((a, b) => a + b) 
val wordLength = words.map { case (word, count) => word.length * count}.reduce((a, b) => a + b) 

println("The avg length is: " + wordLength/wordCount.toDouble) 

我使用连接到spark-kernel这一个.ipynb运行这个代码是输出。

enter image description here

+0

这不是平均长度所有单词。该值表示该句子中该单词的所有字符(不是单词的数量!)的总计数。 –

+0

@RohanAletty我不确定他想要什么,这就是我所理解的,通常如果一个段落有一些词语,并且你想计算这个意思,这是适当的方法 –

+0

请看我的回答,我相信这是什么海报要求。从本质上讲,字数已经汇总,因此密钥是唯一的。 –

0

如果我理解正确的问题:

val rdd: RDD[(String, Int) = ??? 
val ave: RDD[(String, Double) = 
    rdd.map { case (name, numOccurance) => 
     (name, name.length.toDouble/numOccurance) 
    } 
0

这是一个略显混乱的问题。如果您的数据已经在Array[(String, Int)]集合中(可能在驱动程序的collect()之后),那么您无需使用任何RDD转换。事实上,还有你可以用fold*()跑过来集合抢平均一个漂亮的窍门:

val average = arr.foldLeft(0.0) { case (sum: Double, (_, count: Int)) => sum + count }/arr.foldLeft(0.0) { case (sum: Double, (word: String, count: Int)) => sum + count/word.length } 

的长篇大论类,但它本质上聚集在分子的总字符数,字数在数分母。在你的例子来看,我看到以下内容:

scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4)) 
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4)) 

scala> val average = ... 
average: Double = 3.111111111111111 

如果您有跨RDD[(String, Int)]分布式您(String, Int)元组,你可以使用accumulators来很容易地解决这个问题:

val chars = sc.accumulator(0.0) 
val words = sc.accumulator(0.0) 
wordsRDD.foreach { case (word: String, count: Int) => 
    chars += count; words += count/word.length 
} 

val average = chars.value/words.value 

当在运行例如(放置在RDD)上面,我看到以下内容:

scala> val arr = Array(("I",1), ("have",4), ("a",1), ("cat",6), ("The", 3), ("looks", 5), ("very" ,4), ("cute",4)) 
arr: Array[(String, Int)] = Array((I,1), (have,4), (a,1), (cat,6), (The,3), (looks,5), (very,4), (cute,4)) 

scala> val wordsRDD = sc.parallelize(arr) 
wordsRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:14 

scala> val chars = sc.accumulator(0.0) 
chars: org.apache.spark.Accumulator[Double] = 0.0 

scala> val words = sc.accumulator(0.0) 
words: org.apache.spark.Accumulator[Double] = 0.0 

scala> wordsRDD.foreach { case (word: String, count: Int) => 
    | chars += count; words += count/word.length 
    | } 
... 
scala>  val average = chars.value/words.value 
average: Double = 3.111111111111111