1
比方说,我有一个数据服务类,它将数据逐批地批量提取给用户。如何避免在下次完成之前调用onComplete?
public class DataService {
public IObservable<IList<T>> QuerySegmentedObservable<T>(string tableName) where T : TableEntity, new(){
return Observable.Create<IList<T>>(async (observer, token) =>{
TableContinuationToken continuationToken = null;
do{
var currentSegment = CallData();
observer.OnNext(currentSegment.Results);
continuationToken = currentSegment.ContinuationToken;
} while (continuationToken != null);
observer.OnCompleted();
}
}
}
我正在订阅这个可观察的如下。
public async Task<bool> MyMethod()
{
var tcs = new TaskCompletionSource<bool>();
var observable = _dataService.QuerySegmentedObservable<TSource>(_sourceTableName);
var dataCount = 0;
_databaseService.OpenConnection();
observable.Subscribe(async data =>
{
await _databaseService.DoSomething(data);
dataCount += data.Count;
Console.WriteLine($"Processing - {dataCount}");
},
err =>
{
Console.WriteLine($"Error - {err.Message}");
tcs.SetResult(false);
},
() =>
{
_databaseService.CloseConnection();
Console.WriteLine($"Finished");
tcs.SetResult(true);
}
);
return await tcs.Task;
}
问题是OnComplete()在最后的OnNext()完成之前调用。因此,在完成我在Subscribe()中完成的任务之前,我最终关闭了连接。
有什么方法可以解决它吗?谢谢。
尝试从'onNext'块中删除异步操作。 – redent84
'_databaseService'的类型是什么? – Enigmativity
@En这只是一个将数据插入数据库的类,或者做一些可能需要几秒钟才能达到40秒的类。 –