2010-04-15 89 views
4

我想写运行F#脚本(.fsx)的序列一些代码。事情是,我能有数以百计的脚本,如果我这样做:需要帮助有关异步和FSI

let shellExecute program args = 
    let startInfo = new ProcessStartInfo() 
    do startInfo.FileName  <- program 
    do startInfo.Arguments  <- args 
    do startInfo.UseShellExecute <- true 
    do startInfo.WindowStyle  <- ProcessWindowStyle.Hidden 

    //do printfn "%s" startInfo.Arguments 
    let proc = Process.Start(startInfo) 
    () 

scripts 
|> Seq.iter (shellExecute "fsi") 

可能压力太大我的2GB系统。无论如何,我想用一批n个,这也似乎是一个很好的锻炼学习Async(我想这是要走的路)运行的脚本。

我已经开始写一些代码,但遗憾的是它不工作:

open System.Diagnostics 

let p = shellExecute "fsi" @"C:\Users\Stringer\foo.fsx" 

async { 
    let! exit = Async.AwaitEvent p.Exited 
    do printfn "process has exited" 
} 
|> Async.StartImmediate 

foo.fsx只是一个Hello World脚本。 解决这个问题最常用的方式是什么?

我想也弄清楚它是否是可行的检索返回代码为每个执行的脚本,如果没有,找到另一种方式。谢谢!

编辑:

非常感谢您的见解和链接!我学到了很多。 我只是想添加一些代码使用Async.Parallel作为托马斯建议它运行在平行批次的。请评论我的cut函数是否有更好的实现。

module Seq = 
    /// Returns a sequence of sequences of N elements from the source sequence. 
    /// If the length of the source sequence is not a multiple 
    /// of N, last element of the returned sequence will have a length 
    /// included between 1 and N-1. 
    let cut (count : int) (source : seq<´T>) = 
    let rec aux s length = seq { 
     if (length < count) then yield s 
     else 
     yield Seq.take count s 
     if (length <> count) then 
      yield! aux (Seq.skip count s) (length - count) 
     } 
    aux source (Seq.length source) 

let batchCount = 2 
let filesPerBatch = 
    let q = (scripts.Length/batchCount) 
    q + if scripts.Length % batchCount = 0 then 0 else 1 

let batchs = 
    scripts 
    |> Seq.cut filesPerBatch 
    |> Seq.map Seq.toList 
    |> Seq.map loop 

Async.RunSynchronously (Async.Parallel batchs) |> ignore 

EDIT2:

所以我有一些麻烦让托马斯的后卫代码工作。我猜f功能曾在AddHandler方法被调用,否则我们失去永远的事件......下面的代码:

module Event = 
    let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e 
    { new IEvent<´Args> with 
     member this.AddHandler(d) = e.AddHandler(d); f() //must call f here! 
     member this.RemoveHandler(d) = e.RemoveHandler(d); f() 
     member this.Subscribe(observer) = 
      let rm = e.Subscribe(observer) in f(); rm } 

(由托马斯提到),有趣的是,它看起来像Exited在进程终止时,即使该过程还没有开始EnableRaisingEvents设置为true事件被保存在某个地方。 当此属性最终设置为true时,事件将启动。

因为我不知道,这是官方的规格(也有点偏执),我发现了另一个解决方案,包括在开始在guard功能的过程中,所以我们确保代码将在任何情况下工作:

let createStartInfo program args = 
    new ProcessStartInfo 
    (FileName = program, Arguments = args, UseShellExecute = false, 
    WindowStyle = ProcessWindowStyle.Normal, 
    RedirectStandardOutput = true) 

let createProcess info = 
    let p = new Process() 
    do p.StartInfo   <- info 
    do p.EnableRaisingEvents <- true 
    p 

let rec loop scripts = async { 
    match scripts with 
    | [] -> printfn "FINISHED" 
    | script::scripts -> 
    let args = sprintf "\"%s\"" script 
    let p = createStartInfo "notepad" args |> createProcess 
    let! exit = 
     p.Exited 
     |> Event.guard (fun() -> p.Start() |> ignore) 
     |> Async.AwaitEvent 
    let output = p.StandardOutput.ReadToEnd() 
    do printfn "\nPROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output 
    return! loop scripts 
    } 

通知我已经NOTEPAD.EXE更换fsi.exe这样我就可以重放不同的场景一步在调试器中的步骤和控制明确的处理自己的出口。

+0

您应该删除f()的从该行: 构件this.RemoveHandler(d)= e.RemoveHandler(d); f() 否则当事件处理程序被删除时,p.Start()将被调用。 – Oenotria 2012-02-26 06:18:49

回答

6

我做了一些实验,这里是处理在下面我的职务,并在乔尔的答案评论中讨论的问题的一种方法(我认为目前没有工作,但可能是固定的)。

认为Process的规范是,它可以触发事件Exited后我们设置EnableRaisingEvents属性true(并会触发事件,即使该过程已经完成之前我们设定的属性)。为了正确处理这种情况,我们需要在之后启用事件,我们将处理程序附加到Exited事件。

这是一个问题,因为如果我们使用AwaitEvent它会阻止工作流程直到事件触发。从工作流程中调用AwaitEvent后,我们无法执行任何操作(如果我们在调用AwaitEvent之前设置属性,那么我们就会得到一场比赛......)。 Vladimir's approach是正确的,但我认为有一个更简单的方法来处理这个问题。

我将创建一个功能Event.guard采取一个事件,并返回一个事件,它允许我们指定将执行后处理程序附加到事件的一些功能。这意味着如果我们在这个函数内部进行一些操作(这反过来触发事件),事件将被处理。

要将其用于此处讨论的问题,我们需要更改我的原始解决方案,如下所示。首先,shellExecute函数不能设置EnableRaisingEvents属性(否则,我们可能会失去事件!)。其次,等待的代码应该是这样的:

let rec loop scripts = async { 
    match scripts with 
    | [] -> printf "FINISHED" 
    | script::scripts -> 
    let p = shellExecute fsi script 
    let! exit = 
     p.Exited 
     |> Event.guard (fun() -> p.EnableRaisingEvents <- true) 
     |> Async.AwaitEvent 
    let output = p.StandardOutput.ReadToEnd() 
    return! loop scripts } 

注意使用Event.guard功能。粗略地说,它表示在工作流将处理程序附加到p.Exited事件后,所提供的lambda函数将运行(并且将启用事件引发)。但是,我们已经将处理程序附加到了事件中,所以如果这会立即引发事件,那我们很好!

实施办法(两者EventObservable)看起来是这样的:

module Event = 
    let guard f (e:IEvent<'Del, 'Args>) = 
    let e = Event.map id e 
    { new IEvent<'Args> with 
     member x.AddHandler(d) = e.AddHandler(d) 
     member x.RemoveHandler(d) = e.RemoveHandler(d); f() 
     member x.Subscribe(observer) = 
      let rm = e.Subscribe(observer) in f(); rm } 

module Observable = 
    let guard f (e:IObservable<'Args>) = 
    { new IObservable<'Args> with 
     member x.Subscribe(observer) = 
      let rm = e.Subscribe(observer) in f(); rm } 

好处是,这个代码是非常简单的。

+0

非常好!你介意在f()'e.Subscribe(observer)中的'in'关键字的意义吗?它看起来好像与冗长的语法有关,但我不熟悉“函数调用中的函数调用”用法。 – 2010-04-17 19:53:31

+0

当然。在详细语法中,你总是在'let'的初始化表达式之后写入'in'(例如'printf中的let a = 1 ...“)。如果你使用_light_语法,编译器_thinks_它在那里,如果你用换行符结束'let'。如果你想在单行上写'let'后跟其他东西(在我的例子中,为了使代码更简洁),你需要在'let'后加'in'(和正常表达后的';') 。 – 2010-04-17 23:05:42

+1

小(但重要)错字:f()调用应该在AddHandler()方法而不是RemoveHandler()上。 – Oenotria 2012-02-26 06:20:45

5

你的方法看起来不错给我,我真的很喜欢使用AwaitEvent嵌入过程执行到异步工作流的想法!

可能的原因,为什么它没有工作,是你需要的,如果你希望它永远触发Exited事件(不要问我,为什么你要做到这一点,它的ProcessEnableRisingEvents属性设置为true听起来很愚蠢!)无论如何,我做了几个其他更改您的代码进行测试时,所以这里是为我工作的一个版本:

open System 
open System.Diagnostics 

let shellExecute program args = 
    // Configure process to redirect output (so that we can read it) 
    let startInfo = 
    new ProcessStartInfo 
     (FileName = program, Arguments = args, UseShellExecute = false, 
     WindowStyle = ProcessWindowStyle.Hidden, 
     RedirectStandardOutput = true) 

    // Start the process 
    // Note: We must enable rising events explicitly here! 
    Process.Start(startInfo, EnableRaisingEvents = true) 

最重要的是,现在的代码集EnableRaisingEventstrue。我还更改了代码,以便在构建对象时指定对象的属性(以使代码更简洁一些),并更改了一些属性,以便我可以读取输出(RedirectStandardOutput)。

现在,我们可以使用AwaitEvent方法来等待进程完成。我假设fsi包含fsi.exe的路径,而scripts是FSX脚本的列表。如果要依次运行它们,可以使用使用递归环路中实现:

let rec loop scripts = async { 
    match scripts with 
    | [] -> printf "FINISHED" 
    | script::scripts -> 
    // Start the proces in background 
    let p = shellExecute fsi script 
    // Wait until the process completes 
    let! exit = Async.AwaitEvent p.Exited 
    // Read the output produced by the process, the exit code 
    // is available in the `ExitCode` property of `Process` 
    let output = p.StandardOutput.ReadToEnd() 
    printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output 
    // Process the rest of the scripts 
    return! loop scripts } 

// This starts the workflow on background thread, so that we can 
// do other things in the meantime. You need to add `ReadLine`, so that 
// the console application doesn't quit immedeiately 
loop scripts |> Async.Start 
Console.ReadLine() |> ignore  

当然,也可以运行过程并行(或例如运行2组其中的并行等)。为了这样做,你会使用Async.Parallel(以通常的方式)。

无论如何,这是一个非常好的使用异步工作流程的例子,目前我还没有看到它们。非常有趣:-)

+2

很好的答案。在调用'Async.AwaitEvent'(意味着在添加侦听器之后不会引发该事件)之前,该进程会退出吗? – kvb 2010-04-15 22:21:09

+3

是的,这是一场比赛,请参阅http://v2matveev.blogspot.com/2010/02/event-based-async-pattern-in-f.html – Brian 2010-04-15 22:38:18

+0

好点......可悲的是让事情变得更加复杂:-(应该有某种方式在启动过程之前创建事件 - 但无论我们做什么,如果它在后台线程触发,我们可能会失去事件 - 它们至少保证在我们设置“EnableRaisingEvents”之前它不会完成?虽然,即使有了这些,我们仍然需要类似Concurrent ML中的'guard'组合器(请参阅http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.50.7965&rep=rep1&type=pdf )。我想这是一个同步单线程方法的+1。 – 2010-04-16 01:13:14

1

邮箱处理器怎么样?

+0

听起来不错,但我更喜欢坚持异步工作流第一次sinc e我认为它们可以在我的F#代码中比MailboxProcessor更广泛地使用。另外,在我看到的MailboxProcessor F#代码中经常会有异步代码。 – Stringer 2010-04-18 22:44:14

3

回应托马斯的回答,这是否会成为启动流程中涉及的竞争条件的可行解决方案,然后订阅其退出事件?

type Process with 
    static member AsyncStart psi = 
     let proc = new Process(StartInfo = psi, EnableRaisingEvents = true) 
     let asyncExit = Async.AwaitEvent proc.Exited 
     async { 
      proc.Start() |> ignore 
      let! args = asyncExit 
      return proc 
     } 

除非我记错了,这将事先订阅事件来启动进程,并将其打包都归结为一个Async<Process>结果。

这将允许你重写代码的其余部分是这样的:

let shellExecute program args = 
    // Configure process to redirect output (so that we can read it) 
    let startInfo = 
    new ProcessStartInfo(FileName = program, Arguments = args, 
     UseShellExecute = false, 
     WindowStyle = ProcessWindowStyle.Hidden, 
     RedirectStandardOutput = true) 

    // Start the process 
    Process.AsyncStart(startInfo) 

let fsi = "PATH TO FSI.EXE" 

let rec loop scripts = async { 
    match scripts with 
    | [] -> printf "FINISHED" 
    | script::scripts -> 
     // Start the proces in background 
     use! p = shellExecute fsi script 
     // Read the output produced by the process, the exit code 
     // is available in the `ExitCode` property of `Process` 
     let output = p.StandardOutput.ReadToEnd() 
     printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output 
     // Process the rest of the scripts 
     return! loop scripts 
} 

如果没有工作,它肯定是少了很多码担心比弗拉基米尔Async.GetSubject

+2

我认为这不起作用,但这种方法看起来很有前途。您在工作流之外运行的代码只会创建一个异步工作流程('asyncExit'),一旦工作流程启动(在'let!'上),它将把事件处理程序附加到事件中,因此它的行为与原始版本相同。为了解决这个问题,你需要将处理程序附加到'proc.Exited'事件(在返回工作流之前),并处理事件在使用某个可变变量调用AwaitEvent之前触发的情况......我想我发现处理这个问题的另一种方法,所以我会发布它。 – 2010-04-17 13:27:42

+0

哦,这很令人失望。我希望获得一个基于AwaitEvent的异步工作流与订阅它是一样的。 – 2010-04-17 19:39:22

1

可以从blogpost简化主题的版本。而不是返回模仿的事件,getSubject可以返回工作流。

结果工作流本身就是状态机具有两种状态 1.事件不会触发尚未:所有悬而未决的听众应该注册 2.值已经设定,听者立即 服务。在代码会出现这样的:

type SubjectState<'T> = Listen of ('T -> unit) list | Value of 'T 

getSubject实现很简单

let getSubject (e : IEvent<_, _>) = 
    let state = ref (Listen []) 
    let switchState v = 
     let listeners = 
      lock state (fun() -> 
       match !state with 
       | Listen ls -> 
        state := Value v 
        ls 
       | _ -> failwith "Value is set twice" 
      ) 
     for l in listeners do l v 

    Async.StartWithContinuations(
     Async.AwaitEvent e, 
     switchState, 
     ignore, 
     ignore 
    ) 

Async.FromContinuations(fun (cont, _, _) -> 
    let ok, v = lock state (fun() -> 
     match !state with 
     | Listen ls -> 
      state := Listen (cont::ls) 
      false, Unchecked.defaultof<_> 
     | Value v -> 
      true, v 
     ) 
    if ok then cont v 
    )