2012-02-12 33 views
8

我正在研究appengine-mapreduce函数,并修改了演示以适合我的目的。 基本上,我有以下格式的行数百万:userid,time1,time2。我的目的是为每个用户标识找出time1和time2之间的区别。用appengine-mapreduce命中内存限制

然而,正如我在谷歌应用程序引擎运行此,我遇到在日志部分此错误消息:

超出软专用空间限制与180.56 MB服务130个请求后的总 虽然处理这个请求,处理这个请求的进程被发现使用了太多的内存并被终止。这很可能会导致下一个请求应用程序使用新的进程。如果您经常看到此消息,那么您的应用程序中可能会有内存泄漏。

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

任何人都可以建议我怎么回事,可以优化我的代码更好?谢谢!!

编辑:

这里的管线处理程序:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

文件的其余部分是完全一样的演示。

我上传的代码我复制上的Dropbox:http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

你可以显示你的mapreduce配置吗?出于某种原因,它看起来像将整个文件传递给映射器,而不是逐行映射它。 – 2012-02-12 18:45:09

+0

嗨丹尼尔,我的问题已被编辑。谢谢,真的很感激! – autumngard 2012-02-13 00:42:00

回答

2

很可能你的输入文件的大小超过了软内存限制。对于大文件,请使用BlobstoreLineInputReaderBlobstoreZipLineInputReader

这些输入阅读器将不同的东西传递给map函数,它们传递文件中的start_position和文本行。

map功能可能看起来像:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

使用BlobstoreLineInputReader将允许作业更快,因为它可以使用一个以上的碎片,最多256个运行,但它意味着你需要上传你的文件未压缩,这可能是一个痛苦。我通过将压缩文件上传到EC2 windows服务器来处理它,然后从那里解压并上传,因为上行带宽非常大。

+0

这对我来说非常好!非常感谢! :) – autumngard 2012-02-13 07:11:24

6

还可以考虑在代码期间的常规点调用gc.collect()。我看到过几个SO问题,关于通过调用gc.collect()缓解的超限软内存限制,大部分都与blobstore有关。

+0

调用gc.collect()只适用于blobstore或一般? – marcadian 2014-07-07 19:58:13