1

我正在对Cassandra进行一些测试,看看我们是否可以将它用于支持乐观并发的可伸缩键值存储。如何避免在Cassandra中使用轻量级事务(CAS)时丢失写入?

由于键值存储只需要一张表并且每个项都可以通过键访问,因此似乎lightweight transactions可以很容易地为我们的问题提供技术基础。

但是,当运行a test which does a number of concurrent updates(并且只要检测到并发,则重试),我们看到我们丢失了写入

测试创建一个表:

CREATE TABLE objects (key text, version int, PRIMARY KEY(key)); 

并采用插入一个数字键的:

INSERT INTO objects (key, version) VALUES (?, 0) IF NOT EXISTS; 

这些项目的版本然后使用CAS操作递增的次数:

-- client retrieves the current version 
SELECT version FROM objects WHERE key = ?; 

-- and updates the item using the retrieved version as version check 
UPDATE objects SET version = ? WHERE key = ? IF version = ?; 

客户端代码实际上看起来像这样用于更新:

private async Task<bool> CompareAndSet(string key, int currrentCount, PreparedStatement updateStatement) 
{ 
    // increment the version 
    IStatement statement = updateStatement.Bind(currrentCount + 1, key, currrentCount); 

    // execute the statement 
    RowSet result = await Session.ExecuteAsync(statement); 

    // check the result 
    Row row = result.GetRows().SingleOrDefault(); 

    if (row == null) 
     throw new Exception("No row in update result."); 

    // check if the CAS operation was applied or not 
    return row.GetValue<bool>("[applied]"); 
} 

正如您所看到的,CAS操作因为并发而无法应用。所以,这个操作会被重试直到成功。写超时异常也被处理。 The rationale behind handling the write timeout exceptions is explained here.

private async Task Update(string key, PreparedStatement selectStatement, PreparedStatement updateStatement) 
{ 
    bool done = false; 

    // try update (increase version) until it succeeds 
    while (!done) 
    { 
     // get current version     
     TestItem item = null; 

     while (item == null) 
      item = await GetItem(key, selectStatement); 

     try 
     { 
      // update version using lightweight transaction 
      done = await CompareAndSet(key, item.Version, updateStatement); 

      // lightweight transaction (CAS) failed, because compare failed --> simply not updated 
      if (!done) 
       Interlocked.Increment(ref abortedUpdates); 
     } 
     catch (WriteTimeoutException wte) 
     { 
      // partial write timeout (some have been updated, so all must be eventually updated, because it is a CAS operation) 
      if (wte.ReceivedAcknowledgements > 0) 
      { 
       Interlocked.Increment(ref partialWriteTimeouts); 
       done = true; 
      } 
      else 
       // complete write timeout --> unsure about this one... 
       Interlocked.Increment(ref totalWriteTimeouts); 
     } 
    } 
} 

下面是它使用100个项目,并更新每个项目的测试输出10倍:

Running test with 100 items and 10 updates per item. 

Number of updates: 1000 
Number of aborted updates due to concurrency: 3485 
Number of total write timeouts: 18 
Number of partial write timeouts: 162 

LOST WRITES: 94 (or 9,40%) 

Results: 

Updates | Item count 
    10 |   35 
     9 |   43 
     8 |   17 
     7 |   3 
     6 |   2 

Xunit.Sdk.EqualExceptionAssert.Equal() Failure 
Expected: 0 
Actual: 94 

正如你可以看到,这是一个高度并行测试(见中止操作的数量更新必须重试)。 但是,坏消息是我们正在失去写作。客户认为应该执行1000次更新,但在这种情况下,有94次写入丢失。

丢失的写入数量是写入超时数量级的数量级。所以,他们似乎有联系。问题是:

  • 我们需要以更好的方式处理超时异常吗?
  • 有没有办法避免在Cassandra上进行CAS操作时丢失写入?
+0

这看起来像一个优秀的JIRA票 - 卡桑德拉版本,使用JVM和DEBUG system.log也可能有帮助:) https://issues.apache.org/jira/browse/CASSANDRA – mshuler 2014-12-05 15:33:06

+0

好吧,首先我们是要使用最新的Cassandra版本进行测试(我们目前使用2.0.9版本)。 – Jochen 2014-12-08 10:42:09

+0

使用版本2.1.2的问题相同。 – Jochen 2014-12-09 09:41:30

回答

2

WriteTimeoutException表示Cassandra无法及时执行操作。通过测试,您可以将Cassandra置于沉重的负载之下,并且任何操作都可能因超时异常而失败。因此,您需要做的是重做您的操作并通过反复尝试从问题中恢复过来。它类似于SQLTimeoutException。你也需要为此辩护。

+0

从WriteTimeoutException无法判断LWT是否会成功?这是https://issues.apache.org/jira/browse/CASSANDRA-9328? – OrangeDog 2016-01-15 12:13:04

+1

如果公司的成功基于数据完整性,并且某些情况下仍然使用关系数据库(即使这些数据库无法扩展),那么这个问题是另一回事,应该阻止您使用Cassandra。 这个bug甚至使我的建议无效。那么在任何情况下都不应该重做交易。 – 2016-01-15 12:33:50

+0

所以基本上是的,就超时而言,你永远不会知道。在这个bug之前,超时发生的机会非常渺茫,但是调用成功了。该窗口是由超时被击中,但有突出的ACK。这就是为什么我们过去把所有碎片放在本地并且超时合理的高。所以基本上,除了争用和破坏系统之外,没有机会超时,如果你重试它,你会发现。 – 2016-01-15 12:34:11

相关问题