2015-07-09 124 views
1

我正在编写一组异步任务,这些任务会消除下载和解析数据,但是我在下一步更新数据库时遇到了一些空白。将异步任务与阻塞同步任务混合使用

问题是,出于性能的考虑,我使用TableLock来加载相当大的数据集,所以我想要做的是让我的导入服务等待第一个任务返回,开始导入。如果在第一次导入运行时完成另一个任务,则该过程将加入队列并等待任务1的导入服务已完成。

例如,

异步 - 任务1 - 任务2 - 任务3

同步 - ImportService

RunAsync任务

Task3 returns first > ImportService.Import(Task3) 
Task1 return, ImportService is still running. Wait() 
ImportService.Complete() event 
Task2 returns. Wait() 
ImportService.Import(Task1) 
ImportService.Complete() event 
ImportService.Import(Task2) 
ImportService.Complete() event 

希望这是有道理的!

+0

您可能应该考虑[TPL DataFlow](https://msdn.microsoft.com/library/hh228603.aspx“Dataflow(任务并行库)”)。 –

+0

Paulo,那正是我正在寻找的!谢谢! –

回答

1

虽然这可能不是最优雅的解决方案,但我会尝试启动工作任务并让他们将输出放在ConcurrentQueue中。您可以检查队列中的计时器,直到完成所有任务。

var rand = new Random(); 
var importedData = new List<string>(); 
var results = new ConcurrentQueue<string>(); 
var tasks = new List<Task<string>> 
{ 
    new Task<string>(() => 
    { 
     Thread.Sleep(rand.Next(1000, 5000)); 
     Debug.WriteLine("Task 1 Completed"); 
     return "ABC"; 
    }), 
    new Task<string>(() => 
    { 
     Thread.Sleep(rand.Next(1000, 5000)); 
     Debug.WriteLine("Task 2 Completed"); 
     return "FOO"; 
    }), 
    new Task<string>(() => 
    { 
     Thread.Sleep(rand.Next(1000, 5000)); 
     Debug.WriteLine("Task 3 Completed"); 
     return "BAR"; 
    }) 
}; 

tasks.ForEach(t => 
{ 
    t.ContinueWith(r => results.Enqueue(r.Result)); 
    t.Start(); 
}); 

var allTasksCompleted = new AutoResetEvent(false); 
new Timer(state => 
{ 
    var timer = (Timer) state; 
    string item; 

    if (!results.TryDequeue(out item)) 
     return; 

    importedData.Add(item); 
    Debug.WriteLine("Imported " + item); 

    if (importedData.Count == tasks.Count) 
    { 
     timer.Dispose(); 
     Debug.WriteLine("Completed."); 
     allTasksCompleted.Set(); 
    } 
}).Change(1000, 100); 


allTasksCompleted.WaitOne(); 
+0

'Task.WhenAll'可以简化你的代码。 – EZI

2

你真的不能用等待在这里,但你可以等待多个任务来完成:

var tasks = new List<Task)(); 
// start the tasks however 
tasks.Add(Task.Run(Task1Function); 
tasks.Add(Task.Run(Task2Function); 
tasks.Add(Task.Run(Task2Function); 

while (tasks.Count > 0) 
{ 
    var i = Task.WaitAny(tasks.ToArray()); // yes this is ugly but an array is required 
    var task = tasks[i]; 
    tasks.RemoveAt(i); 
    ImportService.Import(task); // do you need to pass the task or the task.Result 
} 

在我看来,不过应该有一个更好的选择。你可以让任务和进口跑在ImportService部分例如加锁:

// This is the task code doing whatever 
.... 
// Task finishes and calls ImportService.Import 
lock(typeof(ImportService)) // actually the lock should probably be inside the Import method 
{ 
    ImportService.Import(....); 
} 

有几件事情困扰着我你的要求(包括使用静态ImportService,静态类很少是个好主意),但没有进一步的细节,我无法提供更好的建议。

+1

对于OP:我同意Eli你应该简单地同步导入,每个任务在导入之前获取锁定作为最后的操作。根据具体的实现,这个主题可能会有更好的变化,但上面的例子和合理提供的一样好,除非你用更多的细节来改进问题,包括[一个好的,_minimal_,_complete_代码示例]( https://stackoverflow.com/help/mcve),这清楚地说明了这个问题。 –

+0

谢谢伊利和彼得。 我非常喜欢锁定线程的想法,这是我原先想到的,但并不认为这是可能的任务,你每天都会学到新的东西。谢谢! :) –

+0

@Al。你应该考虑你是否需要在应用程序级别锁定。我不知道你使用的是什么数据库,但数据库最适合处理并发操作。如果您在应用程序级别进行锁定,假设甚至需要锁定,那么您仅限于同时运行的单个应用程序。无需重写锁定机制,您无法扩展到多个服务器。也许你的问题应该是:我如何让我的查询在没有TableLock的情况下运行,或者没有应用程序级锁。 –