2015-02-05 41 views
0

我想使用spark来离散一些数据。在PySpark中减少的正确输入

我有以下列格式数据:

date   zip amount 
2013/04/02 04324 32.2 
2013/04/01 23242 1.5 
2013/04/02 99343 12 

然后我有以下代码:

sampleTable = sqlCtx.inferSchema(columns) 
sampleTable.registerAsTable("amounts") 


exTable = sampleTable.map(lambda p: {"date":p.date,"zip":p.zip,"amount":p.amount}) 

然后我有一个函数来离散:

def discretize((key, data), cutoff=0.75): 
    result = (data < np.percentile(index,cutoff)) 
    return result 

我将采取这个结果列,然后加入原始数据集。

我试图用这个语句来执行的操作:

exDiscretized = exTable.map(lambda x: (((dt.datetime.strptime(x.date,'%Y/%m/%d')).year, (dt.datetime.strptime(x.date,'%Y/%m/%d')).month), x.amount)).reduce(discretize).collect() 

基本上,我想((年,月),整行)的元组这样的话我可以找到每个第75百分位月份和年份组合。

我能够让地图部分正常工作。当我拿出减少的部分,我得到的代码工作。

当我运行与地图和减少双方的声明,我得到以下错误:

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/worker.py", line 79, in main 
serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 196, in dump_stream 
self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 127, in dump_stream 
    for obj in iterator: 
    File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/serializers.py", line 185, in _batched 
for item in iterator: 
    File "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/python/pyspark/rdd.py", line 715, in func 
yield reduce(f, iterator, initial) 
    File "<stdin>", line 2, in discretize 
    File "/usr/local/lib/python2.7/dist-packages/numpy-1.9.1-py2.7-linux-x86_64.egg/numpy/lib/function_base.py", line 3051, in percentile 
    q = array(q, dtype=np.float64, copy=True) 
ValueError: setting an array element with a sequence. 

我不知道我在做什么错。也许这与我生成关键值对的方式有关?

回答

1

所以我认为问题的根源在于减少不能像你尝试使用它的方式那样工作。由于您想将单个键的所有数据放在一起,因此groupByKey函数可能就是您正在寻找的那个函数。这里有一个例子:

input = sc.parallelize([("hi", 1), ("bye", 0), ("hi", 3)]) 
groupedInput = input.groupByKey() 
def top(x): 
    data = list(x) 
    percentile = np.percentile(data, 0.70) 
    return filter(lambda x: x >= percentile , data) 
modifiedGroupedInput = groupedInput.mapValues(top) 
modifiedGroupedInput.collect() 

结果:

[('bye', [0]), ('hi', [3])] 

一般reduceByKey通常是更好地使用,但因为你要考虑所有的每个关键元素的同时计算

+0

问题是,我不想按键分组,但我不想聚合每个组,而是想要生成与原始数据长度相同的矢量。只产生第75个百分点,然后再加入我原来的数据会更好吗? – Michal