App Engine Mapreduce API是否根据最终reduce工作中的逻辑决定计算分片大小?设置App Engine mapreduce分片大小
我正在使用App Engine mapreduce API并提供了kwarg来设置我的mapreduce分片大小。
在我的mapreduce作业中,分片大小尤其重要,因为我不想将太多结果批量分配给执行reduce函数最后一步的任何结果。换句话说,我正在硬编码碎片大小,以根据系统上的外部约束将用户平均分配。
地图作业似乎分裂得很好,但减速器只使用我指定的一小部分碎片。
这里是排序的代码我处理的粗略轮廓:
SHARD_SIZE = 42
def map_fun(entity):
shard_key = random.randint(1, SHARD_SIZE)
yield (
shard_key,
db.model_to_protobuf(entity).SerializeToString().encode('base64')
)
def reduce_fun(key, entities):
batch = []
for entity in entities:
#check for stuff
batch.append(entity)
expensive_side_effect(batch)
class MyGreatPipeline(base_handler.PipelineBase):
def run(self, *args, **kw):
yield mapreduce_pipeline.MapreducePipeline(
'label'
'path.to.map_fun',
'path.to.reduce_fun',
'mapreduce.input_readers.DatastoreInputReader',
'mapreduce.output_writers.BlobstoreOutputWriter',
mapper_params={
'entity_kind': 'path.to.entity',
'queue_name': 'coolQueue'
},
reducer_params={},
shard_size = SHARD_SIZE
)
map_fun
特别指定每个实体对根据碎片大小随机确定一个碎片。我很困惑,为什么我的reducer将会有比SHARD_SIZE
更少的碎片,因为有很多实体,并且极不可能重复选择相同的整数。