2017-03-07 49 views
1

比方说,我有一个数据服务类,它将数据逐批地批量提取给用户。如何避免在下次完成之前调用onComplete?

public class DataService { 
    public IObservable<IList<T>> QuerySegmentedObservable<T>(string tableName) where T : TableEntity, new(){ 

     return Observable.Create<IList<T>>(async (observer, token) =>{ 
      TableContinuationToken continuationToken = null; 
      do{ 
       var currentSegment = CallData(); 
       observer.OnNext(currentSegment.Results); 

       continuationToken = currentSegment.ContinuationToken; 
      } while (continuationToken != null); 
      observer.OnCompleted(); 
     } 
    } 
} 

我正在订阅这个可观察的如下。

public async Task<bool> MyMethod() 
     { 
      var tcs = new TaskCompletionSource<bool>(); 
      var observable = _dataService.QuerySegmentedObservable<TSource>(_sourceTableName); 

      var dataCount = 0; 

      _databaseService.OpenConnection(); 

      observable.Subscribe(async data => 
      {     
       await _databaseService.DoSomething(data); 

       dataCount += data.Count; 

       Console.WriteLine($"Processing - {dataCount}"); 
      }, 
      err => 
      { 
       Console.WriteLine($"Error - {err.Message}"); 
       tcs.SetResult(false); 
      }, 
      () => 
      {  
       _databaseService.CloseConnection(); 
       Console.WriteLine($"Finished"); 
       tcs.SetResult(true); 
      } 
      ); 

      return await tcs.Task; 
     } 

问题是OnComplete()在最后的OnNext()完成之前调用。因此,在完成我在Subscribe()中完成的任务之前,我最终关闭了连接。

有什么方法可以解决它吗?谢谢。

+3

尝试从'onNext'块中删除异步操作。 – redent84

+0

'_databaseService'的类型是什么? – Enigmativity

+0

@En这只是一个将数据插入数据库的类,或者做一些可能需要几秒钟才能达到40秒的类。 –

回答

0

Rx支持运算符中的异步/等待。您在订阅中使用它。所以(希望)你可以改变你的代码是这样的:

public async Task<bool> MyMethod() 
{ 
    var tcs = new TaskCompletionSource<bool>(); 
    _databaseService.OpenConnection(); 
    var dataCount = 0; 
    _dataService.QuerySegmentedObservable<TSource>(_sourceTableName) 
     .SelectMany(async data => 
     { 
      await _databaseService.DoSomething(data); 
      return data; 
     }) 
     //.Finally(() => _databaseService.CloseConnection()) //This would be called on OnComplete and OnError, just like try-finally 
     .Subscribe(data => 
      { 
       dataCount += data.Count; 

       Console.WriteLine($"Processing - {dataCount}"); 
      }, 
      err => 
      { 
       Console.WriteLine($"Error - {err.Message}"); 
       tcs.SetResult(false); 
      }, 
      () => 
      { 
       _databaseService.CloseConnection(); //Maybe move this to a Finally call? 
       Console.WriteLine($"Finished"); 
       tcs.SetResult(true); 
      } 
     ); 

    return await tcs.Task; 
} 

我不能真的测试它,所以我希望你设置在正确的道路上。如果您需要更多帮助,请发布更好的MCVE

相关问题