2014-12-27 44 views
9

在将9百万行的批次写入12节点cassandra(2.1.2)群集时,spark-cassandra-connector(1.0.4,1.1.0)出现问题。我用一致性ALL编写并读取一致性为ONE,但读取的行数每次都与900万(8.865.753,8.753.213等)不同。异步写入在Cassandra中似乎被破坏

我检查了连接器的代码,发现没有问题。然后,我决定编写自己的应用程序,独立于spark和连接器,以调查问题(唯一的依赖是datastax-driver-code version 2.1.3)。

完整的代码,启动脚本和配置文件现在可以是found on github

在伪代码,我写了两个不同版本的应用程序,同步一个:

try (Session session = cluster.connect()) { 

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>"; 
    PreparedStatement pstm = session.prepare(cql); 

    for(String partitionKey : keySource) { 
     // keySource is an Iterable<String> of partition keys 

     BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */); 
     bound.setConsistencyLevel(ConsistencyLevel.ALL); 

     session.execute(bound); 
    } 

} 

而异步之一:

try (Session session = cluster.connect()) { 

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>(); 

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>"; 
    PreparedStatement pstm = session.prepare(cql); 

    for(String partitionKey : keySource) { 
     // keySource is an Iterable<String> of partition keys 

     while(futures.size()>=10 /* Max 10 concurrent writes */) { 
      // Wait for the first issued write to terminate 
      ResultSetFuture future = futures.get(0); 
      future.get(); 
      futures.remove(0); 
     } 

     BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */); 
     bound.setConsistencyLevel(ConsistencyLevel.ALL); 

     futures.add(session.executeAsync(bound)); 
    } 

    while(futures.size()>0) { 
     // Wait for the other write requests to terminate 
     ResultSetFuture future = futures.get(0); 
     future.get(); 
     futures.remove(0); 
    } 
} 

最后一个是类似于使用无批处理配置情况下的连接器。

这两个版本的应用程序在所有情况下都是一样的,除非负载很高。例如,当在9台机器(45个线程)上运行5个线程的同步版本时,将9百万行写入群集,我在随后的读取中找到所有行(使用spark-cassandra-connector)。

如果我运行异步版本,每台机器上有1个线程(9个线程),执行速度要快得多,但我无法在随后的读取中找到所有行(与spark-cassandra连接器相同的问题) 。

代码在执行过程中没有抛出异常。

问题的原因是什么?

我添加一些其他的结果(评论感谢):

  • 异步版本与9个机9个线程,每个线程5名并发作家(45名并发作家):没有问题
  • 同步版本与9个机90个线程(每个JVM实例10个线程):没有问题

问题似乎开始异步引起的写入和数量的并发作家> 45 < = 90,所以我做了其他测试,以确保该发现是正确的:

  • 将ResultSetFuture的“get”方法替换为 “getUninterruptibly”:相同的问题。
  • 9台机器上有18个线程,5个并发的异步版本 每个线程的写入者(90个并发写入器):没有问题

最后的发现表明并发写入程序(90)的高数量不像第一次测试中预期的那样是一个问题。问题是使用同一会话的大量异步写入。

在同一会话中有5个并发异步写入问题不存在。如果我将并发写入数增加到10,某些操作会在没有通知的情况下丢失。

如果您在同一会话上同时发出多个(> 5个)写入,似乎Cassandra 2.1.2(或Cassandra Java驱动程序)中的异步写入被破坏。

+0

您是否考虑过使用BATCH语句而不是分别发送每个更新?我知道这并没有解决你遇到的问题,但它似乎更适合做批量插入。 – Onots

+0

是的,问题也存在于批处理语句中。我没有使用批处理,因为它们受到最新版本连接器中修复的spark cassandra连接器中的另一个问题的影响。我已经使用该修补程序的自编译版本的连接器,并发现相同的问题。 –

+0

我在[github](https://github.com/nibbio84/cassandra-loader-bug-showcase)上添加了所有代码和配置文件 –

回答

5

尼古拉和我本周末通过电子邮件进行了交流,并认为我会用我现在的理论提供一个更新。我看了一下Nicola分享的github project,并试验了EC2上的8节点集群。

我能够重现2.1.2的问题,但确实观察到经过一段时间后,我可以重新执行spark工作,并返回所有9百万行。

我似乎注意到,尽管节点处于压缩状态,但我没有获得全部900万行。一时兴起,我看了看change log for 2.1,并观察到可能解释此问题的问题CASSANDRA-8429 - "Some keys unreadable during compaction"

看到问题已被修复为2.1.3的目标,我重新测试了cassandra-2.1分支,并在压缩活动发生时运行计数工作,并获得了900万行。

我想尝试一下更多,因为我对cassandra-2.1分支的测试非常有限,压缩活动可能纯属巧合,但我希望这可以解释这些问题。

+0

没有使用2.1.3进行测试,但只有在自动压缩进行时,我才能确认问题仅出现在水平压实策略中。随着大小分层压实或平稳压实,Cassandra运作良好。 –

6

几个可能性:

  • 你的异步例子发出10在一次写在时间9个线程,因此90中,而您的同步例子只是做45写入的时间,所以我会尝试将异步下降到相同的速度,所以这是一个苹果比较苹果。

    你不说你是如何检查与异步方法异常。我看你是使用future.get(),但建议使用getUninterruptibly()如文档中指出:

    等待查询返回,并返回其结果。此方法通常比Future.get()方便 ,因为它:不间断地等待 结果,所以不会抛出InterruptedException。 返回有意义的异常,而不必处理 ExecutionException。因此,这是获得未来 结果的首选方式。

    所以也许你没有看到与你的异步例子发生的写例外。

  • 另一个不太可能的可能性是,你的keySource出于某种原因返回重复的分区键,所以当你执行写操作时,其中一些最终会覆盖以前插入的行并且不会增加行数。但是这也会影响同步版本,所以我就说这不太可能。

    我会尝试写入比900万更小的集合,并且速度很慢,并且看看问题是否仅在某个插入次数或特定插入次数开始发生。如果插入次数有影响,那么我会怀疑数据中的行键有问题。如果插入率有影响,那么我会怀疑热点导致写入超时错误。

  • 要检查的另一件事是Cassandra日志文件,以查看是否有任何异常在那里报告。

附录:14年12月30日

我尝试使用示例代码与卡桑德拉2.1.2和2.1.3驱动重现症状。我使用了一个单一的表格和一个递增数字的关键字,这样我就可以看到数据中的空白。我做了很多异步插入(每个线程一次30个,每个线程在10个线程中全部使用一个全局会话)。然后我做了一个“select count(*)”的表,事实上它报告的表中行数比预期的少。然后我做了一个“select *”并将这些行转储到一个文件并检查丢失的密钥。它们似乎是随机分布的,但是当我查询那些缺失的单行时,事实证明它们实际上存在于表格中。然后我注意到每次我执行“select count(*)”时,都会返回一个不同的数字,所以它似乎给出了表中的行数的近似值,而不是实际的数字。

因此,我修改了测试程序,在所有写入之后执行回读阶段,因为我知道所有的关键值。当我这样做时,所有的异步写入都出现在表格中。

所以我的问题是,你如何检查完成后写在表中的行数?您是在查询每个单独的键值还是使用某种操作(如“select *”)?如果后者似乎给出了大部分行,但不是全部行,那么也许你的数据实际上是存在的。由于没有例外被抛出,它似乎表明写入都是成功的。另一个问题是,你确定你的关键值对于所有900万行是唯一的。

+0

我没有使用count(*),因为它向我展示了错误的结果开始。我使用了两种计算行的方法:1)Spark-cassandra连接器,它在令牌环空间执行多个查询并总结结果; 2)带有hadoop mapreduce API的Spark。我注意到两种方法的相同行为。 –

+0

我也确定行号是不同的。我多次检查它们,当我在启动脚本中将“异步”参数更改为“同步”时,它的行ID是OK。我也经历过你正在讨论的关于阅读时间的行为。发现单行的原因可能是由于:1)读取修复(如果它们在群集中启用)2)每次读取行时,都可以从不同的节点读取相对于计数(*)的值。既然你正在写一致性一致,这不应该发生。 –

+1

您可能想要尝试为测试设置1的复制因子,并查看是否可以在异步写入后查找实际缺少的行。通过单个键读回行是确定行是否丢失的确定性测试,因为这些其他方法似乎在计数中而不是丢失的行。如果你可以发布更多的代码,我可以尝试重现症状,但是到目前为止,当我用一个会话进行大量的异步写操作时,它们都出现在表中。 –