我有代码从SQL流下来的数据,并将其写入不同的商店。代码大致是这样的:如何使用Rx.Nex扩展ForEachAsync与异步动作
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}
我想使用Reactive扩展来达到这个目的。理想情况下,代码如下所示:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));
但是,似乎扩展方法ForEachAsync只接受同步操作。是否可以编写一个可接受异步操作的扩展?
非常好的一点。但是,通过等待,我基本上失去了在WriteDataAsync实现中使用异步的好处。我想知道是否保持原始代码的非阻塞性质。 –