2016-01-09 37 views
16

我有一个批量推送(INSERT)记录到Cassandra集群的Java客户端。批处理中的元素都具有相同的行键,因此它们全部将放置在同一个节点中。此外,我不需要交易是原子的,所以我一直在使用未记录的批次。卡桑德拉的批量限制是多少?

每个批次中的INSERT命令的数量取决于不同的因素,但可以是5到50000之间的任何值。首先,我只放入与我在一批中一样多的命令并提交它。这扔了com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large。然后,我使用了每批1000 INSERT的帽子,然后降到300.我注意到我只是随机猜测,并不知道这个限制来自哪里,这可能会导致麻烦。

我的问题是,这是什么限制?我可以修改它吗?我如何知道一批中可以放置多少个元素?当我的批次是“满”?

回答

17

我会建议不要增加上限,只是分裂成多个请求。将所有内容都放在一个巨大的单一请求中会显着地对协调员产生负面影响。将所有内容都放在一个分区中可以通过减少一些延迟来提高某些大小的批处理中的吞吐量,但批处理决不会用于提高性能。因此,试图通过使用不同的批量来优化以获得最大吞吐量将主要取决于用例/模式/节点,并且需要特定的测试,因为通常在开始降级的大小上存在悬崖。

。在你的cassandra.yaml一个

# Fail any batch exceeding this value. 50kb (10x warn threshold) by default. 
batch_size_fail_threshold_in_kb: 50 

选项来增加它,但一定要进行测试,以确保您的实际帮助,而不是伤害你的吞吐量。

+0

这就是我正在寻找,谢谢。你知道什么是监视客户端批量大小的最佳方法吗? –

+2

取决于您使用的驱动程序,但在java驱动程序中,您可以在批处理中的每个单独语句中使用getValues(),它会返回一个ByteBuffers数组,以便使用remaining()方法获取大小缓冲区单独和总结他们,但总的来说,我不会推荐这样做。你不应该创造超大型的批次,只要足够大,你就可以感觉到你远不及接近这个极限。 –

+0

这里还有一堆东西。 C *按列而不是行来设计,C *表示每个分区有2B列,但根据经验我们知道最佳点是100MB。所以即使使用100MB分区,并且如果批处理的默认大小为50KB,就像100MB/50KB = 3125请求检索100MB分区一样,那么请求的方式太多。 – user1870400

2

工作望着卡桑德拉记录你能够发现的东西,如:

ERROR 19点54分13秒批为[匹配]是尺寸为103.072KiB,超过50.000KiB的规定阈值53.072KiB。 (见batch_size_fail_threshold_in_kb)

0

跑过一个类似的问题在Java中,这里是如何一批批的功能例如:

import com.datastax.driver.core.BatchStatement; 
import com.datastax.driver.core.PreparedStatement; 
import com.datastax.driver.core.Session; 
import com.google.common.collect.Lists; 

import java.util.Collection; 
import java.util.stream.Collectors; 

public class CassandraBatchOfBatchesExample { 

    private final PreparedStatement statement; 
    private final Session session; 
    private final int batchSize; 

    public CassandraBatchOfBatchesExample(Session session, int batchSize) { 
     this.session = session; 
     this.batchSize = batchSize; 
     statement = session.prepare("INSERT_INTO some_table JSON ?"); 
    } 

    public void execute(Collection<String> jsons) { 
     Lists.partition(jsons 
       .stream() 
       .map(statement::bind) 
       .collect(Collectors.toList() 
      ), batchSize).stream() 
      .map(statements -> new BatchStatement().addAll(statements)) 
      .forEach(session::execute); 
    } 
} 

batchSize变量需要根据被插入的个人记录的大小来改变。