2012-02-08 68 views
5

有时我想为网络活动等同时并行运行最大数量的IO操作。我掀起了一个小并发线程函数,它与https://gist.github.com/810920一起工作良好,但这不是真正的池,因为所有IO操作必须在别人可以开始之前完成。如何创建线程池?

类型的东西我要找的会是这样的:

runPool :: Int -> [IO a] -> IO [a] 

,应该能够在有限和无限列表操作。

管道包看起来能够很好地实现这一点,但我觉得可能有类似的解决方案,我只是提供了使用来自haskell平台的mvars等的要点。

有没有人遇到过没有任何沉重依赖的惯用解决方案?

回答

7

你需要一个线程池,如果你想要的东西短,你可以从Control.ThreadPool获得灵感(从控制发动机组件还提供更多的一般功能),例如threadPoolIO就是:

threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b) 
threadPoolIO nr mutator = do 
    input <- newChan 
    output <- newChan 
    forM_ [1..nr] $ 
     \_ -> forkIO (forever $ do 
      i <- readChan input 
      o <- mutator i 
      writeChan output o) 
    return (input, output) 

它使用两个Chan与外部通信,但这通常是你想要的,它确实有助于编写不混乱的代码。

如果你绝对要包起来在你的类型的函数可以封装过通信:

runPool :: Int -> [IO a] -> IO [a] 
runPool n as = do 
    (input, output) <- threadPoolIO n (id) 
    forM_ as $ writeChan input 
    sequence (repeat (length as) $ readChan output) 

这会不会让你的操作的顺序,是一个问题(很容易通过传输动作的索引来纠正,或者只是使用数组来存储响应)?

注意:如果您打算在长时间运行的应用程序中创建并清理其中几个池,那么n个线程将永远保持活动状态,向threadPoolIO添加“killAll”返回的操作可以轻松解决此问题(if不是,考虑到Haskell中线程的重量,它可能不值得费神)。 请注意,此函数仅适用于有限列表,这是因为IO通常是严格的,因此如果您真的希望您可以使用IO,则无法在整个列表生成之前开始处理IO [a]的元素懒惰IO与unsafeInterleaveIO(也许不是最好的主意),或完全改变你的模型,并使用类似管道的东西来流式传输你的结果。

+1

如果你不太喜欢你的runPool类型,threadPoolIO本身更健壮一些:你可以很容易地在你的程序中的多个地方重复使用它,你可以控制拆分无限列表并提供它并阅读大块响应等等...... – Jedai 2012-02-08 19:31:38

+0

'threadPoolIO'看起来不错。我将查看代码,看看它是如何实现的,因为我对创建线程池的最佳方式以及知道哪个Hackage版本受到社区青睐表示兴趣。 – 2012-02-09 06:00:57

相关问题