2016-12-13 37 views
2

问题。我一直负责研究如何在Elasticsearch中回填数据。到目前为止有点空虚。基本要点是:Elasticsearch在计算后回填两个字段到一个新字段

注意:所有文件都存储在每日索引下,每天约200k文件。

  • 我需要能够重新索引60天左右的数据。
  • 我需要为每个doc payload.time_sec和payload.time_nanosec带两个字段,取其中的值并对它们进行一些计算(time_sec * 10 ** 9 + time_nanosec),然后将其作为单个字段返回到重新索引文件

我期待在散装佣工的Python API文档: http://elasticsearch-py.readthedocs.io/en/master/helpers.html

但我想知道如果这甚至有可能。

我的想法是使用: 批量助手拉取滚动ID(批量_update?),迭代每个文档ID,从两个字段中为每个码头拉入数据,进行数学计算并完成更新请求新的现场数据。

任何人都这样做?也许有一个groovy脚本的东西?

谢谢!

回答

1

批量帮助器拉取滚动ID(批量_update?),迭代每个文档ID,从每个扩展坞的两个字段中提取数据,进行数学计算,并使用新字段完成更新请求数据。

基本上是这样:

  • 使用/_search?scroll来获取文档
  • 执行您的操作
  • 发送/_bulk更新请求

其他选项是:

两者都支持脚本,如果我理解正确的话,沃尔德是完美的选择,因为你的更新不取决于外部因素,所以这可以直接在服务器内完成。

+0

我一直在使用python来解决这个问题到目前为止,将在一个新的回复中发布代码片段 – fastfiveoh

+0

@fastfiveoh你最终采用哪种解决方案?我遇到类似的问题,并想知道哪种方式是最好的。 –

+0

@RobinWang我终于写了脚本,你可以在我的回购中看到它:[link] https://github.com/fastfiveoh/python-es-reindex/blob/master/backfill_data – fastfiveoh

0

这是我在哪里,在(大约):

我一直努力用Python和大宗助手,到目前为止我在这里:

doc = helpers.scan(es, query={ 
"query": { 
"match_all": {} 

}, 
"size":1000 
},index=INDEX, scroll='5m', raise_on_error=False) 


    for x in doc: 
x['_index'] = NEW_INDEX 
try: 
    time_sec = x['_source']['payload']['time_sec'] 
    time_nanosec=x['_source']['payload']['time_nanosec'] 
    duration = (time_sec * 10**9) + time_nanosec 
except KeyError: pass 

count = count + 1 

x['_source']['payload']['duration'] = duration 
new_index_data.append(x) 

helpers.bulk(es,new_index_data) 

在这里,我只是用大头python助手插入到一个新的索引。不过,我会尝试使用批量更新对现有索引进行更改和测试。

这看起来像一个正确的方法?

+0

也是我搬到一个新的索引,然后将删除旧的索引,并指向一个别名到新的索引是采取一个新的映射模板。 – fastfiveoh

+0

听起来更适合使用'_reindex' API – mark

+0

雅我觉得reindex会很好。我正在检查数据完整性,回溯3个月,回填大约6-8个新领域。 – fastfiveoh