2014-02-20 65 views
3

我有一个使用的ProcessBuilder执行外部程序的演员:斯卡拉系统进程挂起

def act { 
    while (true) { 
     receive { 
     case param: String => { 
      val filePaths = Seq("/tmp/file1","/tmp/file2") 
      val fileList = new ByteArrayInputStream(filePaths.mkString("\n").getBytes()) 
      val output = s"myExecutable.sh ${param}" #< fileList !!< 

      doSomethingWith(output) 
     } 
     } 
    } 
    } 

我跑几百个并行运行的这种行为。有时,由于未知原因,该进程(!!)的执行永远不会返回。它永远挂起。这个特定的actor不能处理新的消息。有什么办法可以设置这个过程返回的超时时间,如果超过了重试时间?

这些执行死刑的原因是什么?因为这些命令不应该持续超过几个毫秒。

编辑1:

  1. 在Max OS X不会发生此问题,仅在Linux中
  2. 当我不使用ByteArrayInputStream的输入为: 两个,我观察到的重要事实执行,程序不会挂起
+0

https://issues.scala-lang.org/browse/SI-8406? –

回答

0

你能不能在这个过程中触发它的处理,并且在将来使用它的定时等待?

+0

我试图做到这一点,理论上它应该工作。问题是,我将需要每个未来的额外线程。这在我的应用程序中导致另一个问题我为此创建了一个问题:http://stackoverflow.com/questions/22187186/scala-futures-creating-infinity-threads-in-actors –

0

我不认为我们可以知道我知道myExecutable.sh或doSomethingWith。

挂起时,请尝试查杀所有myExecutable.sh进程。

  • 如果有帮助,你应该检查myExecutable.sh。
  • 如果没有帮助,你应该检查doSomethingWith函数。
+0

可执行文件是echoprint-codegen(https://github.com/echonest/ echoprint-代码生成)。有两种工作方式: 1)接收文件作为参数并处理文件 2)接收文件列表作为输入流并处理它们。如编辑1中所述,第一种方法不会挂起。第二个挂起(很少,但确实如此)。 –

+0

也许我误解了你使用ByteArrayInputStream的意思。我以为你给外部过程没有输入,这基本上可以意味着外部过程可能会有不同的表现。这样做既不能确认也不能否认外部过程中的问题。如果你的意思是因为不使用BAIS而有所不同,那你究竟意味着什么? – v6ak

+0

有两种执行外部进程的方式,两种方式的结果相同:一种带有文件名参数,另一种带有输入流,带有单个文件名列表(外部程序实际上接受文件名列表,但在我的情况下,我只想处理一个。 –

2

我有一个使用的ProcessBuilder执行外部程序的演员:...我跑几百个并行运行的这种行为......

这是一些非常沉重的处理并行发生的只是在每种情况下实现几毫秒的工作。并行处理机制排列如下(从最差到最好在资源使用,可扩展性和性能方面):

  1. 过程=重量级
  2. 螺纹=中等重量(几十个线程可在一个执行单个进程空间)
  3. 演员=轻量(几十个行动者可以通过利用单个共享线程或多个共享线程)执行的

兼产卵许多工艺需要显著操作系统资源 - 过程创建和终止。在极端情况下,启动&最终进程的O/S开销可能会消耗比实际作业执行数百或更多的CPU和内存资源。这就是创建线程模型(以及更高效的actor模型)的原因。把你当前的处理看作是从你的极其可伸缩的角色中进行'类CGI'不可扩展的O/S压力处理 - 这是一种反模式。将一些操作系统强调为破坏点并不需要太多:这可能会发生。另外,如果正在读取的文件的大小非常大,则对于可扩展性和可靠性来说,限制在同一磁盘上同时读取文件的进程的数量是最好的。最多可以同时读取10个进程,我认为它可以达到100个。

Actor应该如何调用外部程序?

当然,如果您将myExecutable.sh中的逻辑转换为Scala,则根本不需要创建进程。实现可伸缩性,性能和可靠性会更直接。

假设这是不可能的/可取的,您应该限制创建的进程总数,并且您应该在一段时间内跨不同的参与者/请求重用它们。 (1)创建一个重用的进程池(比如说大小10)(2)如果所有进程都忙,则通过ProcessIO(3)创建与进程通信的参与者(比如说100)在进行处理的情况下,Actors阻塞直到一个变得可用为止是可以的。这个选项的问题:复杂性; 100名演员必须做工作来与流程池进行交互,而演员本身在流程成为瓶颈时几乎没有什么价值。 (1)创建有限数量的角色(比如说10)(2)让每个角色创建1个私有的长时间运行过程(即没有像这样的池)(3)让每个角色与/来自通过ProcessIO,阻止进程繁忙。问题:仍然不尽可能简单;演员与阻挡过程互动不佳。 (1)没有参与者,主线程中简单的for循环将获得与参与者(2)通过for循环创建有限数量的进程(10)(3)相同的好处,使用ProcessIO顺序地交互每个进程(如果忙 - 阻止或跳到下一次迭代)

有没有什么办法可以设置此进程的超时返回值,如果超过了重试次数?

确实有。演员最强大的特征之一是某些演员能够产生其他演员并扮演他们的主管(接收失败或超时消息,从中恢复/重启)。有了'本土scala演员',这是通过基本的编程完成的,可以生成自己的检查和超时消息。但我不会介绍,因为阿卡方法更强大,更简单。另外,下一个主要的Scala版本(2.11)将使用Akka作为受支持的actor模型,并使用'native scala actors'deprecated

下面是一个编程超时/重启(未编译/测试)的Akka监督角色的示例。当然,这是没有用的,如果你有第三个解决方案选项去):

import scala.concurrent.duration._ 
import scala.collection.immutable.Set 

class Supervisor extends Actor { 
    override val supervisorStrategy = 
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { 
     case _: ArithmeticException => Resume  // resumes (reuses) all child actors 
     case _: NullPointerException => Restart // restarts all child actors 
     case _: IllegalArgumentException => Stop // terminates this actor & all children 
     case _: Exception => Escalate    // supervisor to receive exception 
    } 

    val worker = context.actorOf(Props[Worker]) // creates a supervised child actor 
    var pendingRequests = Set.empty[WorkerRequest] 

    def receive = { 
    case req: WorkRequest(sender, jobReq) => 
     pendingRequests = pendingRequests + req 
     worker ! req 
     system.scheduler.scheduleOnce(10 seconds, self, WorkTimeout(req)) 
    case resp: WorkResponse(req @ WorkRequest(sender, jobReq), jobResp) => 
     pendingRequests = pendingRequests - req 
     sender ! resp 
    case timeout: WorkTimeout(req) => 
     if (pendingRequests get req != None) { 
     // restart the unresponsive worker 
     worker restart 
     // resend all pending requests 
     pendingRequests foreach{ worker ! _ } 
     } 
    } 
} 

注意的一点是:这种方法来监督演员将无法克服的架构差&设计。如果您从合适的工艺/线程/演员设计开始,以满足您的要求,那么监督将会提高可靠性。但是如果你从糟糕的设计开始,那么使用从操作系统级故障恢复的“强力”恢复会加剧你的问题 - 使得过程可靠性变差甚至导致机器崩溃。

1

我没有足够的信息来重现问题,所以我无法准确诊断它,但是如果我在你的鞋子里,我会如何诊断它。基本的方法是鉴别诊断 - 确定可能的原因,并进行测试,证明或排除。

我要做的第一件事就是验证应用程序产生的myExecutable.sh进程实际上是终止的。

如果进程不是终止,那么这是问题的一部分,所以我们需要明白为什么。我们可以做的一件事是运行myExecutable.sh以外的东西。您建议ByteArrayInputStream可能是问题的一部分,这表明myExecutable.shstdin上输入错误。如果是这种情况,那么你可以运行一个脚本,将其输入简单地记录到一个文件中,这会显示出来。如果输入无效,那么ByteArrayInputStream出于某种原因提供了不良数据 - 线程安全和unicode是明显的罪魁祸首,但查看实际的不良数据应该给你一个线索。如果输入有效,那么这是myExecutable.sh中的一个错误。

如果进程终止,那么问题在别的地方。我的第一个猜测是,它要么与演员安排(演员图书馆通常使用ForkJoin执行,这很好,但不能很好地处理阻止代码)或scala.sys.process库中的错误(不会是前所未有的 - 我不得不放弃scala.sys.process从我正在从事的项目because of a memory leak)。

查看挂起线程的堆栈跟踪应该会给你一些线索(VisualVM是你的朋友),因为你应该能够看到正在等待的东西。然后您可以在OpenJDK或Scala标准库源代码中找到相关的代码。你从哪里出发取决于你的发现。