看来我们需要两个功能:一个启动所有的异步任务,另一个监视它们并在死亡时重新启动它们。
第一个可以这样写:
withAsyncMany :: [IO t] -> ([Async t] -> IO b) -> IO b
withAsyncMany [] f = f []
withAsyncMany (t:ts) f = withAsync t $ \a -> withAsyncMany ts (f . (a:))
如果我们使用的managed包,我们也可以写成这样的:
import Control.Monad.Managed (with,managed)
withAsyncMany' :: [IO t] -> ([Async t] -> IO b) -> IO b
withAsyncMany' = with . traverse (\t -> managed (withAsync t))
重启功能将循环列表异口同声地询问他们的状况,并在他们失败时更新他们:
{-# language NumDecimals #-}
import Control.Concurrent (threadDelay)
resurrect :: IO t -> [Async t] -> IO()
resurrect restartAction = go []
where
go ts [] = do
threadDelay 1e6 -- wait a little before the next round of polling
go [] (reverse ts)
go past (a:pending) = do
status <- poll a -- has the task died, or finished?
case status of
Nothing -> go (a:past) pending
Just _ -> withAsync restartAction $ \a' -> go (a':past) pending
但是我担心很多嵌套的withAsyncs
会导致某种类型的资源泄漏(因为某些异常处理程序必须与每个withAsync
一起安装以便在父线程死亡的情况下通知孩子)的可能性。
因此,也许在这种情况下,它会更好产卵工人用普通async
秒,Async
S中的集合存储到某种可变的参考和安装在监视线程一个异常处理程序,它会遍历容器终止每个任务。
好奇:什么是你的使用情况?这听起来像是你试图建立一个Erlang式的监督模型,对于这个模型你可能会更好地服务于像''pipes-concurrency''这样的演员库(https://hackage.haskell.org/package/pipes-并发) –
@BenjaminHodgson的用例是当我的webapp启动时产生一个工作队列。作业队列内部产生一个作业轮询线程和一个作业监听/通知线程。如果其中任何一个线程死亡,则需要重新生成它们。如果应用程序线程被终止,那么工作队列需要被终止。 –