2017-10-16 53 views
2

async package的文档链接描述withAsync功能:如何父异步多孩子异步操作

菌种在一个单独的线程异步操作,并通过其异步 手柄所提供的功能。当函数返回或抛出一个异常时,在Async上调用uninterruptibleCancel。这是异步的一个有用的变体,确保Async永远不会无意中保持运行 。

我在那个一直盯着过去2小时,一直无法弄清楚如何启动一个监视线程,会派生多个工作线程,使得:

  • 如果监视器线程死亡,所有工作线程应该被终止,
  • 但是,如果任何工作线程死亡,则其他工作线程不应受影响。应该通知监视器,它应该能够重新启动工作线程。
+1

好奇:什么是你的使用情况?这听起来像是你试图建立一个Erlang式的监督模型,对于这个模型你可能会更好地服务于像''pipes-concurrency''这样的演员库(https://hackage.haskell.org/package/pipes-并发) –

+0

@BenjaminHodgson的用例是当我的webapp启动时产生一个工作队列。作业队列内部产生一个作业轮询线程和一个作业监听/通知线程。如果其中任何一个线程死亡,则需要重新生成它们。如果应用程序线程被终止,那么工作队列需要被终止。 –

回答

2

看来我们需要两个功能:一个启动所有的异步任务,另一个监视它们并在死亡时重新启动它们。

第一个可以这样写:

withAsyncMany :: [IO t] -> ([Async t] -> IO b) -> IO b 
withAsyncMany []  f = f [] 
withAsyncMany (t:ts) f = withAsync t $ \a -> withAsyncMany ts (f . (a:)) 

如果我们使用的managed包,我们也可以写成这样的:

import Control.Monad.Managed (with,managed) 

withAsyncMany' :: [IO t] -> ([Async t] -> IO b) -> IO b 
withAsyncMany' = with . traverse (\t -> managed (withAsync t)) 

重启功能将循环列表异口同声地询问他们的状况,并在他们失败时更新他们:

{-# language NumDecimals #-} 
import Control.Concurrent (threadDelay) 

resurrect :: IO t -> [Async t] -> IO() 
resurrect restartAction = go [] 
    where 
    go ts [] = do 
     threadDelay 1e6 -- wait a little before the next round of polling 
     go [] (reverse ts) 
    go past (a:pending) = do 
     status <- poll a -- has the task died, or finished? 
     case status of 
      Nothing -> go (a:past) pending 
      Just _ -> withAsync restartAction $ \a' -> go (a':past) pending 

但是我担心很多嵌套的withAsyncs会导致某种类型的资源泄漏(因为某些异常处理程序必须与每个withAsync一起安装以便在父线程死亡的情况下通知孩子)的可能性。

因此,也许在这种情况下,它会更好产卵工人用普通async秒,Async S中的集合存储到某种可变的参考和安装在监视线程一个异常处理程序,它会遍历容器终止每个任务。

0

这是另一个答案,它使用async而不是withAsync。主要功能是

monitor :: Int -> IO() -> IO() 
monitor count task = 
    bracket (do asyncs <- replicateM count (async task) 
       newIORef asyncs) 
      (\ref -> forever (do 
       threadDelay 1e6 
       asyncs <- readIORef ref 
       vivify task (writeIORef ref) asyncs)) 
      (\ref -> do 
       asyncs <- readIORef ref 
       mapM_ uninterruptibleCancel asyncs) 

它使用辅助vivify功能横贯Async个列表,复兴死的和写回更新列表到IORef

vivify :: IO() -> ([Async()] -> IO()) -> [Async()] -> IO() 
vivify task write asyncs = go [] asyncs 
    where 
    go _ [] = do 
     return() 
    go past (a:pending) = do 
     status <- poll a 
     case status of 
      Nothing -> do 
       go (a:past) pending 
      Just _ -> do 
       past' <- mask_ $ do 
        a' <- async task 
        write (reverse (a':past) ++ pending) 
        return (a':past) 
       go past' pending 

我们屏蔽异步例外创建一个新的Async并将其保留在IOref之间的时间间隔,因为否则如果异步异常到达并中止监视器线程,那么Async将保持悬挂状态。