2

使用新的Google App Engine用于input_reader的MapReduce库过滤器我想知道如何通过ndb.Key过滤。用于MapReduce的ndb.Key过滤器input_reader

我读这个post,我已经玩过日期时间,字符串,int,浮动,在过滤器元组,但我怎么可以通过ndb.Key?

当我试图通过一个ndb.Key筛选我得到这个错误:

BadReaderParamsError: Expected Key, got u"Key('Clients', 406)" 

或者这个错误:

TypeError: Key('Clients', 406) is not JSON serializable 

我试图通过的ndb.Key对象和字符串表示ndb.Key。

这里是我的两个过滤器的元组:

示例1:

input_reader': { 
    'input_reader': 'mapreduce.input_readers.DatastoreInputReader', 
    'entity_kind': 'model.Sales', 
    'filters': [("client","=", ndb.Key('Clients', 406))] 
} 

示例2:

input_reader': { 
    'input_reader': 'mapreduce.input_readers.DatastoreInputReader', 
    'entity_kind': 'model.Sales', 
    'filters': [("client","=", "%s" % ndb.Key('Clients', 406))] 
} 

回答

1

使基于DatastoreInputReader自己的输入读卡器,它知道如何解码关键基础过滤器:

class DatastoreKeyInputReader(input_readers.DatastoreKeyInputReader): 
    """Augment the base input reader to accommodate ReferenceProperty filters""" 
    def __init__(self, *args, **kwargs): 
     try: 
      filters = kwargs['filters'] 
      decoded = [] 
      for f in filters: 
       value = f[2] 
       if isinstance(value, list): 
        value = db.Key.from_path(*value) 
       decoded.append((f[0], f[1], value)) 
      kwargs['filters'] = decoded 
     except KeyError: 
      pass 

     super(DatastoreKeyInputReader, self).__init__(*args, **kwargs) 

运行在您的过滤器这个功能将它们作为选项之前:

def encode_filters(filters): 
    if filters is not None: 
     encoded = [] 
     for f in filters: 
      value = f[2] 
      if isinstance(value, db.Model): 
       value = value.key() 
      if isinstance(value, db.Key): 
       value = value.to_path() 
      entry = (f[0], f[1], value) 
      encoded.append(entry) 
     filters = encoded 

    return filters 
2

这是一个有点棘手。

如果你看看code on Google Code你可以看到,mapreduce.model定义JSON_DEFAULTS字典这就决定了获得特殊情况下,JSON序列化/反序列化处理类:在默认情况下,只是日期时间。所以,你可以猴子修补ndb.Key类到那里,它提供的功能来做到这一点的序列化/反序列化 - 这样的:

from mapreduce import model 

def _JsonEncodeKey(o): 
    """Json encode an ndb.Key object.""" 
    return {'key_string': o.urlsafe()} 

def _JsonDecodeKey(d): 
    """Json decode a ndb.Key object.""" 
    return ndb.Key(urlsafe=d['key_string']) 

model.JSON_DEFAULTS[ndb.Key] = (_JsonEncodeKey, _JsonDecodeKey) 
model._TYPE_IDS['Key'] = ndb.Key 

你也可能需要重复最后两行修补mapreduce.lib.pipeline.util以及。

另外请注意,如果你这样做,你需要确保它运行在任何运行mapreduce的任何部分的实例上:最简单的方法是编写一个包装脚本来导入上面的注册码,以及mapreduce.main.APP,并覆盖您的app.yaml中的mapreduce网址以指向您的包装。

1

您是否知道to_old_key()和from_old_key()方法?

+0

这些方法在这种情况下应该如何使用? – ssidorenko 2013-07-17 20:08:26

1

我有同样的问题,并提出了一个计算属性的解决方法。

您可以使用密钥ID向您的销售模型添加新的ndb.ComputedProperty。 Ids只是字符串,所以你不会有任何JSON问题。

client_id = ndb.ComputedProperty(lambda self: self.client.id()) 

,然后添加一个条件,你的映射缩减查询过滤器

input_reader': { 
    'input_reader': 'mapreduce.input_readers.DatastoreInputReader', 
    'entity_kind': 'model.Sales', 
    'filters': [("client_id","=", '406'] 
} 

唯一的缺点是计算属性没有索引和存储,直到你调用put()的参数,所以你必须要遍历所有销售实体并保存它们:

for sale in Sales.query().fetch(): 
    sale.put()