2011-07-03 107 views
2

我有一些工作(工作)在队列中(所以有几个),我希望每个工作都由一个线程处理。需要线程处理的作业队列

我在看Rx,但这不是我想要的,然后遇到并行任务库。

由于我的工作将在一个Web应用程序,我不希望客户端在等待各项工作完成完成,所以我也做了以下内容:

public void FromWebClientRequest(int[] ids); 
    { 
     // I will get the objects for the ids from a repository using a container (UNITY) 


     ThreadPool.QueueUserWorkItem(delegate 
             { 
              DoSomeWorkInParallel(ids, container); 
             }); 
    } 

    private static void DoSomeWorkInParallel(int[] ids, container) 
    { 
     Parallel.ForEach(ids, id=> 
             { 
              Some work will be done here... 
              var respository = container.Resolve... 
             }); 


     // Here all the work will be done. 
     container.Resolve<ILogger>().Log("finished all work"); 
    } 

我将在调用上面的代码一个Web请求,然后客户端不必等待。

这是正确的方法吗?

TIA

+2

我知道它只是伪代码,但请告诉我你不会实例化你的仓库(从IOC容器)在你的lambda内.. –

+0

不,它已经存在,它是一个单身人士。 –

回答

3

从MSDN文档我看到Unitys的IContainer解决方法不是线程安全的(或不写)。这意味着你需要从线程循环中完成。编辑:改为Task

public void FromWebClientRequest(int[] ids); 
{ 
    IRepoType repoType = container.Resolve<IRepoType>(); 
    ILogger logger = container.Resolve<ILogger>(); 
    // remove LongRunning if your operations are not blocking (Ie. read file or download file long running queries etc) 
    // prefer fairness is here to try to complete first the requests that came first, so client are more likely to be able to be served "first come, first served" in case of high CPU use with lot of requests 
    Task.Factory.StartNew(() => DoSomeWorkInParallel(ids, repoType, logger), TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness); 
} 

private static void DoSomeWorkInParallel(int[] ids, IRepoType repository, ILogger logger) 
{ 
    // if there are blocking operations inside this loop you ought to convert it to tasks with LongRunning 
    // why this? to force more threads as usually would be used to run the loop, and try to saturate cpu use, which would be doing nothing most of the time 
    // beware of doing this if you work on a non clustered database, since you can saturate it and have a bottleneck there, you should try and see how it handles your workload 
    Parallel.ForEach(ids, id=>{ 
        // Some work will be done here... 
        // use repository 
      }); 
    logger.Log("finished all work"); 
} 

另外,正如fiver所述,如果你有.Net 4,那么任务就是要走的路。

为什么去工作(在评论的问题):

如果你的方法fromClientRequest会被疯狂往往被炒了,你将填补线程池,和整体系统性能可能不会像在.NET 4中有良好细粒化。这是任务进入游戏的地方。每个任务不是它自己的线程,但是新的.Net 4线程池创建足够的线程来最大化系统性能,并且不需要打扰多少个cpus和多少线程上下文切换。

一些MSDN行情线程池:

当所有线程池中的线程已经 分配给任务,线程池 不会立即开始创建新 空闲线程。为避免 不必要地为线程分配堆栈空间 ,它会间隔创建新的空闲线程 。间隔为 ,目前为半秒,但它在 .NET Framework的未来版本中可能会发生更改 。

线程池具有的每间可用 处理器

250工作线程默认大小增加不必要的 空闲线程数也可能导致 性能问题。堆栈空间必须为每个线程分配 。如果太多 许多任务同时开始,它们的所有 可能看起来很慢。 寻找合适的余额是 性能调整问题。

通过使用任务,您可以放弃这些问题。

另一件好事是你可以细化运行类型的操作。如果您的任务运行阻止操作,这一点很重要。这是一个更多线程同时分配的情况,因为他们大多会等待。线程池不能自动地实现这一点:

Task.Factory.StartNew(() => DoSomeWork(), TaskCreationOptions.LongRunning); 

当然,你可以让它完成需求,而不诉诸ManualResetEvent的:

var task = Task.Factory.StartNew(() => DoSomeWork()); 
task.Wait(); 

除了这个你不必改变Parallel.ForEach如果您不希望出现异常或阻止,因为它是.Net 4任务并行库的一部分,并且(通常)可以很好地在.NET 4池上运行并进行优化,就像任务一样。

但是,如果您确实转到Tasks而不是parallel for,请从调用程序Task中删除LongRunning,因为Parallel.For是阻止操作,启动任务(包含fiver循环)不是。但是,这种方式可能会导致先到先得的优化失败,或者您必须在更多的任务(全部通过ID生成)上执行这些任务,否则这些任务可能会导致更少的正确行为。另一种选择是等待DoSomeWorkInParallel结尾的所有任务。

+0

谢谢你,没有意识到容器不是线程安全的。 –

+0

为什么要完成任务? –

+0

在编辑中添加了回复 –

3

另一种方法是使用任务:

public static void FromWebClientRequest(int[] ids) 
{ 
    foreach (var id in ids) 
    { 
     Task.Factory.StartNew(i => 
     { 
      Wl(i); 
     } 
     , id); 
    } 
} 
+0

但是现在您必须处理每项工作中的例外情况。 –

+0

@亨克:我的每份工作都处理可能会出现的任何异常情况。那么这种模式会很好用吗?说到这一点,我喜欢Parallel.ForEach()的语法和Task一起使用。 –

1

我会叫上的Web 请求,上面的代码,然后客户端将不会 等待。

这将工作,只要客户端不需要答案(如确定/失败)。

这是正确的 方式做到这一点?

差不多。您可以使用Parallel.ForEach(TPL)作业,但可以从“普通”的Threadpool作业运行它。更好地使用外部工作的任务。

另外,处理该外部任务中的所有异常。并注意容器的线程安全等

+0

感谢您的意见。我仍然在学习如何在这种情况下最好地使用线程。为什么使用Task来代替(异常处理)会更好? –

+1

TPL调度程序可以在主作业等待时重新使用该线程。在其他情况下,您还可以获得更好的集成。 –

+0

下面的其他人提到使用Task.Factory.StartNew()。这会更好,而不是使用Parallel.ForEach()? –