我正在读取azure表的数据 - 大约5k个表,并收集不同的度量标准并将它们以异步方式保存回其他一些天蓝色表中。我面临的问题是,当有大量的数据可能偶尔发生,应用程序开始挂起。相同的代码工作正常,数据较少。我做的步骤(所有的人都使用的Rx,异步异步和等待)是仅当数据较少时才异步
- 从Azure的
- 阅读所有的表名阅读所有表以前的测量数据(1 & 2是并联的 -
Task.WhenAll
) - 每个表,过程获取数据并保存(
Task.WhenAll
)
我想要的是,使用asynchronousy直到它不会使我的应用程序挂。如果数据的数量超过了可处理的数据量,则不应再读取表格的数据,而应关注完成可用的数据处理。
是否Parallel.ForEach
负责照顾?
该代码:编辑按Stephen Cleary,仍然不适用于所有表格。而它正在工作500个表,
我认为这是使应用程序(控制台应用程序)停滞而不是线程数量的数据量。 (一个线程最终可能会获取数以万计的行,以千为单位,每传递一个方法并将其计数添加到字典中,因此在需要更多内存时可以进行垃圾回收),还是我已经实现的方式Semaphoreslim
那是错的?
public async Task CalculateMetricsForAllTablesAsync()
{
var allWizardTableNamesTask = GetAllWizardTableNamesAsync();
var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync();
await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false);
var allWizardTableNames = allWizardTableNamesTask.Result;
var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result;
var throttler = new SemaphoreSlim(10);
var concurrentTableProcessingTasks = new ConcurrentStack<Task>();
foreach (var tname in allWizardTableNames)
{
await throttler.WaitAsync();
try
{
concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname)));
}
finally
{
throttler.Release();
}
}
await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false);
}
private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable)
{
var tableDataRetrieved = new TaskCompletionSource<bool>();
var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>();
_fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable))
.Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities),() => tableDataRetrieved.TrySetResult(true));
await tableDataRetrieved.Task;
await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false);
}
您需要向我们展示您的代码。 – Enigmativity
你也应该坚持Rx或TPL /异步/等待。一个或另一个,不是两个。混合时很容易造成死锁。就我个人而言,我会和Rx一起去做你正在做的事情。 – Enigmativity
我不认为这是死锁,当我在断点处调试并停止时,其他线程正在工作并且内存压力减少,因为我已经停止处理更多线程 – Saravanan