2013-01-21 61 views
1

我有大量的状态机。偶尔,状态机将需要从一个状态移动到另一个状态,这可能很便宜或昂贵,并且可能涉及DB读取和写入等。从很多队列中消耗

这些状态更改是由于来自客户端的传入命令而发生的,并且可能随时发生。

我想平行工作量。我想要一个队列说'把这台机器从这个状态移到这个状态'。显然,任何一台机器的命令都需要按顺序执行,但如果我有很多线程,我可以并行向前移动许多机器。

我可以为每个状态机器设置一个线程,但状态机的数量是依赖于数据的,可能有数百或数千个;我不想每个状态机都有一个专用的线程,我想要一个某种类型的池。

我怎样才能有一个工作者池,但确保严格按顺序处理每个状态机的命令?

UPDATE:所以想象Machine实例有优秀的命令列表。当线程池中的执行程序完成使用命令时,如果它具有更多未完成的命令,它会将Machine放回到线程池的任务队列中。所以问题是,如何在追加第一个命令时自动将Machine加入到线程池中?并确保这是所有线程安全?

+0

看看[这篇文章](http://java.dzone.com/articles/ensuring-order-execution-tasks)。也许它有帮助。 –

回答

2

我建议你这样的场景:

  1. 创建线程池,可能是一些固定的大小与Executors.newFixedThreadPool
  2. 创建一些(可能这将是一个HashMap),它适用于每个状态机一个Semaphore。这信号灯将具有1的值,将公平信号灯保持序列
  3. 在Runnable接口,这将做的工作对乞讨只需添加semaphore.aquire()其状态机和semaphore.release()的信号在运行方法的末尾。

随着线程池的大小,您将控制并行度的级别。

2

我建议另一种方法。使用线程池来移动状态机中的状态,而不是使用线程池来处理所有事情,包括执行工作的。在做了一些导致状态改变的工作之后,应该将状态改变事件添加到队列中。处理状态更改后,应该将另一个do-work事件添加到队列中。

假设状态转换是工作驱动的,反之亦然,无法进行连续处理。

将信号存储在特殊映射中的想法非常危险。地图必须被同步(添加/删除objs是线程不安全的),并且执行搜索(可能在地图上同步)然后使用信号量的开销相对较大。

此外 - 如果您想在您的应用程序中使用多线程架构,我认为您应该一路走下去。混合不同的体系结构可能会在以后出现麻烦。

+0

非常有效的一点;对不起,我不清楚,这些状态变化是外部触发的,我想在内部排队。 – Will

1

每台机器都有一个线程ID。产生期望数量的线程。让所有线程贪婪地处理来自全局队列的消息。每个线程都会锁定当前消息的服务器,以独占方式使用它(直到它完成处理当前消息和其队列中的所有消息),并且其他线程将该服务器的消息放入其内部队列中。

编辑:处理消息的伪代码:

void handle(message) 
    targetMachine = message.targetMachine 
    if (targetMachine.thread != null) 
    targetMachine.thread.addToQueue(message); 
    else 
    targetMachine.thread = this; 
    process(message); 
    processAllQueueMessages(); 
    targetMachine.thread = null; 

处理消息的Java代码:(我可能会稍微过于复杂的事情,但是这应该是线程安全的)

/* class ThreadClass */ 
void handle(Message message) 
{ 
    // get targetMachine from message 
    targetMachine.mutexInc.aquire(); // blocking 
    targetMachine.messages++; 
    boolean acquired = targetMachine.mutex.aquire(); // non-blocking 
    if (acquired) 
    targetMachine.threadID = this.ID; 
    targetMachine.mutexInc.release(); 
    if (!acquired) 
    // can put this before release, it may speed things up 
    threads[targetMachine.threadID].addToQueue(message); 
    else 
    { 
    process(message); 
    targetMachine.messages--; 
    while (true) 
    { 
     while (!queue.empty()) 
     { 
     process(queue.pop()); 
     targetMachine.messages--; 
     } 
     targetMachine.mutexInc.acquire(); // blocking 
     if (targetMachine.messages > 0) 
     { 
     targetMachine.mutexInc.release(); 
     Thread.sleep(1); 
     } 
     else 
     break; 
    } 
    targetMachine.mutex.release(); 
    targetMachine.mutexInc.release(); 
    } 
}