1

我正在读取azure表的数据 - 大约5k个表,并收集不同的度量标准并将它们以异步方式保存回其他一些天蓝色表中。我面临的问题是,当有大量的数据可能偶尔发生,应用程序开始挂起。相同的代码工作正常,数据较少。我做的步骤(所有的人都使用的Rx,异步异步和等待)是仅当数据较少时才异步

  1. 从Azure的
  2. 阅读所有的表名阅读所有表以前的测量数据(1 & 2是并联的 - Task.WhenAll
  3. 每个表,过程获取数据并保存(Task.WhenAll

我想要的是,使用asynchronousy直到它不会使我的应用程序挂。如果数据的数量超过了可处理的数据量,则不应再读取表格的数据,而应关注完成可用的数据处理。

是否Parallel.ForEach负责照顾?

该代码:编辑按Stephen Cleary,仍然不适用于所有表格。而它正在工作500个表,

我认为这是使应用程序(控制台应用程序)停滞而不是线程数量的数据量。 (一个线程最终可能会获取数以万计的行,以千为单位,每传递一个方法并将其计数添加到字典中,因此在需要更多内存时可以进行垃圾回收),还是我已经实现的方式Semaphoreslim那是错的?

public async Task CalculateMetricsForAllTablesAsync() 
{ 
    var allWizardTableNamesTask = GetAllWizardTableNamesAsync(); 
    var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync(); 

    await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false); 

    var allWizardTableNames = allWizardTableNamesTask.Result; 
    var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result; 

    var throttler = new SemaphoreSlim(10); 
    var concurrentTableProcessingTasks = new ConcurrentStack<Task>(); 

    foreach (var tname in allWizardTableNames) 
    { 
     await throttler.WaitAsync(); 
     try 
     { 
      concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname))); 
     } 
     finally 
     { 
      throttler.Release(); 
     } 
    } 

    await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false); 

} 

private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable) 
{ 
    var tableDataRetrieved = new TaskCompletionSource<bool>(); 
    var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>(); 

    _fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable)) 
     .Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities),() => tableDataRetrieved.TrySetResult(true)); 

    await tableDataRetrieved.Task; 
    await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false); 
} 
+0

您需要向我们展示您的代码。 – Enigmativity

+0

你也应该坚持Rx或TPL /异步/等待。一个或另一个,不是两个。混合时很容易造成死锁。就我个人而言,我会和Rx一起去做你正在做的事情。 – Enigmativity

+0

我不认为这是死锁,当我在断点处调试并停止时,其他线程正在工作并且内存压力减少,因为我已经停止处理更多线程 – Saravanan

回答

3

由于您async被包装的Rx,我建议在async级节流。您可以通过定义SemaphoreSlim并将方法逻辑封装在WaitAsync/Release中来完成此操作。

或者,考虑TPL数据流。数据流具有内置的节流选项(MaxDegreeOfParallelism),并且与async和Rx自然互操作。

+0

控制线程数量是否解决数据量?即使很少的线程可以从多个表中获取更多数据? – Saravanan

+2

限制将限制正在处理的任务的数量。您的更新代码没有正确应用调节(它只是限制了“ProcessTableDataAsync”的* start *并将任务添加到堆栈)。相反,你应该在'try'里面的'ProcessTableDataAsync'中拥有所有的代码。 'Semaphoreslim'无法正确调节 –

+0

。使用“TPL数据流”。用“TPL Dataflow”编写异步代码变得很容易。在有限数量的表格中,我看到了可见的性能改进。去测试所有的表格。 – Saravanan

相关问题