2014-03-07 111 views
7

我真的很喜欢F#的async工作流程,但就我而言,它有一个严重问题:它不允许创建不应超过某个特定时间段的工作流。F#异步工作流程

使其更清晰,这里有一个简单的功能,我写我自己:

let withTimeout operation timeout = async { 
    try 
     return Some <| Async.RunSynchronously (operation, timeout) 
    with :? TimeoutException -> return None 
} 

即签名是

val withTimeout : operation:Async<'a> -> timeout:int -> Async<'a option> 

用法示例这里:

let op = async { 
    do! Async.Sleep(1000) 
    return 1 
} 
#time 
withTimeout op 2000 |> Async.RunSynchronously;; 
// Real: 00:00:01.116, CPU: 00:00:00.015, GC gen0: 0, gen1: 0, gen2: 0 
// val it : unit option = Some 1 
withTimeout op 2000 |> Async.RunSynchronously;; 
// Real: 00:00:01.004, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0 
// val it : unit option = Some 1 
withTimeout op 500 |> Async.RunSynchronously;; 
// Real: 00:00:00.569, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0 
// val it : unit option = None  

你可以看到,它按预期工作。这非常好,但它也有点尴尬,我不确定它是否会出现安全问题和其他问题。也许我正在重新发明轮子,并且有非常简洁的方式来编写这样的工作流程?

+1

这种方法的问题是,工作流将继续超时后执行。你应该看看是否使用取消,然后在超时期限之后用信号通知取消令牌。这需要您的工作流程中的各个阶段对取消通知做出响应。 – Lee

+0

执行'Async.RunSynchronously'时,如果底层的异步操作受I/O限制,则会阻止当前线程并失去效率。我想我还没有看到在野外的工作实现,但我记得使用一个丑陋的黑客来做到这一点,将异步comps转换为observables,然后合并它们。 – MisterMetaphor

+0

@MisterMetaphor这就是为什么我把'RunSynchronously'换成另一个'async'的原因。我想这将允许保持并发。但这很丑,我同意。我更关心@李的话 - 我并没有真正杀死正在运行的任务。但目前我不知道如何解决这个问题 – Rustam

回答

4

请参阅的执行this,这应该表现得类似于Task.WhenAny

通过使用它,你同样可以实现withTimeout它是如何为Taskhere实现:

let withTimeout dueTime comp = 
    let success = async { 
     let! x = comp 
     return (Some x) 
    } 
    let timeout = async { 
     do! Async.Delay(dueTime) 
     return None 
    } 
    Async.WhenAny(success, timeout) 

我不相信,当第一个完成,将取消其他的计算,但至少这个实现不会不必要地阻塞一个线程。

+0

+1感谢您的考虑。我会考虑如何扩展它以照顾取消 – Rustam

7

UPD:此刻的最佳选择是Vesa A.J.K这里提出:https://stackoverflow.com/a/26230245/1554463。随着我的编辑的那样:

type Async with 
    static member WithTimeout (timeout : int option) operation = 
     match timeout with 
     | Some time when time > 0 -> 
      async { 
       let! child = Async.StartChild (operation, time) 
       try 
        let! result = child 
        return Some result 
       with :? TimeoutException -> return None 
      } 
     | _ -> 
      async { 
       let! result = operation 
       return Some result 
      } 

这里的另一种选择:

type Async with 
    static member WithCancellation (token:CancellationToken) operation = 
     async { 
      try 
       let task = Async.StartAsTask (operation, cancellationToken = token) 
       task.Wait() 
       return Some task.Result 
      with 
       | :? TaskCanceledException -> return None 
       | :? AggregateException -> return None 
     } 

    static member WithTimeout (timeout:int option) operation = 
     match timeout with 
     | Some(time) -> 
      async { 
       use tokenSource = new CancellationTokenSource (time) 
       return! operation |> Async.WithCancellation tokenSource.Token 
      } 

     | _ -> 
      async { 
       let! res = operation 
       return Some res 
      } 

这里我使用的.Net任务和CancellationToken

3

只需使用Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>

let with_timeout timeout action = 
    async { 
    let! child = Async.StartChild(action, timeout) 
    return! child 
    }