2013-08-25 99 views
3

我是NEventStore和一般事件采购的新手。在一个项目中,我想使用NEventStore来持久化由我们的聚合生成的事件,但是我有一些问题需要正确处理并发。NEventStore乐观锁

如何使用乐观锁写入相同的流?

比方说,我有2个实例从2个不同的线程在修订1加载相同的聚合。然后是第一个线程调用命令A和第二个线程调用命令B.使用乐观锁定其中一个聚合应该失败,并发异常。

我以为使用maxRevision从加载聚集的角度打开流,但似乎CommitChanges永远不会失败,如果我通过旧版本。

我失踪了什么?使用NEventStore /事件采购时,乐观锁定可能/正确吗?

这里是我用来再现该问题的代码:

namespace NEventStore.Example 
{ 
    using System; 
    using System.Transactions; 
    using NEventStore; 
    using NEventStore.Dispatcher; 
    using NEventStore.Persistence.SqlPersistence.SqlDialects; 

    internal static class MainProgram 
    { 
     private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier 
     private static IStoreEvents store; 

     private static void Main() 
     { 
      using (var scope = new TransactionScope()) 
      using (store = WireupEventStore()) 
      { 
       Client1(revision: 0); 

       Client2(revision: 0); 

       scope.Complete(); 
      } 

      Console.WriteLine(Resources.PressAnyKey); 
      Console.ReadKey(); 
     } 

     private static IStoreEvents WireupEventStore() 
     { 
      return Wireup.Init() 
       .UsingInMemoryPersistence() 
       .Build(); 
     } 

     private static void Client1(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, 0, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Client 1 - event 1." }; 

       stream.Add(new EventMessage { Body = @event }); 


       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 

     private static void Client2(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, 0, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Client 2 - event 1." }; 

       stream.Add(new EventMessage { Body = @event }); 


       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 
    } 
} 

我希望客户端2失败,因为我用旧版本打开流。

UPDATE 26/08/2013: 我已经测试了相同的代码,使用Sql服务器,似乎按预期工作。

namespace NEventStore.Example 
{ 
    using System; 
    using System.Transactions; 
    using NEventStore; 
    using NEventStore.Dispatcher; 
    using NEventStore.Persistence.SqlPersistence.SqlDialects; 

    internal static class MainProgram 
    { 
     private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier 
     private static IStoreEvents store; 

     private static void Main() 
     { 
      using (store = WireupEventStore()) 
      { 
       OpenOrCreateStream(); 

       AppendToStream_Client1(revision: 1); 

       AppendToStream_Client2(revision: 1); // throws an error 
       // AppendToStream_Client2(revision: 2); // works 
      } 

      Console.WriteLine(Resources.PressAnyKey); 
      Console.ReadKey(); 
     } 

     private static IStoreEvents WireupEventStore() 
     { 
      return Wireup.Init() 
       .LogToOutputWindow() 
       .UsingInMemoryPersistence() 
       .UsingSqlPersistence("EventStore") // Connection string is in app.config 
        .WithDialect(new MsSqlDialect()) 
        .InitializeStorageEngine() 
        .UsingJsonSerialization() 
       .Build(); 
     } 

     private static void OpenOrCreateStream() 
     { 
      using (var stream = store.OpenStream(StreamId, 0, int.MaxValue)) 
      { 
       var @event = new SomeDomainEvent { Value = "Initial event." }; 

       stream.Add(new EventMessage { Body = @event }); 
       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 

     private static void AppendToStream_Client1(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, int.MinValue, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Second event 1." }; 

       stream.Add(new EventMessage { Body = @event }); 
       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 

     private static void AppendToStream_Client2(int revision) 
     { 
      using (var stream = store.OpenStream(StreamId, int.MinValue, revision)) 
      { 
       var @event = new SomeDomainEvent { Value = "Second event 2." }; 

       stream.Add(new EventMessage { Body = @event }); 
       stream.CommitChanges(Guid.NewGuid()); 
      } 
     } 
    } 
} 

所以回到我的问题:要启用乐观锁定,我应该使用修订时打开流?还有其他可能的实现或准则?

感谢

回答

6

首先,在内存中的持久性实现,其主要目的是测试,是不是事务感知。在您的原始示例中,客户端2将简单地将其事件附加到流中。尝试使用支持事务的持久性存储(SQL & Raven,但不是Mongo)来运行上述操作。

其次,当打开一个流用于不同的目的指定的最小/最大修订:

  1. 当重新水合的聚集体,并且没有快照是可用的,应指定(分:0,最大:int.MaxValue),因为你有兴趣检索所有事件。
  2. 重新提供聚合和快照时,您可以指定(min:snapshot.Version,max:int.MaxValue)来获取自快照以来发生的所有事件。
  3. 保存聚合时,可以指定(min:0,max:Aggregate.Version)。 Aggregate.Version是在重新水合过程中获得的。如果同一个骨料在同一时间在其他地方重新补充并保存,则会出现竞赛状况,并且会发生ConcurrencyException

大部分支持将被封装在域框架中。请参阅普通域中的AggregateBaseEventStoreRepository

第三,最重要的是,在单个事务中更新> 1个流是一种代码异味。如果您正在执行DDD/ES,则流代表单个聚合根,根据定义,它是一致性边界。在事务中创建/更新多个AR可以打破这一点。 NEventStore的事务处理支持被(不情愿地)添加了,因此它可以与其他工具一起工作,即事务性地从MSMQ/NServiceBus/whatever读取命令并处理它,或者事务性地将提交消息分派给队列并将其标记为这样。就个人而言,我会建议你尽量避免2PC。

+0

谢谢达米安。但我不确定要理解。假设我删除了TransactionScope。有可能处理乐观锁?怎么样?基本上我只想在这段时间内没有其他事件被提交时写入流。 –

+0

我已经使用SQL Server更新了这个问题并且没有事务。如果我传递错误的修订,现在第二个附加失败。这是处理这种情况的正确方法吗?在这种情况下,我应该将修订保存在聚合状态中,并在保存新事件时将其传回。这是预期的实施? –

+0

乐观锁定由每个持久性引擎处理。在SQL中,它基于StreamId和CommitSequence的主键。因此,如果您同时打开同一个流两次,则向两者添加一个提交,这将导致CommitSequence冲突和一个ConcurrencyException。 – 2013-08-27 11:28:19