2017-03-12 40 views

Asynchronous update or insert of a MongoDB document using the .NET driver

I have a document like this:

public class SomeDocument 
{ 
    public Guid Id { get; set; } 
    public string PropertyA { get; set; } 
    public string PropertyB { get; set; } 
} 

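Now I have two different services (A and B) that run asynchronously and update PropertyA and PropertyB respectively. This means I don't know which service will finish first and should create the document, and which one should update it.

So, to update (or create) the document, I currently use code like this in service A: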

var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true }; 
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value"); 

await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options); 

and the following code in service B:

var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true }; 
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value"); 

await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options); 

Everything looks fine, but sometimes, when both services run at the same time, I get the following error:

Unhandled Exception: MongoDB.Driver.MongoCommandException: Command findAndModify failed: E11000 duplicate key error collection: someDocuments index: _id_ dup key: { : BinData(3, B85ED193195A274DA94BC86B655B4509) }. 
    at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.ProcessReply(ConnectionId connectionId, ReplyMessage`1 reply) 
    at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.<ExecuteAsync>d__11.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Servers.Server.ServerChannel.<ExecuteProtocolAsync>d__26`1.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Operations.CommandOperationBase`1.<ExecuteProtocolAsync>d__29.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Operations.WriteCommandOperation`1.<ExecuteAsync>d__2.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.Core.Operations.FindAndModifyOperationBase`1.<ExecuteAsync>d__19.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.OperationExecutor.<ExecuteWriteOperationAsync>d__3`1.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at MongoDB.Driver.MongoCollectionImpl`1.<ExecuteWriteOperationAsync>d__62`1.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
    at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult() 
    at CVSP.MongoDbStore.MongoDbWriteModelFacade.<AddRecordField>d__6.MoveNext() in D:\Projects\Test\Source\MongoDbStore\WriteModel\MongoDbWriteModelFacade.cs:line 58 
--- End of stack trace from previous location where exception was thrown --- 
    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.<>c.<ThrowAsync>b__6_1(Object state) 
    at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state) 
    at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) 
    at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem() 
    at System.Threading.ThreadPoolWorkQueue.Dispatch() 
    at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() 

How should I insert/update the document in this case?

UPDATE

An extension method with a try/catch did the trick:

public static async Task<TProjection> FindOneAndUpdateWithConcurrencyAsync<TDocument, TProjection>(
    this IMongoCollection<TDocument> collection,
    FilterDefinition<TDocument> filter,
    UpdateDefinition<TDocument> update,
    FindOneAndUpdateOptions<TDocument, TProjection> options = null,
    CancellationToken cancellationToken = default(CancellationToken))
{
    try
    {
        return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
    }
    catch (MongoException)
    {
        // The other service probably won the insert race. Wait briefly without
        // blocking the thread (Task.Delay instead of Thread.Sleep), then retry once.
        await Task.Delay(10, cancellationToken);

        return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
    }
}

It looks odd, and I didn't like it at first, but after reading https://docs.mongodb.com/manual/reference/method/db.collection.findAndModify/#upsert-and-unique-index all my doubts disappeared.

1 Answer

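Well, this is a synchronization problem, and unfortunately there is no simple solution. To find a workaround, let's look at what may be happening on the back end.

Let's assume we have two threads (services) trying to upsert the same document.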

t1: 00:00:00.250 -> find document with Id (1) 
t2: 00:00:00.255 -> find document with id (1) 

t1: 00:00:00.260 -> No document found 
t2: 00:00:00.262 -> No document found 

t1: 00:00:00.300 -> Insert a document with Id(1) 
t2: 00:00:00.300 -> Insert a document with Id(1) 

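Bingo... we get the exception. Both threads found no document and then both tried to insert one with the same Id.

So what can we do here?

Let's turn this drawback to our advantage. Catch the exception and call the upsert again. This time it will find the document and update it.

I modified the code for ServiceA and ServiceB as follows, and tried inserting 10,000 documents in a tight loop: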

public async Task ServiceA(Guid id) 
{ 
    var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
    var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value"); 

    var options = new UpdateOptions() { IsUpsert = true }; 
    var database = _client.GetDatabase("stackoverflow"); 
    var collection = database.GetCollection<SomeDocument>(CollectionName, 
     new MongoCollectionSettings 
     { 
      WriteConcern = WriteConcern.W1 
     }); 

    await collection.UpdateOneAsync(filter, update, options); 
} 

public async Task ServiceB(Guid id) 
{ 
    var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id); 
    var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value"); 

    var options = new UpdateOptions() { IsUpsert = true }; 
    var database = _client.GetDatabase("stackoverflow"); 
    var collection = database.GetCollection<SomeDocument>(CollectionName, 
     new MongoCollectionSettings 
     { 
      WriteConcern = WriteConcern.W1 
     }); 

    await collection.UpdateOneAsync(filter, update, options); 
} 

Here is my launching code. It's not perfect, but it serves the purpose.

for (var i = 0; i < 10000; i++) 
{ 
    var guid = Guid.NewGuid(); 

    // Task.Run accepts async lambdas (Func<Task>), so WaitAll really waits for
    // the upserts. `new Task(async ...)` would wrap an async void delegate and
    // complete at the first await.
    var tasks = new[] 
    { 
        Task.Run(async () => 
        { 
            var p = new Program(); 
            try 
            { 
                await p.ServiceA(guid); 
            } 
            catch (MongoWriteException) 
            { 
                // Lost the insert race: the document exists now, so retry once.
                await Task.Delay(5); 
                await p.ServiceA(guid); 
            } 
        }), 
        Task.Run(async () => 
        { 
            var p = new Program(); 
            try 
            { 
                await p.ServiceB(guid); 
            } 
            catch (MongoWriteException) 
            { 
                // Same retry for the other service.
                await Task.Delay(5); 
                await p.ServiceB(guid); 
            } 
        }) 
    }; 

    Task.WaitAll(tasks); 
} 
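
The catch-and-retry idea described above can also be factored into a small helper so that both services share it. The following is only a minimal sketch: `UpsertRetry`, `RetryOnceAsync` and the `isDuplicateKey` predicate are hypothetical names introduced for illustration, not part of the MongoDB driver, and the helper retries exactly once, which assumes the second attempt finds the document the other service just inserted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UpsertRetry
{
    // Hypothetical helper (not a driver API): runs `operation`; if it throws an
    // exception accepted by `isDuplicateKey`, waits `delay` and runs it once more.
    // A failure on the second attempt propagates to the caller.
    public static async Task<T> RetryOnceAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, bool> isDuplicateKey,
        TimeSpan delay,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        try
        {
            return await operation();
        }
        catch (Exception ex) when (isDuplicateKey(ex))
        {
            // Give the competing insert a moment to become visible.
            await Task.Delay(delay, cancellationToken);
            return await operation();
        }
    }
}

// Possible call site with the driver. This assumes the duplicate key error
// surfaces as MongoCommandException with Code 11000, as in the stack trace
// from the question:
//
//   await UpsertRetry.RetryOnceAsync(
//       () => collection.FindOneAndUpdateAsync(filter, update, options),
//       ex => ex is MongoCommandException c && c.Code == 11000,
//       TimeSpan.FromMilliseconds(10));
```

Passing the predicate in keeps the helper independent of which driver call is used; for `UpdateOneAsync` the predicate would presumably check for `MongoWriteException` instead, as in the launching code above.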
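
Thanks for the solution, @Saleem. I'll give it a try, and I'll move the try/catch inside the service methods :)

Indeed, you can modify it as needed. As I mentioned in my comment about the driver code, it is not perfect, but it serves its purpose. – Saleem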