我有一个使用的ProcessBuilder执行外部程序的演员:...我跑几百个并行运行的这种行为......
这是一些非常沉重的处理并行发生的只是在每种情况下实现几毫秒的工作。并行处理机制排列如下(从最差到最好在资源使用,可扩展性和性能方面):
- 过程=重量级
- 螺纹=中等重量(几十个线程可在一个执行单个进程空间)
- 演员=轻量(几十个行动者可以通过利用单个共享线程或多个共享线程)执行的
兼产卵许多工艺需要显著操作系统资源 - 过程创建和终止。在极端情况下,启动&最终进程的O/S开销可能会消耗比实际作业执行数百或更多的CPU和内存资源。这就是创建线程模型(以及更高效的actor模型)的原因。把你当前的处理看作是从你的极其可伸缩的角色中进行'类CGI'不可扩展的O/S压力处理 - 这是一种反模式。将一些操作系统强调为破坏点并不需要太多:这可能会发生。另外,如果正在读取的文件的大小非常大,则对于可扩展性和可靠性来说,限制在同一磁盘上同时读取文件的进程的数量是最好的。最多可以同时读取10个进程,我认为它可以达到100个。
Actor应该如何调用外部程序?
当然,如果您将myExecutable.sh中的逻辑转换为Scala,则根本不需要创建进程。实现可伸缩性,性能和可靠性会更直接。
假设这是不可能的/可取的,您应该限制创建的进程总数,并且您应该在一段时间内跨不同的参与者/请求重用它们。 (1)创建一个重用的进程池(比如说大小10)(2)如果所有进程都忙,则通过ProcessIO(3)创建与进程通信的参与者(比如说100)在进行处理的情况下,Actors阻塞直到一个变得可用为止是可以的。这个选项的问题:复杂性; 100名演员必须做工作来与流程池进行交互,而演员本身在流程成为瓶颈时几乎没有什么价值。 (1)创建有限数量的角色(比如说10)(2)让每个角色创建1个私有的长时间运行过程(即没有像这样的池)(3)让每个角色与/来自通过ProcessIO,阻止进程繁忙。问题:仍然不尽可能简单;演员与阻挡过程互动不佳。 (1)没有参与者,主线程中简单的for循环将获得与参与者(2)通过for循环创建有限数量的进程(10)(3)相同的好处,使用ProcessIO顺序地交互每个进程(如果忙 - 阻止或跳到下一次迭代)
有没有什么办法可以设置此进程的超时返回值,如果超过了重试次数?
确实有。演员最强大的特征之一是某些演员能够产生其他演员并扮演他们的主管(接收失败或超时消息,从中恢复/重启)。有了'本土scala演员',这是通过基本的编程完成的,可以生成自己的检查和超时消息。但我不会介绍,因为阿卡方法更强大,更简单。另外,下一个主要的Scala版本(2.11)将使用Akka作为受支持的actor模型,并使用'native scala actors'deprecated。
下面是一个编程超时/重启(未编译/测试)的Akka监督角色的示例。当然,这是没有用的,如果你有第三个解决方案选项去):
import scala.concurrent.duration._
import scala.collection.immutable.Set
class Supervisor extends Actor {
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException => Resume // resumes (reuses) all child actors
case _: NullPointerException => Restart // restarts all child actors
case _: IllegalArgumentException => Stop // terminates this actor & all children
case _: Exception => Escalate // supervisor to receive exception
}
val worker = context.actorOf(Props[Worker]) // creates a supervised child actor
var pendingRequests = Set.empty[WorkerRequest]
def receive = {
case req: WorkRequest(sender, jobReq) =>
pendingRequests = pendingRequests + req
worker ! req
system.scheduler.scheduleOnce(10 seconds, self, WorkTimeout(req))
case resp: WorkResponse(req @ WorkRequest(sender, jobReq), jobResp) =>
pendingRequests = pendingRequests - req
sender ! resp
case timeout: WorkTimeout(req) =>
if (pendingRequests get req != None) {
// restart the unresponsive worker
worker restart
// resend all pending requests
pendingRequests foreach{ worker ! _ }
}
}
}
注意的一点是:这种方法来监督演员将无法克服的架构差&设计。如果您从合适的工艺/线程/演员设计开始,以满足您的要求,那么监督将会提高可靠性。但是如果你从糟糕的设计开始,那么使用从操作系统级故障恢复的“强力”恢复会加剧你的问题 - 使得过程可靠性变差甚至导致机器崩溃。
https://issues.scala-lang.org/browse/SI-8406? –