2016-06-09 28 views
1

我试图写它有两个方法批量邮件服务:生产者/用料和冲洗功能的消费者

add(Mail mail):可发送Email,由生产者

flushMailService()称为:刷新服务。消费者应该列出一个清单,并打电话给另一个(昂贵的)方法。通常只有在达到批量大小后才能调用昂贵的方法。

这有点类似于这样的问题: Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch

这是可以做到与poll()具有超时。但是生产者应该能够清除邮件服务,如果它不想等待超时,而是让生产者发送队列中的邮件。

poll(20, TimeUnit.SECONDS)可以被中断。如果发生中断,则应发送队列中的所有邮件,而不管批量大小是否已达到队列为空(使用poll(),如果队列为空则立即返回null;一旦该队列为空,则发送的邮件该中断已经被送到制片人。然后,生产者应当再次致电然后阻断poll版本,直到其他任何生产中断等。

这似乎与给定的实施工作。

我试着使用ExecutorServicesFutures,但它似乎只能被中断一次,因为它们是在第一次中断后视为取消。因此,我使用了可以多次中断的线程。

目前我有以下实现似乎工作(但正在使用“原始”线程)。

这是一个合理的方法吗?或者也许可以使用另一种方法?

public class BatchMailService { 
    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>(); 
    private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>(); 
    private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class); 

    public void checkMails() { 

     int batchSize = 100; 
     int timeout = 20; 
     int consumerCount = 5; 

     Runnable runnable =() -> { 
      boolean wasInterrupted = false; 

      while (true) { 
       List<Mail> buffer = new ArrayList<>(); 
       while (buffer.size() < batchSize) { 
        try { 
         Mail mail; 
         wasInterrupted |= Thread.interrupted(); 
         if (wasInterrupted) { 
          mail = queue.poll(); // non-blocking call 
         } else { 
          mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call 
         } 
         if (mail != null) { // mail found immediately, or within timeout 
          buffer.add(mail); 
         } else { // no mail in queue, or timeout reached 
          LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread()); 
          wasInterrupted = false; 
          break; 
         } 
        } catch (InterruptedException e) { 
         LOGGER.info("{} interrupted", Thread.currentThread()); 
         wasInterrupted = true; 
         break; 
        } 
       } 
       if (!buffer.isEmpty()) { 
        LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size()); 
        mailService.sendMails(buffer); 
       } 
      } 
     }; 

     LOGGER.info("starting 5 threads "); 
     for (int i = 0; i < 5; i++) { 
      Thread thread = new Thread(runnable); 
      threads.add(thread); 
      thread.start(); 
     } 

    } 

    public void addMail(Mail mail) { 
     queue.add(mail); 
    } 

    public void flushMailService() { 
     LOGGER.info("flushing BatchMailService"); 
     for (Thread t : threads) { 
      t.interrupt(); 
     } 
    } 
} 

无中断的另一种方法,但毒丸计划(Mail POISON_PILL = new Mail())的变体可能是以下。当有一个消费者线程时可能效果最好。至少,对于一个毒丸,只有一个消费者会继续。

Runnable runnable =() -> { 
     boolean flush = false; 
     boolean shutdown = false; 

     while (!shutdown) { 
      List<Mail> buffer = new ArrayList<>(); 
      while (buffer.size() < batchSize && !shutdown) { 
       try { 
        Mail mail; 
        if (flush){ 
         mail = queue.poll(); 
         if (mail == null) { 
          LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed"); 
          flush = false; 
          break; 
         } 
        }else { 
         mail = queue.poll(5, TimeUnit.SECONDS); // blocking call 
        } 

        if (mail == POISON_PILL){ // flush 
         LOGGER.info(Thread.currentThread() + " got flush"); 
         flush = true; 
        } 
        else if (mail != null){ 
         buffer.add(mail); 
        } 
       } catch (InterruptedException e) { 
        LOGGER.info(Thread.currentThread() + " interrupted"); 
        shutdown = true; 
       } 
      } 
      if (!buffer.isEmpty()) { 
       LOGGER.info(Thread.currentThread()+"{} sending " + buffer.size()+" mails"); 
       mailService.sendEmails(buffer); 
      } 
     } 
    }; 

public void flushMailService() { 
    LOGGER.info("flushing BatchMailService"); 
    queue.add(POISON_PILL); 
} 

回答

1

如何使用信号并等待而不是中断?

如果需要刷新邮件和信号,生产者会发送邮件和信号。 分派器等待信号或超时并继续在消费者线程中发送电子邮件。

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 

public class BatchMailService { 

    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>(); 

    public static final int BATCH_SIZE = 100; 
    public static final int TIMEOUT = 20; 
    public static final int CONSUMER_COUNT = 5; 

    private final Lock flushLock = new ReentrantLock(); 
    private final Condition flushCondition = flushLock.newCondition(); 

    MailService mailService = new MailService(); 

    public void checkMails() { 

     ExecutorService consumerExecutor = Executors.newFixedThreadPool(CONSUMER_COUNT); 

     while (true) { 

      try { 
       // wait for timeout or for signal to come 
       flushLock.lock(); 
       flushCondition.await(TIMEOUT, TimeUnit.SECONDS); 

       // flush all present emails 
       final List<Mail> toFLush = new ArrayList<>(); 
       queue.drainTo(toFLush); 

       if (!toFLush.isEmpty()) { 
        consumerExecutor.submit(() -> { 
         LOGGER.info("{} sending {} mails", Thread.currentThread(), toFLush.size()); 
         mailService.sendEmails(toFLush); 
        }); 
       } 

      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       break; // terminate execution in case of external interrupt 
      } finally { 
       flushLock.unlock(); 
      } 
     } 

    } 

    public void addMail(Mail mail) { 

     queue.add(mail); 

     // check batch size and flush if necessary 
     if (queue.size() >= BATCH_SIZE) { 

      try { 
       flushLock.lock(); 
       if (queue.size() >= BATCH_SIZE) { 
        flushMailService(); 
       } 
      } finally { 
       flushLock.unlock(); 
      } 
     } 
    } 

    public void flushMailService() { 
     LOGGER.info("flushing BatchMailService"); 
     try { 
      flushLock.lock(); 
      flushCondition.signal(); 
     } finally { 
      flushLock.unlock(); 
     } 
    } 

} 
+0

那么,如果生产者检查队列和刷新的尺寸达到当批量大小,然后如果你有多个消费者,可能他们不会用更少的邮件的邮件服务多次打电话? – user140547

+0

它会有但锁定,以防止这种情况发生。可能有一个错误,我错过了,但正如我看到它正常工作。 addMail()在第二次检查queue.size()之前获取'fluskLock'。 当'checkMails()'从'await()'唤醒时,它会在继续刷新之前获取'flushLock'。 [链接](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition。html#awaitNanos(long)) –

+0

我用5个并行生产者和1500邮件运行代码。在没有重复邮件的情况下,没有任何批次的BATCH_SIZE项目少于其中的项目,尽管它们通常有更多。 –