5

I want to use Elasticsearch's bulk API from Java and would like to know how to set the bulk size.

Currently I'm using it like this:

BulkRequestBuilder bulkRequest = getClient().prepareBulk(); 
while(hasMore) { 
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json)); 
    hasMore = checkHasMore(); 
} 
BulkResponse bResp = bulkRequest.execute().actionGet(); 
//To check failures 
log.info("Has failures? {}", bResp.hasFailures()); 

Any idea how I can set the bulk/batch size?

+1

Please mark the correct answer as accepted.

Answers

21

It mainly depends on the size of your documents, the resources available on the client, and the type of client (transport client or node client).

The node client is aware of the shards in the cluster and sends each document directly to the node holding the shard where it should be indexed. The transport client, on the other hand, is an ordinary client that sends its requests to a list of nodes in round-robin fashion. A bulk request is sent to one node, which then acts as your gateway while indexing.
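For reference, a minimal sketch of how each client type is created with the same-era (0.90/1.x) Java API used in this thread; the cluster name, host, and port are placeholders:

// Node client: joins the cluster as a client-only node, so it knows the shard
// layout and can route each document directly to the node that holds its shard.
Node node = NodeBuilder.nodeBuilder()
    .clusterName("MyClusterName")
    .client(true)
    .node();
Client nodeClient = node.client();

// Transport client: stays outside the cluster and round-robins requests over
// the configured addresses; the node that receives a bulk forwards its items.
Settings settings = ImmutableSettings.settingsBuilder()
    .put("cluster.name", "MyClusterName").build();
TransportClient transportClient = new TransportClient(settings)
    .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));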

Since you're using the Java API, I suggest you have a look at the BulkProcessor, which makes bulk indexing much easier and more flexible. You can define a maximum number of actions, a maximum size, and a maximum time interval since the last bulk execution. It will execute the bulk automatically for you when needed. You can also set a maximum number of concurrent bulk requests.

You can create the BulkProcessor like this:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("Executed bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
}).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build();
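If you also want to cap each batch by payload size or flush on a timer (the other two thresholds mentioned above), the builder exposes setters for those as well; a sketch, assuming a 1.x-era client where setBulkSize and setFlushInterval are available, with listener standing for the BulkProcessor.Listener defined above:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
    .setBulkActions(1000)                               // flush after 1000 actions...
    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // ...or once 5 MB have accumulated...
    .setFlushInterval(TimeValue.timeValueSeconds(5))    // ...or every 5 seconds
    .setConcurrentRequests(1)                           // at most one bulk in flight at a time
    .build();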

Then you just add your requests to it:

bulkProcessor.add(indexRequest); 
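Here indexRequest is a plain IndexRequest; for instance, the equivalent of the prepareIndex call from the question (indexName, indexType, and docId are placeholders):

bulkProcessor.add(new IndexRequest(indexName, indexType, docId).source(json));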

and close it at the end to flush any remaining requests that haven't been executed yet:

bulkProcessor.close(); 

Finally, to answer your question: another nice thing about the BulkProcessor is that it has sensible defaults: 5 MB bulk size, 1000 actions, 1 concurrent request, and no flush interval (which it might be useful to set).

0

You need to count the actions added to your bulk request builder; when the batch-size limit is reached, execute the bulk and start a fresh builder before continuing. Here is example code:

Settings settings = ImmutableSettings.settingsBuilder()
    .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300;
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("my_file_path")));
ObjectMapper mapper = new ObjectMapper(); // reuse one mapper instead of creating one per line
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while ((readLine = br.readLine()) != null) {
    id = somefunction(readLine);
    // Note: this serializes the raw line as a JSON string; if the file already
    // contains one JSON document per line, pass readLine as the source directly.
    String json = mapper.writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json));
    bulkBuilderLength++;
    if (bulkBuilderLength % 1000 == 0) {
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if (bulkRes.hasFailures()) {
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
        // start a fresh builder for the next batch
        bulkBuilder = client.prepareBulk();
    }
}

br.close();

// flush whatever is left over from the last partial batch
if (bulkBuilder.numberOfActions() > 0) {
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if (bulkRes.hasFailures()) {
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
}
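One detail the snippet leaves out: when you are done, close the transport client as well to release its connections and threads:

client.close();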

Hope this helps. Thanks.