2011-08-18 57 views
1

我有类似以下的情况:这种情况下推荐的F#模式是什么?

let mutable stopped = false 

let runAsync() = async { 
    while not stopped do 
     let! item = fetchItemToProcessAsync 
     match item with 
     | Some job -> job |> runJobAsync |> Async.Start 
     | None -> do! Async.Sleep(1000) 
} 

let run() = Async.Start runAsync 
let stop() = 
    stopped <- true 

现在,当停止方法被调用时,我不得不停止阅读从DB进一步的项目,并等待当前开始到完成返回前的那些从这个功能。

完成此操作的最佳方法是什么?我想用一个计数器,(与联锁API)组成,当计数器达到0

如果没有做到这一点的另一种方式,我将不胜感激指导从停止方法返回。我有一种感觉,我可以在这里使用代理,但我不确定是否有任何可用的方法来与代理完成此操作,或者如果我仍然必须编写自定义逻辑以确定作业已完成执行。

回答

1

看看actor-based patterns and MailboxProcessor

基本上你能想象,作为一个异步队列。如果您使用的运行列表(开始Async.StartChildAsync.StartAsTask)作为MailboxProcessor里面的参数为你的循环,你可以从容地处理通过等待或的CancellationToken)停产

下面是一个简单示例中,我放在一起:


type Commands = 
    | RunJob of Async 
    | JobDone of int 
    | Quit of AsyncReplyChannel 

type JobRunner() = 
    let processor = 
     MailboxProcessor.Start (fun inbox -> 
      let rec loop (nextId, jobs) = async { 
       let! cmd = inbox.Receive() 
       match cmd with 
       | Quit cb -> 
        if not (Map.isEmpty jobs) 
        then async { 
          do! Async.Sleep 100 
          inbox.Post (Quit cb)} 
         |> Async.Start 
         return! loop (nextId, jobs) 
        else 
         cb.Reply() 
         return() 
       | JobDone id -> 
        return! loop (nextId, jobs |> Map.remove id) 
       | RunJob job -> 
        let runJob i = async { 
         do! job 
         inbox.Post (JobDone i) 
        } 
        let! child = Async.StartChild (runJob nextId) 
        return! loop (nextId+1, jobs |> Map.add nextId child) 
      } 
      loop (0, Map.empty)) 
    member jr.PostJob(job) = processor.Post (RunJob job) 
    member jr.Quit() = processor.PostAndReply(fun cb -> Quit cb) 

let postWaitJob (jobRunner : JobRunner) time = 
    let job = async { 
     do! Async.Sleep time 
     printfn "sleept for %d ms" time } 
    jobRunner.PostJob job 

let testRun() = 
    let jr = new JobRunner() 
    printfn "starting jobs..." 
    [10..-1..1] |> List.iter (fun i -> postWaitJob jr (i*1000)) 
    printfn "sending quit" 
    jr.Quit() 
    printfn "done!" 

嗯...有与编辑器的一些问题在这里:它只是杀死了大量的代码,当我用管道回运营商...哎呀

简短的解释:你可以看到我总是提供内部循环与下一个免费的作业ID和一个Map-> AsyncChild作业。 (你当然可以实现其他/更好的解决方案 - 该地图是不是在这个例子中neccessary,但你可以用命令“Cancell JobNr”或任何这样的程度) 作业完成消息仅用于internaly删除从这个职位刚刚退出地图 检查,如果该映射为空 - 如果没有额外的工作需要和Mailboxprocessor退出(返程()) - 如果它不是空的一个新的异步孩子开始,只是等待100毫秒,然后重新发送退出 - 消息 RunJob相当简单 - 它只是将给定的作业连同JobDone的后台链接到MessabeBoxProcessor中,并使用更新后的值(nextId是一个,并且新的Job映射到旧的nextId)返回recursivley循环。

+0

感谢您的答复 - 我下去相同的路径了。不过,我有两个问题,看你的例子:1)当我们开始用'Async.StartChild异步:异步<'u>'然后不,我们需要有另一个呼叫“从异步<'u>检索'U(如让! x =孩子或者做!孩子)'? 2)鉴于这些工作可以在不同的时间完成,我们不需要为Map函数使用某种同步吗? –

+1

嗨 - 是的,如果你想resutl你会净另一个让!为孩子 - 但我不关心答案(没有),你不必。第二:不,因为我从来没有访问过相同的地图,但总是生成新的地图(地图是不可变的)我不必考虑很多的协调性 - 除了那些只有当前继续/运行循环的线程才能访问它(这就是为什么我发布JobDone消息到处理器而不是更改全球地图/字典或其他) – Carsten

+0

我建议你在那里放一些“printf”并且用代码来玩 - 总是帮助我在这些情况下(如果你运行testRun函数(你可以把它全部粘贴到F#交互中),你可以看到例如在任何任务完成之前调用quit,但函数只会在所有10个任务完成时返回) – Carsten

1

看看这个snippet上fssnip.net。这是您可以使用的通用作业处理器。

+0

几乎与我的解决方案相同 - 但我喜欢分成“退出”阶段。 – Carsten