2010-06-07 105 views
3

在Java中,为了编写一个向服务器发送请求的库,我通常实现某种调度器(与Twitter4J库中发现的调度器类似:http://github.com/yusuke/twitter4j/blob/master/twitter4j-core/src/main/java/twitter4j/internal/async/DispatcherImpl.java),以限制连接数量,执行异步任务等。演员“队列”?

这个想法是创建了N个线程。 “任务”排队并通知所有线程,其中一个线程在准备就绪时会从队列中弹出一个项目,完成工作,然后返回到等待状态。如果所有线程都忙于处理任务,那么该任务只是排队,而下一个可用线程将采用它。

这将连接的最大数量保持为N,并且允许最多N个任务同时运行。

我想知道可以用Actor来创建什么样的系统来完成同样的事情?有没有一种方法可以有N个Actor,当一条新消息准备好时,将它传递给一个Actor来处理 - 如果所有Actor都很忙,只需排队消息?

+1

您所描述的是一个线程池,因为Java 5在标准库中,请参阅package java.util.concurrent(类ThreadPoolExecutor)。 – Jesper 2010-06-09 12:19:36

回答

4

Akka Framework旨在解决这样的问题,是你在寻找什么。

看看这个docu - 有很多高度可配置的dispather(基于事件的,基于线程的,负载平衡的,工作窃取等)管理参与者邮箱,并允许他们协同工作。你也可能会发现有趣的this blog post

E.g.此代码实例基于固定的线程池新的工作窃取调度,即满足其监督行为者之间的负载平衡:使用调度

val workStealingDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") 
    workStealingDispatcher 
    .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity 
    .setCorePoolSize(16) 
    .buildThreadPool 

演员:

class MyActor extends Actor { 

    messageDispatcher = workStealingDispatcher 

    def receive = { 
     case _ => 
    } 
    } 

现在,如果你开始2+的实例,调度员将平衡演员的邮箱(队列)之间的负载(在邮箱中有太多消息的演员将“捐献”一些给没有任何事情的演员)。

+0

我认为你可以用Scala actor来做同样的事情来覆盖调度方法 – IttayD 2010-06-08 13:06:11

1

那么,你必须看到演员调度程序,因为演员通常不是与线程1对1。演员背后的想法是,你可能有很多,但实际的线程数量将被限制在合理的范围内。他们也不应该长时间运行,而是应该尽快回复他们收到的消息。简言之,该代码的架构似乎完全与设计演员系统的方式不一致。

不过,每个工作的演员可能会向队列演员发送一条消息,要求下一个任务,然后循环回复以作出反应。该Queue actor将收到排队消息或出队消息。它可以设计成这样的:

val q: Queue[AnyRef] = new Queue[AnyRef] 
loop { 
    react { 
    case Enqueue(d) => q enqueue d 
    case Dequeue(a) if q.nonEmpty => a ! (q dequeue) 
    } 
}