在将9百万行的批次写入12节点cassandra(2.1.2)群集时,spark-cassandra-connector(1.0.4,1.1.0)出现问题。我用一致性ALL编写并读取一致性为ONE,但读取的行数每次都与900万(8.865.753,8.753.213等)不同。异步写入在Cassandra中似乎被破坏
我检查了连接器的代码,发现没有问题。然后,我决定编写自己的应用程序,独立于spark和连接器,以调查问题(唯一的依赖是datastax-driver-code version 2.1.3)。
完整的代码,启动脚本和配置文件现在可以是found on github。
在伪代码,我写了两个不同版本的应用程序,同步一个:
try (Session session = cluster.connect()) {
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(bound);
}
}
而异步之一:
try (Session session = cluster.connect()) {
List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
futures.add(session.executeAsync(bound));
}
while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}
最后一个是类似于使用无批处理配置情况下的连接器。
这两个版本的应用程序在所有情况下都是一样的,除非负载很高。例如,当在9台机器(45个线程)上运行5个线程的同步版本时,将9百万行写入群集,我在随后的读取中找到所有行(使用spark-cassandra-connector)。
如果我运行异步版本,每台机器上有1个线程(9个线程),执行速度要快得多,但我无法在随后的读取中找到所有行(与spark-cassandra连接器相同的问题) 。
代码在执行过程中没有抛出异常。
问题的原因是什么?
我添加一些其他的结果(评论感谢):
- 异步版本与9个机9个线程,每个线程5名并发作家(45名并发作家):没有问题
- 同步版本与9个机90个线程(每个JVM实例10个线程):没有问题
问题似乎开始异步引起的写入和数量的并发作家> 45 < = 90,所以我做了其他测试,以确保该发现是正确的:
- 将ResultSetFuture的“get”方法替换为 “getUninterruptibly”:相同的问题。
- 9台机器上有18个线程,5个并发的异步版本 每个线程的写入者(90个并发写入器):没有问题。
最后的发现表明并发写入程序(90)的高数量不像第一次测试中预期的那样是一个问题。问题是使用同一会话的大量异步写入。
在同一会话中有5个并发异步写入问题不存在。如果我将并发写入数增加到10,某些操作会在没有通知的情况下丢失。
如果您在同一会话上同时发出多个(> 5个)写入,似乎Cassandra 2.1.2(或Cassandra Java驱动程序)中的异步写入被破坏。
您是否考虑过使用BATCH语句而不是分别发送每个更新?我知道这并没有解决你遇到的问题,但它似乎更适合做批量插入。 – Onots
是的,问题也存在于批处理语句中。我没有使用批处理,因为它们受到最新版本连接器中修复的spark cassandra连接器中的另一个问题的影响。我已经使用该修补程序的自编译版本的连接器,并发现相同的问题。 –
我在[github](https://github.com/nibbio84/cassandra-loader-bug-showcase)上添加了所有代码和配置文件 –