2012-10-22 31 views
0

我有如下设计:的Java的TimerTask应该等待一个阻塞队列

有延伸TimerTask任务,定为每分钟运行。 此任务将尝试从中央队列(作为单个用户)获取项目并将其表示写入文件。

此外,有多个生产者不时将物品放入中央队列中。

我感兴趣的是每次执行任务时间(run() method executed)将提取所有从队列中的项目,如果有项目,如果没有项目做任何事。

生产者如果已满,应睡在队列中。

我对这个问题的解决方案是:

创建ExtractTask延伸的TimerTask。 ExtractTask将包含一个BlockingQueue。 每个生产者将通过执行方法getQueue()来接收对队列实例的引用。 生产者将执行BlockingQueue.put()方法。 消费者将在run()中执行BlockingQueue.poll()方法。

你能提出一个更好的设计吗?我的设计是否包含任何有问题的情况?这种设计可能遇到的任何同步问题?

+0

你有多个消费者吗? –

+0

@Akram Berkawy:只有一个消费者 – Michael

回答

2

我想:

  • 保持在您的设计任务分开排队,
  • 注入队列,而不是进行查找,
  • 使用SchedulerService,而不是一个TimerTask

除了你已经得到它。

如果你愿意冒险依赖春天,你应该看看Spring Integration。你描述的所有组件都在那里。您还可以使用许多其他框架来解决问题,如骆驼或阿卡;我的主要观点是如果你没有必要,不要自己维护这些代码。

免责声明:我有点biased about Spring Integration

+0

+1,关于排队注射,我没有关注你,你想详细说明一下吗?我也对Spring集成有所偏见,尽管我对Spring比较陌生,但我很熟悉如何初始化这些对象,但是对于定时器功能我不太熟悉,不过很好的例子。 – Michael

+1

@迈克尔买我的书,里面满是例子:p。更严重的是,你可以看看https://github.com/SpringSource/spring-integration-samples – iwein

+0

谢谢,现在它似乎是最好的答案和一个很好的,我会等待一段时间,以获得更多答案,给机会一些更有趣的想法,然后我会接受最好的一个。 – Michael

1

设计看起来不错。这里没有太多的细节,所以很难确定。我建议将所有依赖注入到计时器任务中。

此外,你可能在没有太多自定义代码的情况下在Apache Camel中实现这一点。见https://camel.apache.org/timer.html

+0

+1,您是否想详细说明如何将所有依赖注入到计时器任务中? – Michael

0

你说,在执行定时器当消费者将提取所有项目。

你应该要小心,从队列中提取的所有项目没有阻止操作的操作,它是poll()阻塞的方法调用的重复,这意味着在提取物品时 生产商将能够添加项目到队列中。

+0

不轮询是否同步?我建议投票,不要采取,因为它没有阻塞时,队列是空的,并返回null。我建议放,因为它是阻塞的,如果队列满了,生产者会睡觉。 – Michael

+0

根据队列的实现,会有或多或少的锁定。民意调查/放是通常的路要走,虽然你可以优化使用drainTo并提供... – iwein

+0

我想使用像drainTo这样的方法,虽然根据规范,似乎这种方法是不安全的。 – Michael

1

既然你问到的设计,我建议几件事情:

  • 我会亲自去执行人服务在计时器任务。看看here。如果需求更改为,使用执行程序可确保您可以在将来以单独的线程执行任务。
  • 尝试将您的队列与任务对象分开。
  • 一般在代码中使用DI以使其可测试。
  • 我会让生产者在他们的构造函数中接收队列。
+0

感谢您的建议,因为我已经回答了,并且在此问了一些谁删除了他的帖子,您如何在执行Executor服务的同时每分钟运行一次任务,(我只有一个重复任务) – Michael

+0

计划线程池执行程序。 http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html希望这会有所帮助。 –

+0

+1,谢谢,这可能有帮助 – Michael

1

根据您的设计,我可以想到下面这样的事情。 ConsumerTask可以使用泛型,但我很难弄清楚如何在Producer线程中执行相同的操作。生产者和消费者都对生产/消费的物品数量有限制。从TimerTask逻辑中取消timerTask本身的run()方法中的定时器对于它的停止至关重要。在这种情况下,只能使用毒药丸方法来关闭。 如果使用Executors.newSingleThreadExecutor()或scheduledThreadPoolExecutor(),则可以使用shutdown()和shutdownNow()方法来停止生产者或使用者。虽然TimerTask是检查ConcurrentQueue的工作的一个很好的例子,但它不会用于生产系统。

编辑 向生产者线程添加通用功能。构造函数现在使用一个模板类来实现将项目添加到队列的方法。我定义了一个抽象类AddItem,它包含和addItem()方法,只要生产者想将项添加到队列中,该方法就会被调用。

import java.util.Date; 
import java.util.Random; 
import java.util.Timer; 
import java.util.TimerTask; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicLong; 

public class ConsumerTask<T> extends TimerTask { 
    Timer timer; 
    ConcurrentLinkedQueue<T> itemQueue; 
    AtomicLong count = new AtomicLong(0); 
    final long limit; 

    public ConsumerTask(ConcurrentLinkedQueue<T> itemQ, long lim, int seconds) { 
     limit = lim; 
     timer = new Timer(); 
     timer.scheduleAtFixedRate(this, new Date(), seconds * 1000); 
     itemQueue = itemQ; 
    } 

    public void run() { 
     T item = itemQueue.peek(); 
     if (item != null) { 
      if (count.incrementAndGet() <= limit) { 
       System.out.println("Extracting Item : " + itemQueue.poll()); 
      } else { 
       System.out 
         .println("Consumed : " + (count.get() - 1) + " items"); 
       timer.cancel(); 
      } 

     } 
    } 

    public static void main(String args[]) throws InterruptedException { 
     ConcurrentLinkedQueue<Integer> itemQ = new ConcurrentLinkedQueue<Integer>(); 
     ConsumerTask<Integer> ct = new ConsumerTask<Integer>(itemQ, 10, 1); 

     new Thread(new Producer<Integer>(itemQ, new IntegerAddItem(itemQ), 20)) 
       .start(); 
     new Thread(ct).start(); 

    } 
} 

abstract class AddItem<T> { 
    ConcurrentLinkedQueue<T> itemQ; 
    T t; 

    public AddItem(ConcurrentLinkedQueue<T> itemQ) { 
     this.itemQ = itemQ; 
    } 

    abstract boolean addItem(); 

    public boolean addItem(T t) { 
     return itemQ.add(t); 
    } 
} 

class IntegerAddItem extends AddItem<Integer> { 
    public IntegerAddItem(ConcurrentLinkedQueue<Integer> itemQ) { 
     super(itemQ); 
    } 

    AtomicInteger item = new AtomicInteger(0); 

    @Override 
    boolean addItem() { 
     return addItem(item.incrementAndGet()); 
    } 

} 

class Producer<T> implements Runnable { 
    private final ConcurrentLinkedQueue<T> itemQueue; 
    AtomicInteger item = new AtomicInteger(0); 
    AtomicLong count = new AtomicLong(0); 
    AddItem<T> addMethod; 
    final long limit; 

    public Producer(ConcurrentLinkedQueue<T> itemQ, AddItem<T> addMethod, 
      long limit) { 
     itemQueue = itemQ; 
     this.limit = limit; 
     this.addMethod = addMethod; 
    } 

    public void run() { 
     while (count.getAndIncrement() < limit) { 
      addMethod.addItem(); 
      try { 
       Thread.sleep(new Random().nextInt(5000)); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       Thread.currentThread().interrupt(); 
      } 

     } 
    } 
} 
+0

谢谢,有趣的建议。 – Michael