2016-07-21 33 views
0

我是pyspark的新手。我有一对RDD(键值)。我想为每个键创建一个n桶的直方图。输出会是这样的:Pyspark:为Pair中的每个键创建直方图RDD

[(key1, [...buckets...], [...counts...]), 
(key2, [...buckets...], [...counts...])] 

我已经看到了检索最大值或每个键的总和的例子,但有没有办法通过直方图(n)函数被应用到每个键的值?

回答

-1

尝试:

>>> import numpy as np 
>>> 
>>> rdd.groupByKey().map(lambda (x, y): np.histogram(list(y))) 
+0

这不适合我。 np.histogram不会接受groupByKey产生的'ResultIterable'。 – Petrichor

+0

请解释您为什么认为这会有所帮助。只有代码答案通常不是很有用。 – Pureferret

0

我知道这个职位是比较旧的,但人们仍然在寻找一PySpark的解决方案,这是我的两分钱的问题。

让我们考虑一个(键,值)对RDD,让我们说“直方图”我们主要讨论每个键有多少不同的值以及它们各自的基数。

aggregateByKey()是一个很好的选择。在aggregateByKey()中,基本上声明了三个输入值:聚合器默认值,分区内聚合函数,分区间聚合函数。

让我们考虑到有放射性散布的形式

[(124, 2), 
(124, 2), 
(124, 2), 
(125, 2), 
(125, 2), 
(125, 2), 
(126, 2), 
(126, 2), 
(126, 2), 
(127, 2), 
(127, 2), 
(127, 2), 
(128, 2), 
(128, 2), 
(128, 2), 
(129, 2), 
(129, 2), 
(129, 2), 
(130, 2), 
(130, 2), 
(130, 2), 
(131, 2), 
(131, 2), 
(131, 2), 
(132, 2), 
(132, 2), 
(132, 2), 
(133, 2), 
(133, 2), 
(133, 2), 
(134, 2), 
(134, 2), 
(134, 2), 
(135, 2), 
(135, 2), 
(135, 2), 
(136, 2), 
(136, 1), 
(136, 2), 
(137, 2), 
(137, 2), 
(137, 2), 
(138, 2), 
(138, 2), 
(138, 2), 
(139, 2), 
(139, 2), 
(139, 2), 
(140, 2), 
(140, 2), 
(140, 2), 
(141, 2), 
(141, 1), 
(141, 1), 
(142, 2), 
(142, 2), 
(142, 2), 
(143, 2), 
(143, 2), 
(143, 2), 
(144, 1), 
(144, 1), 
(144, 2), 
(145, 1), 
(145, 1), 
(145, 1), 
(146, 2), 
(146, 2), 
(146, 2), 
(147, 2), 
(147, 2), 
(147, 2), 
(148, 2), 
(148, 2), 
(148, 2), 
(149, 2), 
(149, 2), 
(149, 2), 
(150, 2), 
(150, 2), 
(150, 2), 
(151, 2), 
(151, 2), 
(151, 2), 
(152, 2), 
(152, 2), 
(152, 2), 
(153, 2), 
(153, 1), 
(153, 2), 
(154, 2), 
(154, 2), 
(154, 2), 
(155, 2), 
(155, 1), 
(155, 2), 
(156, 2), 
(156, 2), 
(156, 2), 
(157, 1), 
(157, 2), 
(157, 2), 
(158, 2), 
(158, 2), 
(158, 2), 
(159, 2), 
(159, 2), 
(159, 2), 
(160, 2), 
(160, 2), 
(160, 2), 
(161, 2), 
(161, 1), 
(161, 2), 
(162, 2), 
(162, 2), 
(162, 2), 
(163, 2), 
(163, 1), 
(163, 2), 
(164, 2), 
(164, 2), 
(164, 2), 
(165, 2), 
(165, 2), 
(165, 2), 
(166, 2), 
(166, 1), 
(166, 2), 
(167, 2), 
(167, 2), 
(167, 2), 
(168, 2), 
(168, 1), 
(168, 1), 
(169, 2), 
(169, 2), 
(169, 2), 
(170, 2), 
(170, 2), 
(170, 2), 
(171, 2), 
(171, 2), 
(171, 2), 
(172, 2), 
(172, 2), 
(172, 2), 
(173, 2), 
(173, 2), 
(173, 1), 
(174, 2), 
(174, 1), 
(174, 1), 
(175, 1), 
(175, 1), 
(175, 1), 
(176, 1), 
(176, 1), 
(176, 1), 
(177, 2), 
(177, 2), 
(177, 2)] 

据我所知,做最简单的方法是在每个键聚合值根据Python字典在字典关键是RDD值和与每个字典关键字关联的值是每个RDD值有多少RDD值的计数器。 RDD密钥不需要考虑,因为aggregateByKey()函数会自动处理RDD密钥。

聚集调用的形式

myRDD.aggregateByKey(dict(), withinPartition, betweenPartition) 

,我们初始化所有这些蓄能器为空字典。

的内分区聚合功能,因此,具有以下形式

def withinPartition(dictionary, record): 
    if record in dictionary.keys(): 
     dictionary[record] += 1 
    else: 
     dictionary[record] = 1 
    return dictionary 

其中dictionary是每RDD值计数器,而record在给定RDD值(一个整数,在该实例中,看到上面的RDD示例)。基本上,如果字典中已经存在给定的RDD值,我们会增加一个+1计数器。否则,我们初始化柜台。

分区之间功能的工作原理几乎相同

def betweenPartition(dictionary1, dictionary2): 
    return {k: dictionary1.get(k, 0) + dictionary2.get(k, 0) for k in set(dictionary1) | set(dictionary2)} 

基本上,对于给定的RDD关键,让我们考虑有两个库。我们将这两个字典合并成一个唯一的字典,方法是对给定的键值进行求和,或者在给定的键不存在于两个字典之一(逻辑或)的情况下添加给定的键。字典合并的信用额为georg's solution in this post

产生的RDD将有形式

[(162, {2: 3}), 
(132, {2: 3}), 
(168, {1: 2, 2: 1}), 
(138, {2: 3}), 
(174, {1: 2, 2: 1}), 
(144, {1: 2, 2: 1}), 
(150, {2: 3}), 
(156, {2: 3}), 
(126, {2: 3}), 
(163, {1: 1, 2: 2}), 
(133, {2: 3}), 
(169, {2: 3}), 
(139, {2: 3}), 
(175, {1: 3}), 
(145, {1: 3}), 
(151, {2: 3}), 
(157, {1: 1, 2: 2}), 
(127, {2: 3}), 
(128, {2: 3}), 
(164, {2: 3}), 
(134, {2: 3}), 
(170, {2: 3}), 
(140, {2: 3}), 
(176, {1: 3}), 
(146, {2: 3}), 
(152, {2: 3}), 
(158, {2: 3}), 
(129, {2: 3}), 
(165, {2: 3}), 
(135, {2: 3}), 
(171, {2: 3}), 
(141, {1: 2, 2: 1}), 
(177, {2: 3}), 
(147, {2: 3}), 
(153, {1: 1, 2: 2}), 
(159, {2: 3}), 
(160, {2: 3}), 
(130, {2: 3}), 
(166, {1: 1, 2: 2}), 
(136, {1: 1, 2: 2}), 
(172, {2: 3}), 
(142, {2: 3}), 
(148, {2: 3}), 
(154, {2: 3}), 
(124, {2: 3}), 
(161, {1: 1, 2: 2}), 
(131, {2: 3}), 
(167, {2: 3}), 
(137, {2: 3}), 
(173, {1: 1, 2: 2}), 
(143, {2: 3}), 
(149, {2: 3}), 
(155, {1: 1, 2: 2}), 
(125, {2: 3})] 

原来RDD键仍然可以在这一新的RDD被发现。每个新的RDD值都是一个字典。反过来,每个词典键对应于可能的RDD值之一,而每个词典值是每个RDD键给定RDD值存在多少次的计数器。

相关问题