我是NEventStore和一般事件采购的新手。在一个项目中,我想使用NEventStore来持久化由我们的聚合生成的事件,但是我有一些问题需要正确处理并发。NEventStore乐观锁
如何使用乐观锁写入相同的流?
比方说,我有2个实例从2个不同的线程在修订1加载相同的聚合。然后是第一个线程调用命令A和第二个线程调用命令B.使用乐观锁定其中一个聚合应该失败,并发异常。
我以为使用maxRevision从加载聚集的角度打开流,但似乎CommitChanges永远不会失败,如果我通过旧版本。
我失踪了什么?使用NEventStore /事件采购时,乐观锁定可能/正确吗?
这里是我用来再现该问题的代码:
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (var scope = new TransactionScope())
using (store = WireupEventStore())
{
Client1(revision: 0);
Client2(revision: 0);
scope.Complete();
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.UsingInMemoryPersistence()
.Build();
}
private static void Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
我希望客户端2失败,因为我用旧版本打开流。
UPDATE 26/08/2013: 我已经测试了相同的代码,使用Sql服务器,似乎按预期工作。
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (store = WireupEventStore())
{
OpenOrCreateStream();
AppendToStream_Client1(revision: 1);
AppendToStream_Client2(revision: 1); // throws an error
// AppendToStream_Client2(revision: 2); // works
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.LogToOutputWindow()
.UsingInMemoryPersistence()
.UsingSqlPersistence("EventStore") // Connection string is in app.config
.WithDialect(new MsSqlDialect())
.InitializeStorageEngine()
.UsingJsonSerialization()
.Build();
}
private static void OpenOrCreateStream()
{
using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
{
var @event = new SomeDomainEvent { Value = "Initial event." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 2." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
所以回到我的问题:要启用乐观锁定,我应该使用修订时打开流?还有其他可能的实现或准则?
感谢
谢谢达米安。但我不确定要理解。假设我删除了TransactionScope。有可能处理乐观锁?怎么样?基本上我只想在这段时间内没有其他事件被提交时写入流。 –
我已经使用SQL Server更新了这个问题并且没有事务。如果我传递错误的修订,现在第二个附加失败。这是处理这种情况的正确方法吗?在这种情况下,我应该将修订保存在聚合状态中,并在保存新事件时将其传回。这是预期的实施? –
乐观锁定由每个持久性引擎处理。在SQL中,它基于StreamId和CommitSequence的主键。因此,如果您同时打开同一个流两次,则向两者添加一个提交,这将导致CommitSequence冲突和一个ConcurrencyException。 – 2013-08-27 11:28:19