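Problem: I need to write an application that processes several hundred files, each of which takes several hundred MB of memory and a few seconds to process. I first wrote it using Future[Report] objects created with Executors.newFixedThreadPool(), but ran into out-of-memory errors because the List[Future[Report]] returned by ExecutorService.invokeAll() kept holding on to the intermediate memory used by each process. I solved the problem by computing the Report values (only a few hundred lines per Report) in local methods of the processors and returning the Report objects from those methods, instead of doing the calculations directly in the call method (from the Callable interface).

Scala Actors instead of Java Futures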
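For reference, the Java-futures workaround described in the problem statement (keeping the large intermediate data out of anything the returned Future holds on to) might look roughly like the Scala sketch below. `Report`, `ReportTask`, `ReportRunner`, and the byte-array workload are hypothetical stand-ins, not the question's actual code.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical result type: the small Report (a few hundred lines of text).
final case class Report(file: String, lines: Vector[String])

// The Callable keeps only the file name as a field. The large intermediate
// data is created inside the local method `buildReport`, so it becomes
// garbage as soon as that method returns; the Future stores only the Report.
final class ReportTask(file: String) extends Callable[Report] {
  override def call(): Report = buildReport(file)

  private def buildReport(name: String): Report = {
    // Hypothetical stand-in for the expensive per-file work: a big temporary buffer.
    val buffer = Array.fill(1000000)(1.toByte)
    val total = buffer.foldLeft(0L)(_ + _)
    Report(name, Vector(s"$name: sum=$total"))
  }
}

object ReportRunner {
  def run(files: Seq[String], threads: Int): Seq[Report] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks = new java.util.ArrayList[Callable[Report]]()
      files.foreach(f => tasks.add(new ReportTask(f)))
      // invokeAll blocks until every task is done and keeps the input order
      val futures = pool.invokeAll(tasks)
      (0 until futures.size()).map(i => futures.get(i).get())
    } finally pool.shutdown()
  }
}
```

The key design choice is where the big data lives: as locals of `buildReport` it is unreachable once the method returns, whereas fields of the Callable or of the result would stay alive as long as the list of futures does.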
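I would like to try solving this problem with Scala Actors instead. I wrote a class that takes a list of jobs (parameterized on the job and result types, plus a processing function) and hands each job to one of a configurable number of Worker instances (subclasses of Actor). The code follows.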
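Issues:
I am not sure my processing is correct. I do not like using a CountDownLatch to delay returning the results from the dispatcher. I would prefer to write a more "functional" version of the dispatcher that does not modify the jobQueue list or the workers HashMap, perhaps borrowing the tail-recursive loop construct from Clojure (I have used a @tailrec def loop method in other Scala code).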
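One way to get the "functional" dispatcher asked for here is to keep its state in the parameters of a @tailrec loop and return the results from that loop. The sketch below is a hypothetical illustration that swaps scala.actors for plain threads and java.util.concurrent.LinkedBlockingQueue mailboxes, so it runs without the actors library; the message names mirror the question's protocol, but `FunctionalDispatcher`, `ToDispatcher`, and `ToWorker` are invented for the example.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.tailrec

object FunctionalDispatcher {
  // Hypothetical message types mirroring the question's protocol.
  sealed trait ToDispatcher[+B]
  final case class SendJob(id: Int) extends ToDispatcher[Nothing]
  final case class ReportResult[+B](id: Int, result: B) extends ToDispatcher[B]
  final case class Stopped(id: Int) extends ToDispatcher[Nothing]

  sealed trait ToWorker[+A]
  final case class Process[+A](job: A) extends ToWorker[A]
  case object StopWorking extends ToWorker[Nothing]

  def run[A, B](jobs: List[A], workerCount: Int)(process: A => B): List[B] = {
    require(workerCount > 0, "need at least one worker")
    val inbox = new LinkedBlockingQueue[ToDispatcher[B]]()
    val mailboxes = Vector.fill(workerCount)(new LinkedBlockingQueue[ToWorker[A]]())

    // Workers: plain threads that ask for work with SendJob, report each
    // result before asking again, and answer StopWorking with Stopped.
    val threads = (0 until workerCount).map { id =>
      val t = new Thread(() => {
        inbox.put(SendJob(id))
        var running = true
        while (running) {
          mailboxes(id).take() match {
            case Process(job) =>
              inbox.put(ReportResult(id, process(job)))
              inbox.put(SendJob(id))
            case StopWorking =>
              inbox.put(Stopped(id))
              running = false
          }
        }
      })
      t.start()
      t
    }

    // Dispatcher: a tail-recursive loop whose state (remaining jobs, live
    // workers, collected results) is passed as parameters instead of vars.
    // It returns the results directly, so no CountDownLatch is needed.
    @tailrec
    def loop(queue: List[A], live: Set[Int], acc: List[B]): List[B] =
      if (live.isEmpty) acc.reverse
      else inbox.take() match {
        case ReportResult(_, r) => loop(queue, live, r :: acc)
        case SendJob(id) =>
          queue match {
            case job :: rest =>
              mailboxes(id).put(Process(job))
              loop(rest, live, acc)
            case Nil =>
              mailboxes(id).put(StopWorking)
              loop(queue, live, acc)
          }
        case Stopped(id) => loop(queue, live - id, acc)
      }

    val results = loop(jobs, (0 until workerCount).toSet, Nil)
    threads.foreach(_.join())
    results
  }
}
```

Because each worker puts its result into the single FIFO inbox before asking for another job, and the loop only returns after every worker has answered StopWorking with Stopped, no result is lost. Results arrive in completion order, not job order. Like the original, this sketch does not handle a job whose processing throws: that worker never reports back and the loop waits forever.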
I am eagerly awaiting the publication of "Actors in Scala" by Philipp Haller and Frank Sommers.
Here is the code:
package multi_worker

import scala.actors.Actor
import java.util.concurrent.CountDownLatch

object MultiWorker {
  private val megabyte = 1024 * 1024
  private val runtime = Runtime.getRuntime
}

class MultiWorker[A, B](jobs: List[A],
                        actorCount: Int)(process: (A) => B) {
  import MultiWorker._
  sealed abstract class Message
  // Dispatcher -> Worker: run this job and report the result
  case class Process(job: A) extends Message
  // Worker -> Dispatcher: result of processing
  case class ReportResult(id: Int, result: B) extends Message
  // Worker -> Dispatcher: I need work; send me a job
  case class SendJob(id: Int) extends Message
  // Worker -> Dispatcher: I have stopped as requested
  case class Stopped(id: Int) extends Message
  // Dispatcher -> Worker (and Logger): stop working; all jobs are done.
  // A case object (not a parameterless case class), so that the
  // `case StopWorking =>` patterns below match the message that is sent.
  case object StopWorking extends Message
  /**
   * A simple logger that can be sent text messages that will be written to the
   * console. Used so that messages from the actors do not step on each other.
   */
  object Logger extends Actor {
    def act() {
      loop {
        react {
          case text: String => println(text)
          case StopWorking => exit()
        }
      }
    }
  }
  Logger.start()
  /**
   * A worker actor that will process jobs and return results to the
   * dispatcher.
   */
  class Worker(id: Int) extends Actor {
    def act() {
      // Ask the dispatcher for an initial job
      dispatcher ! SendJob(id)
      loop {
        react {
          case Process(job) =>
            val startTime = System.nanoTime
            dispatcher ! ReportResult(id, process(job))
            val endTime = System.nanoTime
            val totalMemory = (runtime.totalMemory / megabyte)
            val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
            val message = "Finished job " + job + " in " +
              ((endTime - startTime) / 1000000000.0) +
              " seconds using " + usedMemory +
              "MB out of total " + totalMemory + "MB"
            Logger ! message
            dispatcher ! SendJob(id)
          case StopWorking =>
            dispatcher ! Stopped(id)
            exit()
        }
      }
    }
  }
  val latch = new CountDownLatch(1)
  var res = List.empty[B]

  /**
   * The job dispatcher that sends jobs to the workers until the job queue
   * (jobs: List[A]) is empty. It then tells each worker to stop and, once
   * every worker has confirmed with Stopped, releases the latch so that
   * `results` can return the List[B] to the caller.
   */
  val dispatcher = new Actor {
    def act() {
      var jobQueue = jobs
      var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
      workers.values.foreach(_.start())
      loop {
        react {
          case ReportResult(id, result) =>
            res = result :: res
          case SendJob(id) =>
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              jobQueue = jobQueue.tail
            } else {
              // No jobs left: tell this worker to shut down
              workers(id) ! StopWorking
            }
          case Stopped(id) =>
            workers = workers - id
            // Each worker sends its last ReportResult before asking for
            // another job, so once every worker has stopped, all results are in.
            if (workers.isEmpty) {
              Logger ! StopWorking
              latch.countDown()
              exit()
            }
        }
      }
    }
  }
  dispatcher.start()
  /**
   * Get the results of the processing; waits for the dispatcher to finish
   * before returning.
   */
  def results: List[B] = {
    latch.await()
    res
  }
}
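For comparison, a bounded-concurrency version built on scala.concurrent.Future (added in Scala 2.10, after this question was written) rather than actors could look like the sketch below. `FuturePool` and `runAll` are hypothetical names; the fixed thread pool plays the role of the actorCount workers, and Await.result takes the place of the CountDownLatch.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FuturePool {
  // Runs `process` over `jobs` on a fixed pool of `threads` threads and
  // returns the results in job order; completed futures keep only the B values.
  def runAll[A, B](jobs: List[A], threads: Int)(process: A => B): List[B] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One Future per job; traverse combines them into a Future[List[B]]
      val all = Future.traverse(jobs)(job => Future(process(job)))
      Await.result(all, Duration.Inf)
    } finally pool.shutdown()
  }
}
```

Usage would be along the lines of `FuturePool.runAll(files, 4)(processFile)`, where `processFile` is a hypothetical function returning a small Report.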
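Thanks! I'll take a close look at your code. – Ralph 2011-04-21 14:35:43
Awesome! I like the 'Channel' trick. – 2011-04-21 15:22:42
Lovely code. I would break it up (extract methods, etc.) for readability, but the concept is very nice. – 2011-04-22 21:26:50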