2017-10-28 103 views
0

我刚刚开始使用MRJob库在Python中编写MapReduce程序。在单个mapreduce中同时产生最大值和最小值

在视频教程中演示的一个示例是通过location_id查找最高温度。接下来写的另一个程序,通过location_id找到最低温度也很简单。

我在想,是否有一种方法可以通过location_id在单个mapreduce程序中产生最大和最小温度?下面是我走在它:

from mrjob.job import MRJob 

'''Sample Data 
ITE00100554,18000101,TMAX,-75,,,E, 
ITE00100554,18000101,TMIN,-148,,,E, 
GM000010962,18000101,PRCP,0,,,E, 
EZE00100082,18000101,TMAX,-86,,,E, 
EZE00100082,18000101,TMIN,-135,,,E, 
ITE00100554,18000102,TMAX,-60,,I,E, 
ITE00100554,18000102,TMIN,-125,,,E, 
GM000010962,18000102,PRCP,0,,,E, 
EZE00100082,18000102,TMAX,-44,,,E, 

Output I am expecting to see: 
ITE00100554 32.3 20.2 
EZE00100082 34.4 19.6 
''' 

class MaxMinTemperature(MRJob): 
    def mapper(self, _, line): 
     location, datetime, measure, temperature, w, x, y, z = line.split(',') 
     temperature = float(temperature)/10 
     if measure == 'TMAX' or measure == 'TMIN': 
      yield location, temperature 

    def reducer(self, location, temperatures): 
     yield location, max(temperatures), min(temperatures) 


if __name__ == '__main__': 
    MaxMinTemperature.run() 

我得到以下错误:

File "MaxMinTemperature.py", line 12, in reducer 
yield location, max(temperatures), min(temperatures) 
ValueError: min() arg is an empty sequence 

这可能吗?

感谢您的协助。

希夫

回答

0

你有减速两个问题:

  1. 如果检查温度参数的类型,你会发现它是一台发电机。一个发电机只能运行一次,所以你不能将同一个发电机传递给'min'和'max'功能。正确的解决方案是手动遍历它。一个错误的解决方案 - 将其转换为列表 - 可能会导致内存不足,从而导致内存不足,因为列表将其所有元素保存在内存中,而生成器不会。

  2. 减速器的结果必须是双元组元组。所以你需要将你的最小和最大温度结合到另一个元组中。

完整的工作方案:

class MaxMinTemperature(MRJob): 
    def mapper(self, _, line): 
     location, datetime, measure, temperature, w, x, y, z = line.split(',') 
     temperature = float(temperature)/10 
     if measure in ('TMAX', 'TMIN'): 
      yield location, temperature 

    def reducer(self, location, temperatures): 
     min_temp = next(temperatures) 
     max_temp = min_temp 
     for item in temperatures: 
      min_temp = min(item, min_temp) 
      max_temp = max(item, max_temp) 
     yield location, (min_temp, max_temp) 
+0

谢谢你@ AleksandrBorisov。了解你的解决方案在做什么! –

0

的问题是,temperaturesreducer方法是generator


为了更好的理解,让我们创建一个简单的发电机,并期待其行为:

def my_gen(an_iterable): 
    for item in an_iterable: 
     yield item 

my_generator = my_gen([1,2,3,4,5]) 
print(type(my_generator)) # <class 'generator'> 

的这种对象的特征Оne是,一旦用完,不能再使用它:

print(list(my_generator)) # [1, 2, 3, 4, 5] 
print(list(my_generator)) # [] 

max()min()因此顺序执行导致了一个错误:

my_generator = my_gen([1,2,3,4,5]) 

print(max(my_generator)) # 5 
print(min(my_generator)) # ValueError: min() arg is an empty sequence 

所以,你不能因为在第二次使用发电机将被耗尽使用既max()min()内置函数同一发生器。


相反,你可以:

1)发电机转换成一个列表,并使用它:在1换

my_generator = my_gen([1,2,3,4,5]) 
my_list = list(my_generator) 

print(max(my_list)) # 5 
print(min(my_list)) # 1 

2)或提取分钟和发电机的最大值循环:

my_generator = my_gen([1,2,3,4,5]) 

from functools import reduce 
val_max, val_min = reduce(lambda x,y: (max(y, x[0]), min(y, x[1])), my_generator, (float('-inf'), float('inf'))) 

print(val_max, val_min) # 5 1 

所以, reducer的下列编辑:

def reducer(self, location, temperatures): 
    tempr_list = list(temperatures) 
    yield location, max(tempr_list), min(tempr_list) 

应该修复错误。

+0

非常感谢@MaximTitarenko向我解释发电机是什么。非常感激。 –