我正在对Cassandra进行一些测试,看看我们是否可以将它用于支持乐观并发的可伸缩键值存储。如何避免在Cassandra中使用轻量级事务(CAS)时丢失写入?
由于键值存储只需要一张表并且每个项都可以通过键访问,因此似乎lightweight transactions可以很容易地为我们的问题提供技术基础。
但是,当运行a test which does a number of concurrent updates(并且只要检测到并发,则重试),我们看到我们丢失了写入。
测试创建一个表:
CREATE TABLE objects (key text, version int, PRIMARY KEY(key));
并采用插入一个数字键的:
INSERT INTO objects (key, version) VALUES (?, 0) IF NOT EXISTS;
这些项目的版本然后使用CAS操作递增的次数:
-- client retrieves the current version
SELECT version FROM objects WHERE key = ?;
-- and updates the item using the retrieved version as version check
UPDATE objects SET version = ? WHERE key = ? IF version = ?;
客户端代码实际上看起来像这样用于更新:
private async Task<bool> CompareAndSet(string key, int currrentCount, PreparedStatement updateStatement)
{
// increment the version
IStatement statement = updateStatement.Bind(currrentCount + 1, key, currrentCount);
// execute the statement
RowSet result = await Session.ExecuteAsync(statement);
// check the result
Row row = result.GetRows().SingleOrDefault();
if (row == null)
throw new Exception("No row in update result.");
// check if the CAS operation was applied or not
return row.GetValue<bool>("[applied]");
}
正如您所看到的,CAS操作因为并发而无法应用。所以,这个操作会被重试直到成功。写超时异常也被处理。 The rationale behind handling the write timeout exceptions is explained here.
private async Task Update(string key, PreparedStatement selectStatement, PreparedStatement updateStatement)
{
bool done = false;
// try update (increase version) until it succeeds
while (!done)
{
// get current version
TestItem item = null;
while (item == null)
item = await GetItem(key, selectStatement);
try
{
// update version using lightweight transaction
done = await CompareAndSet(key, item.Version, updateStatement);
// lightweight transaction (CAS) failed, because compare failed --> simply not updated
if (!done)
Interlocked.Increment(ref abortedUpdates);
}
catch (WriteTimeoutException wte)
{
// partial write timeout (some have been updated, so all must be eventually updated, because it is a CAS operation)
if (wte.ReceivedAcknowledgements > 0)
{
Interlocked.Increment(ref partialWriteTimeouts);
done = true;
}
else
// complete write timeout --> unsure about this one...
Interlocked.Increment(ref totalWriteTimeouts);
}
}
}
下面是它使用100个项目,并更新每个项目的测试输出10倍:
Running test with 100 items and 10 updates per item.
Number of updates: 1000
Number of aborted updates due to concurrency: 3485
Number of total write timeouts: 18
Number of partial write timeouts: 162
LOST WRITES: 94 (or 9,40%)
Results:
Updates | Item count
10 | 35
9 | 43
8 | 17
7 | 3
6 | 2
Xunit.Sdk.EqualExceptionAssert.Equal() Failure
Expected: 0
Actual: 94
正如你可以看到,这是一个高度并行测试(见中止操作的数量更新必须重试)。 但是,坏消息是我们正在失去写作。客户认为应该执行1000次更新,但在这种情况下,有94次写入丢失。
丢失的写入数量是写入超时数量级的数量级。所以,他们似乎有联系。问题是:
- 我们需要以更好的方式处理超时异常吗?
- 有没有办法避免在Cassandra上进行CAS操作时丢失写入?
这看起来像一个优秀的JIRA票 - 卡桑德拉版本,使用JVM和DEBUG system.log也可能有帮助:) https://issues.apache.org/jira/browse/CASSANDRA – mshuler 2014-12-05 15:33:06
好吧,首先我们是要使用最新的Cassandra版本进行测试(我们目前使用2.0.9版本)。 – Jochen 2014-12-08 10:42:09
使用版本2.1.2的问题相同。 – Jochen 2014-12-09 09:41:30