
Multiprocessing Pool: Operation timed out

I have the following piece of code:

import urllib2, json, csv 
import requests 
import itertools 
import multiprocessing 
import numpy 
from datetime import datetime, date, timedelta 

def getTaxiTrips(date): 
    """ 
    Gets the taxi trips that occurred in NY from a starting date.
    :param date: (Y-m-d). 
    :return: list of tuples (long, lat, drop off date). 
    """ 
    today = str(datetime.date(datetime.now())).split('-') 
    today_y = today[0] 
    today_m = today[1] 

    start = date.split('-') 
    start_y = start[0] 
    start_m = start[1] 

    print start_m + "-" + start_y + "/" + today_m + "-" + today_y 

    data = [] 
    y = int(start_y) 
    m = int(start_m) 
    while int(start_y) <= int(today_y):
        # Month transformation
        if m > 12:
            m %= 12
            y += 1

        mt = str(m) if m > 9 else '0' + str(m)
        # Green cabs
        if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
                   "/green_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
            data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
                        "/green_tripdata_" + str(y) + "-" + mt + ".csv")

        # Yellow cabs
        if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
                   "/yellow_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
            data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
                        "/yellow_tripdata_" + str(y) + "-" + mt + ".csv")

        if m == int(today_m):
            break
        m += 1

    pool = multiprocessing.Pool(mps-1) 
    result = pool.map(consumeTaxiData, data) 
    pool.close() 
    pool.join() 

    return list(itertools.chain(*result)) 


def consumeTaxiData(url): 
    """ 
    Given a url, reads its content and processes its data.
    :param url: the url to be read.
    :return: a list of tuples in the form (long, lat, hour). 
    """ 
    print "Processing", url 
    points = [] 

    data = readCSV(url) 
    for line in data:
        latitude = line.get('dropoff_latitude', None)
        if latitude is None:
            latitude = line.get('Dropoff_latitude', None)

        longitude = line.get('dropoff_longitude', None)
        if longitude is None:
            longitude = line.get('Dropoff_longitude', None)

        time = line.get('tpep_dropoff_datetime', None)
        if time is None:
            time = line.get('Lpep_dropoff_datetime', None)

        if time is not None:
            time = datetime.strptime(time, '%Y-%m-%d %H:%M:%S')
            if latitude is not None and longitude is not None and \
                    time >= datetime.strptime(date, '%Y-%m-%d') and time.weekday():
                time = roundTime(time, roundTo=60 * 60).hour
                points.append((float(longitude), float(latitude), time))

    return points 

def readCSV(url): 
    """ 
    Read a csv file. 
    :param url: url to be read. 
    :return: an array of dictionaries. 
    """ 
    try: 
        response = urllib2.urlopen(url)
        return csv.DictReader(response, delimiter=',')
    except urllib2.HTTPError as e:
        return None

def roundTime(dt=None, roundTo=60): 
    """ 
    Round a datetime object to any time lapse in seconds.
    :param dt: datetime.datetime object, default now. 
    :param roundTo: closest number of seconds to round to, default 1 minute. 
    :return: the rounded time. 
    """ 
    if dt == None : dt = datetime.now() 
    seconds = (dt - dt.min).seconds 
    rounding = (seconds+roundTo/2) // roundTo * roundTo 
    return dt + timedelta(0, rounding-seconds, -dt.microsecond) 

if __name__ == '__main__': 
    mps = multiprocessing.cpu_count() 

    date = str(datetime.date(datetime.now()) - timedelta(31*8)) 
    print "-----> Inital date:", date 

    print "-----> Getting taxi data..." 
    taxi_dropoffs = getTaxiTrips(date) 
    print len(taxi_dropoffs), "taxi trips" 

This works fine with the following data:

https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv 

Now I want to process more data:

https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv 
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-07.csv 
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-08.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-08.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-07.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv 
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-09.csv 

But I keep getting this message:

Traceback (most recent call last):
  File "noiseInference.py", line 489, in <module>
    taxi_dropoffs = getTaxiTrips(date)
  File "noiseInference.py", line 300, in getTaxiTrips
    result = pool.map(consumeTaxiData, data)
  File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
socket.error: [Errno 60] Operation timed out

Since each .csv file is large and I am processing many of them, I expected the method to take some time. However, the processing keeps being terminated. How can I solve this problem?

The data comes from here: http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml

Comments:

1.6 GB is not that big. If you really do need to use CSV, fine, but nobody should be trying to use CSV when the resulting file is 1.6 GB. Still, 1.6 GB is not much. –

I see, @MarcusMüller. The problem is that this is how the data is stored: http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml – pceccon

Well, the point is that perhaps the duration of your download is the problematic thing? –

Answer


The output you get from multiprocessing.Pool is often quite misleading.

The traceback:

    return self.map_async(func, iterable, chunksize).get()
  File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
socket.error: [Errno 60] Operation timed out

shows that map_async.get is raising the error stored in its _value attribute. That attribute simply contains the exception that was raised inside the worker process. The error itself is quite clear: a socket operation timed out.
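A minimal sketch (not part of the original answer) of that mechanism: any exception raised inside a worker is sent back to the parent and re-raised when .get() runs, which is why the traceback points at pool.py rather than at the line that actually failed.

import multiprocessing

def fail(x):
    # Stand-in for a worker that blows up, like the timed-out socket call.
    raise ValueError("boom in worker %d" % x)

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    try:
        pool.map(fail, range(4))  # map calls map_async(...).get() internally
    except ValueError as e:
        print "Re-raised in the parent:", e
    pool.close()
    pool.join()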

To get a better picture of what is going on, I suggest you reproduce the problem outside of multiprocessing.Pool.map_async.
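For example, a minimal sketch (assuming consumeTaxiData and the global date from the question are available) is to run the worker function on a single URL directly in the parent process, so the socket error surfaces with a full, unmangled traceback:

import socket
import urllib2

url = "https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-07.csv"

try:
    points = consumeTaxiData(url)  # the same function the pool distributes
    print len(points), "drop-off points"
except (socket.error, urllib2.URLError) as e:
    print "Download failed:", e

That also helps isolate whether the timeout comes from the download itself, as suggested in the comments, rather than from multiprocessing.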