2017-09-14 14 views
0

鉴于以下方法观察到:非折返在C#

如果我离开了黑客到位,我的单元测试立即“可观察有没有数据”完成。

如果我采取了破解,有多个线程都尝试在同一时间登录。
主机服务不允许此

如何确保只有一个线程生产在任何给定时间点观测。

private static object obj = new object(); 
    private static bool here = true; 
    public IObservable<Party> LoadAllParties(CancellationToken token) 
    { 
     var parties = Observable.Create<Party>(
      async (observer, cancel) => 
      { 
       // this is just a hack to test behavior 
       lock (obj) 
       { 
        if (!here) 
         return; 
        here = false; 
       } 
       // end of hack. 
       try 
       { 
        if (!await this.RequestLogin(observer, cancel)) 
         return; 

        // request list. 
        await this._request.GetAsync(this._configuration.Url.RequestList); 
        if (this.IsCancelled(observer, cancel)) 
         return; 

        while (!cancel.IsCancellationRequested) 
        { 
         var entities = await this._request.GetAsync(this._configuration.Url.ProcessList); 
         if (this.IsCancelled(observer, cancel)) 
          return; 

         var tranche = this.ExtractParties(entities); 

         // break out if it's the last page. 
         if (!tranche.Any()) 
          break; 

         Array.ForEach(tranche, observer.OnNext); 

         await this._request.GetAsync(this._configuration.Url.ProceedList); 
         if (this.IsCancelled(observer, cancel)) 
          return; 
        } 

        observer.OnCompleted(); 
       } 
       catch (Exception ex) 
       { 
        observer.OnError(ex); 
       } 
      }); 
     return parties; 
    } 

我的单元测试:

var sut = container.Resolve<SyncDataManager>(); 
var count = 0; 
var token = new CancellationTokenSource(); 
var observable = sut.LoadAllParties(token.Token); 
observable.Subscribe(party => count++); 
await observable.ToTask(token.Token); 
count.Should().BeGreaterThan(0); 
+0

我看到一个'Observable.Create'调用,一个锁,以及一大堆无法解释的不可编译的自定义代码。如果你能产生[mcve],那可能会有所帮助。 – Shlomo

+0

我可以尝试做一个更简单的例子 - 但问题仍然是锁被多个线程击中。为什么?很明显,可观察性如何工作......我需要使其不可重入。我会努力简化这个例子。您可以用await Task.Delay(1000) – Jim

+0

替换每个异步调用您的客户端代码可能导致该问题。你对创建的可观测物体做了什么? – Shlomo

回答

3

我认为你的问题是从XY Problem痛苦 - 代码中包含几个电话不包括可能包含重要的副作用的方法,我觉得那回事可用的信息不会导致最好的建议。

这就是说,我怀疑你没有打算订阅observable两次 - 一次用明确的Subscribe打电话,一次用ToTask()打电话。这肯定会解释并发呼叫,这是发生在两个不同的订阅。

编辑:

如何断言的长度,而不是(调整以适应超时):

var length = await observable.Count().Timeout(TimeSpan.FromSeconds(3)); 

更好的方式是寻找到RX-测试和嘲笑你的依赖。这是一个很大的话题,但this long blog post from the Rx team explains it very well和关于TPL-Rx相互作用的这个答案可能有所帮助:Executing TPL code in a reactive pipeline and controlling execution via test scheduler

+0

哈 - 詹姆斯!我很确定你已经得到了它。我没有意识到ToTask会这样做。有关如何等待它完成的任何建议? – Jim

+0

顺便说一句,您可以直接“等待”您的'observable'(无需订阅调用) - 只要您期待得到单一结果,它就会隐式地进行必要的订阅和转换。虽然没有取消令牌支持。 –

+0

感谢詹姆斯,我想最终它可能是错误的模式 - 虽然它感觉不错,但它将最终通过单一的非折返式挂火工作来协调。我想确保不可重入,我将不得不用一个锁来包围它 - 但锁和异步不会让一对幸福的夫妻......也许是一个异步互斥 - 这是有例子的。我认为Stephen Cleary已经发布了一些例子。 – Jim