2015-02-05 47 views
1

我正尝试使用Python API将大量文档批量插入到弹性搜索中。如何在使用Python API的弹性搜索中进行批量索引

import elasticsearch 
from pymongo import MongoClient 

es = elasticsearch.Elasticsearch() 

def index_collection(db, collection, fields, host='localhost', port=27017): 
    conn = MongoClient(host, port) 
    coll = conn[db][collection] 
    cursor = coll.find({}, fields=fields, timeout=False) 
    print "Starting Bulk index of {} documents".format(cursor.count()) 

    def action_gen(): 
     """ 
     Generator to use for bulk inserts 
     """ 
     for n, doc in enumerate(cursor): 

      op_dict = { 
       '_index': db.lower(), 
       '_type': collection, 
       '_id': int('0x' + str(doc['_id']), 16), 
      } 
      doc.pop('_id') 
      op_dict['_source'] = doc 
      yield op_dict 

    res = bulk(es, action_gen(), stats_only=True) 
    print res 

的文件来自MongoDB的收集和我会哄上面的函数根据该文档的方式解释来做大量的索引。

批量索引继续填充数千个空文档的弹性搜索。谁能告诉我我做错了什么?

+0

你的索引已经存在于ES吗?如果是这样,是否有为其定义的映射(是否所有可能的来自Mongo的文档都不适合映射)? – rchang 2015-02-05 23:37:14

+0

你的代码适合我。也许你的bug是数据特定的。谨慎举一个最小的例子? – thorwhalen 2015-03-23 16:03:53

回答

2

我从来没有见过把这些数据放在一起,特别是你在用"_source"做什么。可能有办法让这个工作,我不知道副手,但是当我尝试它时,我得到了奇怪的结果。

如果你看看bulk api,ES正期望一个元数据文档,然后是要索引的文档。因此,您需要在批量数据列表中为每个文档输入两个条目。所以可能是这样的:

import elasticsearch 
from pymongo import MongoClient 

es = elasticsearch.Elasticsearch() 

def index_collection(db, collection, fields, host='localhost', port=27017): 
    conn = MongoClient(host, port) 
    coll = conn[db][collection] 
    cursor = coll.find({}, fields=fields, timeout=False) 
    print "Starting Bulk index of {} documents".format(cursor.count()) 

    bulk_data = [] 

    for n, doc in enumerate(cursor): 

     bulk_data.append({ 
      '_index': db.lower(), 
      '_type': collection, 
      '_id': int('0x' + str(doc['_id']), 16), 
     }) 
     bulk_data.append(doc) 

    es.bulk(index=index_name,body=bulk_data,refresh=True) 

虽然我没有尝试运行该代码。这里是我知道的一个脚本,如果有帮助,你可以玩:

from elasticsearch import Elasticsearch 

es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }]) 

index_name = "test_index" 

if es_client.indices.exists(index_name): 
    print("deleting '%s' index..." % (index_name)) 
    print(es_client.indices.delete(index = index_name, ignore=[400, 404])) 

print("creating '%s' index..." % (index_name)) 
print(es_client.indices.create(index = index_name)) 

bulk_data = [] 

for i in range(4): 
    bulk_data.append({ 
     "index": { 
      "_index": index_name, 
      "_type": 'doc', 
      "_id": i 
     } 
    }) 
    bulk_data.append({ "idx": i }) 

print("bulk indexing...") 
res = es_client.bulk(index=index_name,body=bulk_data,refresh=True) 
print(res) 

print("results:") 
for doc in es_client.search(index=index_name)['hits']['hits']: 
    print(doc) 
+0

它可能工作,但对我的用例,我不想将所有数据加载到内存中的列表中,然后我索引它,因此生成器函数。这些文档让我看到了我发布的代码:https://elasticsearch-py.readthedocs.org/en/master/helpers.html#helpers – fccoelho 2015-02-06 00:28:13

+0

@SloanAhrens OP正在使用'bulk'的帮助器版本,它需要一些东西不同于标准批量API。 – rchang 2015-02-06 02:16:09