2011-02-02 138 views
4

我试图找到一个关于如何使用TryScan的例子,但是没有找到任何帮助吗?如何在F#中正确使用TryScan

我想要做什么(很简单的例子):我有一个MailboxProcessor接受 两种类型的消息。

  • 第一个GetState返回当前状态。 GetState消息发送相当频繁

  • 其他UpdateState是非常昂贵(耗时) - 例如,从互联网上下载一些东西,然后相应地更新状态。 UpdateState只被调用很少。

我的问题是 - 消息GetState被封锁,等到前面的UpdateState供应。这就是为什么我试图使用TryScan来处理所有GetState消息,但没有运气。

我的示例代码:

type Msg = GetState of AsyncReplyChannel<int> | UpdateState 
let mbox = MailboxProcessor.Start(fun mbox -> 
      let rec loop state = async { 
       // this TryScan doesn't work as expected 
       // it should process GetState messages and then continue 
       mbox.TryScan(fun m -> 
        match m with 
        | GetState(chnl) -> 
         printfn "G processing TryScan" 
         chnl.Reply(state) 
         Some(async { return! loop state}) 
        | _ -> None 
       ) |> ignore 

       let! msg = mbox.Receive() 
       match msg with 
       | UpdateState -> 
        printfn "U processing" 
        // something very time consuming here... 
        async { do! Async.Sleep(1000) } |> Async.RunSynchronously 
        return! loop (state+1) 
       | GetState(chnl) -> 
        printfn "G processing" 
        chnl.Reply(state) 
        return! loop state 
      } 
      loop 0 
) 

[async { for i in 1..10 do 
      printfn " U" 
      mbox.Post(UpdateState) 
      async { do! Async.Sleep(200) } |> Async.RunSynchronously 
}; 
async { // wait some time so that several `UpdateState` messages are fired 
     async { do! Async.Sleep(500) } |> Async.RunSynchronously 
     for i in 1..20 do 
      printfn "G" 
      printfn "%d" (mbox.PostAndReply(GetState)) 
}] |> Async.Parallel |> Async.RunSynchronously 

如果您尝试运行代码,你会看到,GetState消息几乎没有处理,因为它等待结果。另一方面,UpdateState只是失火,从而阻止有效获取状态。

编辑

为我的作品目前的解决办法是这样的一个:

type Msg = GetState of AsyncReplyChannel<int> | UpdateState 
let mbox = MailboxProcessor.Start(fun mbox -> 
      let rec loop state = async { 
       // this TryScan doesn't work as expected 
       // it should process GetState messages and then continue 
       let! res = mbox.TryScan((function 
        | GetState(chnl) -> Some(async { 
          chnl.Reply(state) 
          return state 
         }) 
        | _ -> None 
       ), 5) 

       match res with 
       | None -> 
        let! msg = mbox.Receive() 
        match msg with 
         | UpdateState -> 
          async { do! Async.Sleep(1000) } |> Async.RunSynchronously 
          return! loop (state+1) 
         | _ -> return! loop state 
       | Some n -> return! loop n 
      } 
      loop 0 
) 

反应来评价:与其他MailboxProcessorThreadPool执行并行UpdateState的想法是伟大的,但我目前不需要它。 我只想处理所有GetState消息,然后处理其他消息。我不在乎在处理UpdateState时代理被阻止。

我会告诉你什么是对输出的问题:我不认为TryScan方法将帮助你在这种情况下

// GetState messages are delayed 500 ms - see do! Async.Sleep(500) 
// each UpdateState is sent after 200ms 
// each GetState is sent immediatelly! (not real example, but illustrates the problem) 
U   200ms <-- issue UpdateState 
U processing   <-- process UpdateState, it takes 1sec, so other 
U   200ms  5 requests are sent; sent means, that it is 
U   200ms  fire-and-forget message - it doesn't wait for any result 
          and therefore it can send every 200ms one UpdateState message 
G      <-- first GetState sent, but waiting for reply - so all 
          previous UpdateState messages have to be processed! = 3 seconds 
          and AFTER all the UpdateState messages are processed, result 
          is returned and new GetState can be sent. 
U   200ms 
U   200ms  because each UpdateState takes 1 second 
U   200ms 
U processing 
U 
U 
U 
U 
U processing 
G processing   <-- now first GetState is processed! so late? uh.. 
U processing   <-- takes 1sec 
3 
G 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
U processing   <-- takes 1sec 
G processing   <-- after MANY seconds, second GetState is processed! 
10 
G 
G processing 
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent 
+0

在TryScan调用之后,你有'|>忽略'的事实应该提醒你,你错误地使用了API。 (现在没有时间给出完整的答案,希望有人会打败我。) – Brian 2011-02-02 21:48:03

+0

我知道我以错误的方式使用它。但我还没有发现任何关于使用它的帖子。 – stej 2011-02-02 21:56:06

+0

我认为`TryScan`和`Scan`的意思是等待一条消息,如果没有收到消息则超时。两者之间的唯一区别在于,超时TryScan在Scan引发异常时返回一个选项。 – gradbot 2011-02-03 02:58:27

回答

4

。它允许您指定等待消息时使用的超时。一旦收到一条消息,它将开始处理消息(忽略超时)。

例如,如果你想等待一些具体消息,但执行一些其他的检查每一秒(在等待),你可以写:

let loop() = async { 
    let! res = mbox.TryScan(function 
    | ImportantMessage -> Some(async { 
      // process message 
      return 0 
     }) 
    | _ -> None) 
    match res with 
    | None -> 
     // perform some check & continue waiting 
     return! loop() 
    | Some n -> 
     // ImportantMessage was received and processed 
} 

你能做些什么,以避免阻塞邮箱处理器时,处理UpdateState消息?邮箱处理器(逻辑上)是单线程的 - 您可能不想取消处理UpdateState消息,所以最好的选择是在后台开始处理它,并等待处理完成。处理UpdateState的代码然后可以发送一些消息回邮箱(例如UpdateStateCompleted)。

这里是一个草图如何看起来:

let rec loop (state) = async { 
    let! msg = mbox.Receive() 
    match msg with 
    | GetState(repl) -> 
     repl.Reply(state) 
     return! scanning state 
    | UpdateState -> 
     async { 
     // complex calculation (runs in parallel) 
     mbox.Post(UpdateStateCompleted newState) } 
     |> Async.Start 
    | UpdateStateCompleted newState -> 
     // Received new state from background workflow 
     return! loop newState } 

现在,后台任务并行运行,你需要小心可变状态。另外,如果您发送的消息比您能处理的速度更快,您将遇到麻烦。这可以通过例如在处理前一个请求时忽略或排队请求来解决。

2

正如托马斯提到的MailboxProcessor是单线程的。您需要另一个MailboxProcessor才能在状态getter的单独线程上运行更新。

#nowarn "40" 

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState 

let runner_UpdateState = MailboxProcessor.Start(fun mbox -> 
    let rec loop = async { 
     let! state = mbox.Receive() 
     printfn "U start processing %d" !state 
     // something very time consuming here... 
     do! Async.Sleep 100 
     printfn "U done processing %d" !state 
     state := !state + 1 
     do! loop 
    } 
    loop 
) 

let mbox = MailboxProcessor.Start(fun mbox -> 
    // we need a mutiple state if another thread can change it at any time 
    let state = ref 0 

    let rec loop = async { 
     let! msg = mbox.Receive() 

     match msg with 
     | UpdateState -> runner_UpdateState.Post state 
     | GetState chnl -> chnl.Reply !state 

     return! loop 
    } 
    loop) 

[ 
    async { 
     for i in 1..10 do 
      mbox.Post UpdateState 
      do! Async.Sleep 200 
    }; 
    async { 
     // wait some time so that several `UpdateState` messages are fired 
     do! Async.Sleep 1000 

     for i in 1..20 do 
      printfn "G %d" (mbox.PostAndReply GetState) 
      do! Async.Sleep 50 
    } 
] 
|> Async.Parallel 
|> Async.RunSynchronously 
|> ignore 

System.Console.ReadLine() |> ignore 

输出:

U start processing 0 
U done processing 0 
U start processing 1 
U done processing 1 
U start processing 2 
U done processing 2 
U start processing 3 
U done processing 3 
U start processing 4 
U done processing 4 
G 5 
U start processing 5 
G 5 
U done processing 5 
G 5 
G 6 
U start processing 6 
G 6 
G 6 
U done processing 6 
G 7 
U start processing 7 
G 7 
G 7 
U done processing 7 
G 8 
G U start processing 8 
8 
G 8 
U done processing 8 
G 9 
G 9 
U start processing 9 
G 9 
U done processing 9 
G 9 
G 10 
G 10 
G 10 
G 10 

你也可以使用线程池。

open System.Threading 

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int 
    | UpdateState 

let mbox = MailboxProcessor.Start(fun mbox -> 
    let rec loop state = async { 
     let! msg = mbox.Receive() 

     match msg with 
     | UpdateState -> 
      ThreadPool.QueueUserWorkItem((fun obj -> 
       let state = obj :?> int 

       printfn "U start processing %d" state 
       Async.Sleep 100 |> Async.RunSynchronously 
       printfn "U done processing %d" state 
       mbox.Post(SetState(state + 1)) 

       ), state) 
      |> ignore 
     | GetState chnl -> 
      chnl.Reply state 
     | SetState newState -> 
      return! loop newState 
     return! loop state 
    } 
    loop 0) 

[ 
    async { 
     for i in 1..10 do 
      mbox.Post UpdateState 
      do! Async.Sleep 200 
    }; 
    async { 
     // wait some time so that several `UpdateState` messages are fired 
     do! Async.Sleep 1000 

     for i in 1..20 do 
      printfn "G %d" (mbox.PostAndReply GetState) 
      do! Async.Sleep 50 
    } 
] 
|> Async.Parallel 
|> Async.RunSynchronously 
|> ignore 

System.Console.ReadLine()|>忽略

3

不要使用TRYSCAN!

不幸的是,当前版本的F#中的TryScan函数在两种方式中被打破。首先,整个问题是指定一个超时,但实现并没有真正遵守它。具体来说,不相关的消息重置计时器。其次,与其他Scan函数一样,消息队列在一个锁定下进行检查,该锁定可以防止任何其他线程在扫描期间发布,这可能是一段任意长的时间。因此,TryScan函数本身倾向于锁定并发系统,甚至可能引入死锁,因为调用者的代码是在锁内部进行评估的(例如,从函数参数发布到ScanTryScan可能会在代码锁等待时等待代理程序死锁获取它已经在的锁)。

我在我的生产代码的早期原型中使用了TryScan,并没有造成任何问题。不过,我设法围绕它进行构建,所产生的架构实际上更好。实质上,我急切地使用我自己的本地队列Receive所有消息和过滤器。