2013-11-28 54 views
1

我需要处理用户请求一个接一个(类似于像队列作业)创建一个线程队列

这是我有:

Thread checkQueue; 
Boolean IsComplete = true; 

protected void Start() 
{ 
    checkQueue = new Thread(CheckQueueState); 
    checkQueue.Start();  
} 

private void CheckQueueState() 
    { 
     while (true) 
     { 
      if (checkIsComplete) 
      { 
       ContinueDoSomething(); 

       checkQueue.Abort(); 
       break; 
      } 
      System.Threading.Thread.Sleep(1000); 
     } 
    } 

protected void ContinueDoSomething() 
{ 
    IsComplete = false; 
    ... 
    ... 
    IsComplete = true; //when done, set it to true 
} 

每次当有来自用户的新要求,系统会调用Start()函数并检查前一个工作是否完成,如果是,则继续下一个工作。

但我不确定这样做是否正确。

任何改进或任何更好的方法来做到这一点?

+0

您有效地不能使用Thread.Abort的。在SO上搜索以找到关于原因的一些讨论。 – usr

+0

您确定要在ASP.NET中执行此操作吗?这听起来像是在服务器上运行并独立于客户端浏览器运行的东西。 –

+0

@usr好的。谢谢你的建议。 – My2ndLovE

回答

1

我喜欢usr关于使用TPL Dataflow的建议。如果您有能力为项目添加外部依赖项(TPL Dataflow不是作为.NET框架的一部分分发),那么它为您的问题提供了一个干净的解决方案。

但是,如果您坚持框架所提供的内容,您应该看看BlockingCollection<T>,它与您试图实施的生产者 - 消费者模式很好地协同工作。

我把一个快速的.NET 4.0例子抛在一起,以说明如何在你的场景中使用它。这不是很瘦,因为它有很多电话Console.WriteLine()。但是,如果你把所有的混乱都解决了,那就非常简单了。

在它的中心是一个BlockingCollection<Action>,它获取Action代表添加到它从任何线程,和一个线程专用于出队,并在它们被添加的确切顺序执行这些Action小号顺序。

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace SimpleProducerConsumer 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Main thread id is {0}.", Thread.CurrentThread.ManagedThreadId); 

      using (var blockingCollection = new BlockingCollection<Action>()) 
      { 
       // Start our processing loop. 
       var actionLoop = new Thread(() => 
       { 
        Console.WriteLine(
         "Starting action loop on thread {0} (dedicated action loop thread).", 
         Thread.CurrentThread.ManagedThreadId, 
         Thread.CurrentThread.IsThreadPoolThread); 

        // Dequeue actions as they become available. 
        foreach (var action in blockingCollection.GetConsumingEnumerable()) 
        { 
         // Invoke the action synchronously 
         // on the "actionLoop" thread. 
         action(); 
        } 

        Console.WriteLine("Action loop terminating."); 
       }); 

       actionLoop.Start(); 

       // Enqueue some work. 
       Console.WriteLine("Enqueueing action 1 from thread {0} (main thread).", Thread.CurrentThread.ManagedThreadId); 
       blockingCollection.Add(() => SimulateWork(1)); 

       Console.WriteLine("Enqueueing action 2 from thread {0} (main thread).", Thread.CurrentThread.ManagedThreadId); 
       blockingCollection.Add(() => SimulateWork(2)); 

       // Let's enqueue it from another thread just for fun. 
       var enqueueTask = Task.Factory.StartNew(() => 
       { 
        Console.WriteLine(
         "Enqueueing action 3 from thread {0} (task executing on a thread pool thread).", 
         Thread.CurrentThread.ManagedThreadId); 

        blockingCollection.Add(() => SimulateWork(3)); 
       }); 

       // We have to wait for the task to complete 
       // because otherwise we'll end up calling 
       // CompleteAdding before our background task 
       // has had the chance to enqueue action #3. 
       enqueueTask.Wait(); 

       // Tell our loop (and, consequently, the "actionLoop" thread) 
       // to terminate when it's done processing pending actions. 
       blockingCollection.CompleteAdding(); 

       Console.WriteLine("Done enqueueing work. Waiting for the loop to complete."); 

       // Block until the "actionLoop" thread terminates. 
       actionLoop.Join(); 

       Console.WriteLine("Done. Press Enter to quit."); 
       Console.ReadLine(); 
      } 
     } 

     private static void SimulateWork(int actionNo) 
     { 
      Thread.Sleep(500); 
      Console.WriteLine("Finished processing action {0} on thread {1} (dedicated action loop thread).", actionNo, Thread.CurrentThread.ManagedThreadId); 
     } 
    } 
} 

,输出是:

0.016s: Main thread id is 10. 
0.025s: Enqueueing action 1 from thread 10 (main thread). 
0.026s: Enqueueing action 2 from thread 10 (main thread). 
0.027s: Starting action loop on thread 11 (dedicated action loop thread). 
0.028s: Enqueueing action 3 from thread 6 (task executing on a thread pool thread). 
0.028s: Done enqueueing work. Waiting for the loop to complete. 
0.527s: Finished processing action 1 on thread 11 (dedicated action loop thread). 
1.028s: Finished processing action 2 on thread 11 (dedicated action loop thread). 
1.529s: Finished processing action 3 on thread 11 (dedicated action loop thread). 
1.530s: Action loop terminating. 
1.532s: Done. Press Enter to quit. 
+0

感谢您的详细解释。其中一个要求是,它必须一个接一个地执行(完成每个任务可能需要30秒)。我如何确保当前任务在下一个任务开始之前完成? – My2ndLovE

+0

@ My2ndLovE,我意识到这一要求。我在输出行前添加了一个时间戳,以便您可以看到排队的作业实际上是按顺序执行的(每个作业需要〜500ms)。 你用的是“任务”已经混淆,虽然我有点。 *您是否在使用“任务”封装您的工作?因为如果你使用'ConcurrentExclusiveSchedulerPair'的'ExclusiveScheduler'将会是更好的方法。 –

0

使用TPL数据流库中的ActionBlock<T>。将其MaxDegreeOfParalellism设置为1,即可完成。请注意,ASP.NET工作进程可以随时回收(例如,由于计划的回收,内存限制,服务器重新启动或部署),因此排队的工作可能会突然丢失,恕不另行通知。我建议你看看像MSMQ(或其他)的外部排队解决方案,以获得可靠的队列。

+0

ActionBlock需要运行在.net framework 4.5?我正在使用vs2010 – My2ndLovE

+0

它现在已经可以作为图书馆很长一段时间了。它最多需要4.0。 – usr

0

看看微软的反应扩展。该库包含一组可用的调度程序,它们遵循您所需的语义。

最适合您需要的是EventLoopScheduler。它会排列行动并且一个接一个地执行它们。如果它完成一个动作,并且队列中有更多项目,它将顺序处理同一线程上的动作,直到队列为空,然后它处理线程。当一个新的行动排队时,它会创建一个新的线程。这是非常有效的。

的代码是超级简单,看起来像这样:

var scheduler = new System.Reactive.Concurrency.EventLoopScheduler(); 

scheduler.Schedule(() => { /* action here */ }); 

如果你需要有一个新的线程执行的每一个排队的话,就用这样的:

var scheduler = new System.Reactive.Concurrency.NewThreadScheduler(); 

scheduler.Schedule(() => { /* action here */ }); 

很简单。