2013-01-31 61 views
11

这是我的用例。一起使用Spring @Scheduled和@Async

传统系统更新数据库队列表QUEUE。

我希望计划定期作业 - 检查队列 的内容 - 如果有表中的行锁定该行并做了一些工作 - 删除队列中排

如果以前的工作仍在运行,那么将创建一个新线程来完成这项工作。我想配置最大并发线程数。

我使用Spring 3和我目前的解决办法是以下(使用1毫秒的固定利率,以获得线程基本上连续运行)

@Scheduled(fixedRate = 1) 
@Async 
public void doSchedule() throws InterruptedException { 
    log.debug("Start schedule"); 
    publishWorker.start(); 
    log.debug("End schedule"); 
} 

<task:executor id="workerExecutor" pool-size="4" /> 

这创造了4个线程直客做和线程正确分享队列中的工作量。但是,当线程需要很长时间才能完成时,我似乎正在获取内存泄漏。

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0        |    80 | 373,410,496 |  89.74% 
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940       |   48 | 373,410,136 |  89.74% 
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68 

所以

1:我应该使用@Async和@Scheduled在一起吗?

2:如果不是那么我怎么才能用弹簧来达到我的要求?

3:仅当其他线程忙时才能创建新线程?

谢谢大家!

编辑:我觉得作业队列渐渐无限长的......现在,使用

<task:executor id="workerExecutor" 
    pool-size="1-4" 
    queue-capacity="10" rejection-policy="DISCARD" /> 

将报告与结果

+4

这岂不是没有'@ Async'正常工作?无论如何,用'@ Scheduled'注解的方法应该是异步执行的。 – ach

+0

如果你想让“线程连续运行”,那么你应该不会真的在首先使用@Scheduled。它的用途是“预定”的活动,而不是连续的活动...... – JoeG

+0

你可能会考虑制作publishWorker.start();方法异步。 –

回答

0
//using a fixedRate of 1 millisecond to get the threads to run basically continuously 
@Scheduled(fixedRate = 1) 

当您使用@Scheduled一个新的线程将被创建并将在1毫秒的指定fixedRate处调用方法doSchedule。当你运行你的应用程序时,你已经可以看到4个线程正在竞争QUEUE表,并且可能会出现死锁。

调查线程转储是否存在死锁。 http://helpx.adobe.com/cq/kb/TakeThreadDump.html

@Async注释在这里没有任何用处。

更好的实现方法是通过实现runnable并将您的类传递给具有所需线程数的TaskExecutor来创建类作为线程。

Using Spring threading and TaskExecutor, how do I know when a thread is finished?

还要检查你的设计似乎并没有被正确地处理同步。如果之前的作业正在运行并且在该行上持有锁,那么您创建的下一个作业仍然会看到该行,并将等待获取该行的锁定。

2

您可以尝试

  1. 运行调度与一秒的延迟,这将锁定&获取的未到目前为止锁定所有 队列记录。
  2. 对于每条记录,调用一个Async方法,它会处理那条记录,&将其删除。
  3. 执行程序的拒绝策略应为ABORT,以便调度程序可以解锁尚未发出的QUEUE。这样调度器可以在下次运行中再次处理这些QUEUE。

当然,您将不得不处理场景,调度程序已锁定QUEUE,但处理程序没有完成处理它,无论出于何种原因。

伪代码:

public class QueueScheduler { 
    @AutoWired 
    private QueueHandler queueHandler; 

    @Scheduled(fixedDelay = 1000) 
    public void doSchedule() throws InterruptedException { 
     log.debug("Start schedule"); 
     List<Long> queueIds = lockAndFetchAllUnlockedQueues(); 
     for (long id : queueIds) 
      queueHandler.process(id); 
     log.debug("End schedule"); 
    } 
} 

public class QueueHandler { 

    @Async 
    public void process(long queueId) { 
     // process the QUEUE & delete it from DB 
    } 
} 
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10" 
    rejection-policy="ABORT"/>