2013-12-11 169 views
0

我的目标是创建一个产生初始任务数量(完全相同)的系统。这些任务将抓取并执行一些后台操作(从数据库),然后返回,此时应该生成一个新任务并执行相同的操作。异步任务按顺序执行?

我已经写了下面的代码作为概念证明,但它似乎像我所有的任务正在执行1乘1,而不是并行。

代码:

public Form1() 
    { 
     InitializeComponent(); 
    } 
    CancellationTokenSource cts; 
    private async void button1_Click(object sender, EventArgs e) 
    { 
     cts = new CancellationTokenSource(); 
     int reqNumberOfThreads = int.Parse(textBox1.Text); 
     try 
     { 
      await startSlaves(cts.Token, reqNumberOfThreads); 
     } 
     catch (OperationCanceledException) 
     { 
      MessageBox.Show("Canceled Starting Threads"); 
     } 
     cts = null; 

    } 
    async Task startSlaves(CancellationToken ct, int threadNum) 
    { 

     List<Task<int>> allTasks = new List<Task<int>>();// ***Add a loop to process the tasks one at a time until none remain. 
     for (int x = 0; x < threadNum; x++) 
     { 
      allTasks.Add(beginSlaveOperation(ct, x)); 
     } 
     // ***Add a loop to process the tasks one at a time until none remain. 
     while (allTasks.Count <= threadNum) 
     { 
      // Identify the first task that completes. 
      Task<int> output = await Task.WhenAny(allTasks); 
      allTasks.Remove(output); 
      allTasks.Add(beginSlaveOperation(ct, output.Result)); 
     } 
    } 
    public void performExampleImportOperationThread(int inputVal, int whoAmI) 
    { 
     System.Threading.Thread.Sleep(inputVal*10); 
     System.Console.Write("Thread number" + whoAmI.ToString() + "has finished after "+inputVal.ToString()+" secs \n"); 
    } 
    async Task<int> beginSlaveOperation(CancellationToken ct, int whoAmI) 
    { 
     Random random = new Random(); 
     int randomNumber = random.Next(0, 100);//Get command from microSched and remove it from sched 
     performExampleImportOperationThread(randomNumber, whoAmI);//perform operation 
     return whoAmI; 
    } 

    private void button2_Click(object sender, EventArgs e) 
    { 
     if (cts != null) 
     { 
      cts.Cancel(); 
     } 
    } 

输出:

Thread number0has finished after 29 secs 
Thread number1has finished after 45 secs 
Thread number2has finished after 59 secs 
Thread number0has finished after 39 secs 
Thread number1has finished after 13 secs 
Thread number2has finished after 44 secs 
Thread number0has finished after 21 secs 
Thread number1has finished after 62 secs 
Thread number2has finished after 62 secs 
Thread number0has finished after 25 secs 
Thread number1has finished after 86 secs 
Thread number2has finished after 10 secs 
Thread number0has finished after 4 secs 
Thread number1has finished after 24 secs 
Thread number2has finished after 84 secs 
Thread number0has finished after 73 secs 
Thread number1has finished after 19 secs 
Thread number2has finished after 72 secs 
Thread number0has finished after 82 secs
+3

为什么downvotes? – paqogomez

回答

1

beginSlaveOperation被标记为async,和你怎么称呼它,仿佛它是异步的,但你从来没有真正await任何东西,所以它是怎么回事同步运行。 (它使呼叫者同步运行,呼叫者同步运行,等等,以便您的整个应用程序同步运行。)

标记方法async不会奇迹般地使其在新线程中运行。它所做的就是让您使用await关键字。如果你没有使用它,你所做的就是同步执行一堆代码并将结果包装在一个完成的任务中。你甚至应该得到一个编译器警告这样做。

可以修复通过具有performExampleImportOperationThreadasync方法和,而不是使用Thread.Sleep(...),使用await Task.Delay(...)。等待performExampleImportOperationThread使beginSlaveOperation实际上是异步的。

或者您可能无法完成您正在执行的任何操作并将其全部替换为Parallel.For,这可以设置MaxDegreesOfParallelism将您限制为特定数量的并发线程。

+0

感谢您的回答,第一部分解决了我的问题。至于第二部分,我认为并不适合我目前的需求。我并没有通过一组预定义的数据循环,而是设置了一些后台线程来检查连续工作(通过数据库或其他数据源)。 – Ramsey

+0

@Ramsey您没有要迭代的一组数据并不重要。您可以使用'Parallel.For'来运行一段代码'n'次,就像您可以使用常规'for'循环运行一段代码'n'次一样,即使您不'有一个数组来迭代。 – Servy

+0

我明白你在说什么,但是我需要将N设置为无穷大,因为我希望这些任务总能随时循环以检查要做的工作。我的目标是创建一个应用程序,让我可以将它放在新机器上,轻松使其成为运行作业的节点。 实际上,performExampleImportOperationThread()将是几种运行方法之一,具体取决于从数据库中分配任务的子任务。 – Ramsey

0

这里以供将来参考固定的代码:

public Form1() 
    { 
     InitializeComponent(); 
    } 
    CancellationTokenSource cts; 
    private async void button1_Click(object sender, EventArgs e) 
    { 
     cts = new CancellationTokenSource(); 
     int reqNumberOfThreads = int.Parse(textBox1.Text); 
     try 
     { 
      await startSlaves(cts.Token, reqNumberOfThreads); 
     } 
     catch (OperationCanceledException) 
     { 
      MessageBox.Show("Canceled Starting Threads"); 
     } 
     cts = null; 

    } 
    async Task startSlaves(CancellationToken ct, int threadNum) 
    { 

     List<Task<int>> allTasks = new List<Task<int>>();// ***Add a loop to process the tasks one at a time until none remain. 
     for (int x = 0; x < threadNum; x++) 
     { 
      allTasks.Add(beginSlaveOperation(ct, x)); 
     } 
     // ***Add a loop to process the tasks one at a time until none remain. 
     while (allTasks.Count <= threadNum) 
     { 
      // Identify the first task that completes. 
      Task<int> output = await Task.WhenAny(allTasks); 
      allTasks.Remove(output); 
      allTasks.Add(beginSlaveOperation(ct, output.Result)); 
     } 
    } 
    public async Task performExampleImportOperationThread(int inputVal, int whoAmI) 
    { 
     await Task.Delay(inputVal*100); 
     System.Console.Write("Thread number" + whoAmI.ToString() + "has finished after "+inputVal.ToString()+" secs \n"); 
    } 
    async Task<int> beginSlaveOperation(CancellationToken ct, int whoAmI) 
    { 
     Random random = new Random(); 
     int randomNumber = random.Next(0, 100);//Get command from microSched and remove it from sched 
     await performExampleImportOperationThread(randomNumber, whoAmI);//perform operation 
     return whoAmI; 
    } 

    private void button2_Click(object sender, EventArgs e) 
    { 
     if (cts != null) 
     { 
      cts.Cancel(); 
     } 
    }