2017-07-28 34 views
4

我有代码从SQL流下来的数据,并将其写入不同的商店。代码大致是这样的:如何使用Rx.Nex扩展ForEachAsync与异步动作

using (var cmd = new SqlCommand("select * from MyTable", connection)) 
{ 
    using (var reader = await cmd.ExecuteReaderAsync()) 
    { 
     var list = new List<MyData>(); 
     while (await reader.ReadAsync()) 
     { 
      var row = GetRow(reader); 
      list.Add(row); 
      if (list.Count == BatchSize) 
      { 
       await WriteDataAsync(list); 
       list.Clear(); 
      } 
     } 
     if (list.Count > 0) 
     { 
      await WriteDataAsync(list); 
     } 
    } 
} 

我想使用Reactive扩展来达到这个目的。理想情况下,代码如下所示:

await StreamDataFromSql() 
    .Buffer(BatchSize) 
    .ForEachAsync(async batch => await WriteDataAsync(batch)); 

但是,似乎扩展方法ForEachAsync只接受同步操作。是否可以编写一个可接受异步操作的扩展?

回答

0

这里是the source code for ForEachAsync,并在ToEnumerable和AsObservable方法的article

我们可以让周围的ForEachAsync的包装,将等待下一个任务的返回功能:

public static async Task ForEachAsync<T>(this IObservable<T> t, Func<T, Task> onNext) 
{ 
    foreach (var x in t.ToEnumerable()) 
     await onNext(x); 
} 

用法示例:

await ForEachAsync(Observable.Range(0, 10), async x => await Task.FromResult(x)); 
+0

非常好的一点。但是,通过等待,我基本上失去了在WriteDataAsync实现中使用异步的好处。我想知道是否保持原始代码的非阻塞性质。 –

0

要做的正确的事情是正确使用Reactive Extensions来完成这个任务 - 所以从这一点开始,哟直到你编写你的数据,你才能建立连接。

方法如下:

IObservable<IList<MyData>> query = 
    Observable 
     .Using(() => new SqlConnection(""), connection => 
      Observable 
       .Using(() => new SqlCommand("select * from MyTable", connection), cmd => 
        Observable 
         .Using(() => cmd.ExecuteReader(), reader => 
          Observable 
           .While(() => reader.Read(), Observable.Return(GetRow(reader)))))) 
     .Buffer(BatchSize); 

IDisposable subscription = 
    query 
     .Subscribe(async list => await WriteDataAsync(list)); 

我无法测试的代码,但它应该工作。此代码假定WriteDataAsync也可以采取IList<MyData>。如果它不只是在.ToList()下降。

1

是否有可能编写一个可以接受异步操作的扩展?

不直接。

Rx订阅必须同步,因为Rx是基于推送的系统。当数据项到达时,它会遍历您的查询,直到它遇到最终订阅 - 在这种情况下,将执行Action

以Rx提供的await -able方法await荷兰国际集团的序列本身 - 即ForEachAsync是在序列(要异步等待序列完成)方面异步的,但是内ForEachAsync认购(中为每个元素采取的行动)必须仍然是同步的。

为了在数据管道中执行同步到异步转换,您需要有一个缓冲区。 Rx订阅可以(同步)作为生产者添加到缓冲区,而异步使用者正在检索项目并处理它们。所以,你需要一个支持同步和异步操作的生产者/消费者队列。

TPL Dataflow中的各种块类型可以满足此需求。像这样的东西应该就足够了:

var obs = StreamDataFromSql().Buffer(BatchSize); 
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch)); 
using (var subscription = obs.Subscribe(buffer.AsObserver())) 
    await buffer.Completion; 

请注意,没有背压;只要StreamDataFromSql可以推送数据,它将被缓存并存储在ActionBlock的传入队列中。根据数据的大小和类型,这可以快速使用大量内存。

+0

您能解释一下“Rx订阅是否必须同步,因为Rx是基于推送的系统”?乍一看,我会说这是不正确的,但也许我误解了你的意思。 – Enigmativity

+0

@Enigmativity我的意思是没有内置的自动背压系统。例如,您的答案中的订阅是同步的,而不是异步的。一旦在订阅中点击第一个“await”,就Rx而言,整个订阅方法已经完成,并且可以自由地启动另一个订阅方法。 –

+0

够公平的。我想我应该放入'.ObserveOn'调用来将执行从UI线程移开。然后,根据“WriteDataAsync”是否使用任何UI元素,它可以运行在用户界面上,避免关于异步的问题。 – Enigmativity