Hitting the memory limit with appengine-mapreduce

I am working with the appengine-mapreduce library and have modified the demo to fit my purpose. Basically, I have millions of lines in the following format: userid, time1, time2. My goal is to find the difference between time1 and time2 for each userid.
However, when I run this on Google App Engine, I get this error message in the logs:

Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
import csv
import logging
import time

def time_count_map(data):
    """Time count map function."""
    (entry, text_fn) = data
    text = text_fn()
    try:
        q = text.split('\n')
        for m in q:
            reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
            for s in reader:
                # Calculate time elapsed
                sdw = s[1]
                start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
                edw = s[2]
                end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
                time_difference = time.mktime(end_date) - time.mktime(start_date)
                yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)
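One memory-related detail in the mapper above: `text.split('\n')` materializes a list holding every line of the file at once, on top of the full text that `text_fn()` has already loaded. Iterating lazily keeps only one line alive at a time. A minimal sketch of that idea (the helper name `iter_csv_lines` is my own, not part of the demo):

```python
import io

def iter_csv_lines(text):
    # Yield cleaned, non-empty lines one at a time instead of
    # building the full text.split('\n') list in memory.
    for line in io.StringIO(text):
        line = line.rstrip('\n').replace('\0', '')
        if line:
            yield line

lines = list(iter_csv_lines("a,1,2\n\nb,3,4\n"))  # -> ['a,1,2', 'b,3,4']
```

This does not shrink the already-loaded blob itself, but it avoids doubling the footprint with a list of millions of line strings.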
def time_count_reduce(key, values):
    """Time count reduce function."""
    total = 0.0  # renamed from 'time' to avoid shadowing the time module
    for subtime in values:
        total += float(subtime)
    realtime = int(total)
    yield "%s: %d\n" % (key, realtime)
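The map and reduce logic can be checked outside App Engine. Below is a small, self-contained sketch of the same time-difference computation (sample data and the helper name `elapsed_seconds` are mine, assuming the `%m/%d/%y %I:%M:%S%p` timestamp format used above):

```python
import csv
import time

TIME_FMT = "%m/%d/%y %I:%M:%S%p"

def elapsed_seconds(line):
    # Parse one "userid,time1,time2" CSV line -> (userid, seconds elapsed).
    row = next(csv.reader([line], skipinitialspace=True))
    start = time.mktime(time.strptime(row[1], TIME_FMT))
    end = time.mktime(time.strptime(row[2], TIME_FMT))
    return row[0], end - start

# Mimic the reduce step: sum the differences per userid.
totals = {}
for line in ["u1,02/12/12 01:00:00AM,02/12/12 01:30:00AM",
             "u1,02/12/12 02:00:00AM,02/12/12 02:10:00AM"]:
    uid, diff = elapsed_seconds(line)
    totals[uid] = totals.get(uid, 0.0) + diff
# totals == {'u1': 2400.0}  (30 min + 10 min)
```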
Can anyone suggest how I can better optimize my code? Thanks!!
EDIT:

Here is the pipeline handler:
class TimeCountPipeline(base_handler.PipelineBase):
    """A pipeline to run Time count demo.

    Args:
      blobkey: blobkey to process as string. Should be a zip archive with
        text files inside.
    """

    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "time_count",
            "main.time_count_map",
            "main.time_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_key": blobkey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=32)
        yield StoreOutput("TimeCount", filekey, output)
mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4

- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
The rest of the file is exactly the same as the demo.
I have uploaded a copy of my code to Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Can you show your mapreduce configuration? For some reason it looks like the whole file is being passed to the mapper, instead of being mapped line by line. – 2012-02-12 18:45:09
Hi Daniel, my question has been edited. Thank you, really appreciate it! – autumngard 2012-02-13 00:42:00