2013-07-05
Design questions related to ExecutorService

I would like to verify the design of a multi-threaded application I wrote and get clarification/reassurance on a few points. I apologize in advance for such a long post - I wanted to break it into several questions, but they all reference the same code and seem interrelated, so I chose to put everything into one post. If this is not appropriate - please let me know and I will split it into multiple posts.

Here is what I have:

  1. BatchService (a Spring singleton bean): accepts requests to upload a specified directory or zip archive. For that, it holds an ExecutorService servicePool. On each request it submits a new BatchUploader Callable task to the pool and stores the returned Future in a map - in a transactional (TX) method. It provides methods to get the status of all uploads and to cancel all uploads. It also starts a separate BatchMonitor thread that monitors the progress of the uploads and updates the queues that hold information about completed and not-completed uploads. It also cleans up all resources when the bean is about to be destroyed (using Spring's PreDestroy callback).
  2. BatchUploader is a Callable task that also has its own ExecutorService (batchPool) for uploading individual files. In its call() method it scans the directory or zip archive and, for each file, submits a SingleFileUploader Callable task to its pool.
  3. SingleFileUploader is a Callable task that, in its call() method, does all the work of uploading and processing a file, and returns a status.

Here is some real code and some pseudo-code:

public class BatchService { 

private ExecutorService servicePool; 
private ConcurrentHashMap<String, Future<SingleBatchUploadResult>> activeBatchFutures = new ConcurrentHashMap<String, Future<SingleBatchUploadResult>>(); 
// keep last 100 unsuccessful uploads 
private ConcurrentLinkedQueue<SingleBatchUploadResult> notCompletedBatches = new ConcurrentLinkedQueue<SingleBatchUploadResult>(); 
// keep last 100 successful uploads 
private ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<String>(); 
private Thread monitorThread; 

public BatchService() { 
    servicePool = Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS); 
    monitorThread = new Thread(new BatchMonitor()); 
    monitorThread.setDaemon(true); 
    monitorThread.start(); 
} 

@Transactional 
public void processUpload(String uploadId, String contentName) { 
    Future<SingleBatchUploadResult> taskFuture = servicePool.submit(new BatchUploader(uploadId, contentName)); 
    activeBatchFutures.put(uploadId, taskFuture); 
} 

@PreDestroy 
public void preDestroy() { 
    // stop the monitor thread 
    monitorThread.interrupt(); 
    // stop all executors and their threads 
    cancelAllTasks(); 
} 

public void cancelAllTasks(){ 
    List<Runnable> waitingTasks = servicePool.shutdownNow(); 
    for (Runnable task: waitingTasks){ 
     // examine which tasks are still waiting, if necessary    
    } 
} 

public boolean cancelBatchById(String uploadId){ 
    Future<SingleBatchUploadResult> resultFuture = activeBatchFutures.get(uploadId); 
    // isCancelled() implies isDone(), so checking isDone() alone is sufficient 
    if (resultFuture != null && !resultFuture.isDone()){ 
     resultFuture.cancel(true); 
     return true; 
    } 
    // this task was either already finished, cancelled, not submitted or unknown 
    return false; 
} 

public void getCurrentStatus(){ 
    // just print out the sizes of queues for now 
    System.out.println("number of active uploads: " + activeBatchFutures.size());    
    System.out.println("number of successfully completed uploads: " + completedBatches.size());    
    System.out.println("number of failed uploads: " + notCompletedBatches.size());     
} 


public class BatchMonitor implements Runnable { 
    @Override 
    public void run() { 
     boolean cont = true; 
     while (cont) { 
      if (Thread.currentThread().isInterrupted()){ 
       // the thread is being shut down - get out 
       cont = false; 
       break; 
      }     
      Iterator<Entry<String, Future<SingleBatchUploadResult>>> iterator = activeBatchFutures.entrySet().iterator(); 
      // remove completed Futures from the map 
      // add successfully completed batches to completedBatches queue 
      // add all other batches to notCompletedBatches queue 
      while (iterator.hasNext() && cont){ 
       … 
       if (batchUploadFuture.isCancelled()) {       
        addToNotCompleted(defaultResult); 
        // remove this future from the active list 
        activeBatchFutures.remove(uploadId);       
       } else if (batchUploadFuture.isDone()){ 
        try { 
         SingleBatchUploadResult result = batchUploadFuture.get(); 
         if (UploadStatus.SUCCESS.equals(result.getUploadStatus())) 
          addToCompleted(uploadId); 
         else 
          addToNotCompleted(result); 
        } catch (InterruptedException e) { 
         // the thread is being shut down - stop processing 
         cont = false; 
         // preserve interruption state of the thread 
         Thread.currentThread().interrupt(); 
         break; 
        } catch (ExecutionException e) { 
         addToNotCompleted(defaultResult); 
        } 
        // remove this future from the active list 
        activeBatchFutures.remove(uploadId); 
       } else { 
        // the task has not finished yet - let it be 
        // TODO if a Future is not complete - see how old it is [how ?] If older then timeout - cancel it 
        // For now, rely on the ExecutorService timeout set on the BatchUploader 
       } 

      } 
      // try to sleep for 5 sec, unless the thread is being shutdown 
      if (!Thread.currentThread().isInterrupted()){ 
       try { 
        Thread.sleep(5000); 
       } catch (InterruptedException e) { 
        cont = false; 
        // preserve interruption state of the thread 
        Thread.currentThread().interrupt(); 
       } 
      } 

     } 
     System.out.println("BatchMonitor.run() has terminated"); 
    } 

    public void addToCompleted(String uploadId){ 
     int currentSize = completedBatches.size(); 
     // bring the size of the queue below MAX 
     if (currentSize > MAX_SUCCESSFUL_RESULTS) { 
      int delta = currentSize - MAX_SUCCESSFUL_RESULTS; 
      while (delta > 0){ 
       completedBatches.poll(); 
       delta--; 
      } 
     } 
     completedBatches.offer(uploadId);    
    } 

    public void addToNotCompleted(SingleBatchUploadResult result){ 
     int currentSize = notCompletedBatches.size(); 
     // bring the size of the queue below MAX 
     if (currentSize > MAX_UNSUCCESSFUL_RESULTS) { 
      int delta = currentSize - MAX_UNSUCCESSFUL_RESULTS; 
      while (delta > 0){ 
       notCompletedBatches.poll(); 
       delta--; 
      } 
     } 
     notCompletedBatches.offer(result);    
    } 

} 
} 

public class BatchUploader implements Callable<SingleBatchUploadResult> { 

private ExecutorService executorService; 
// Map<fileName, Future result> - holds Futures for all files that were submitted for upload (those that did not fail validation) 
private ConcurrentHashMap<String, Future<SingleFileUploadResult>> uploadTaskFutures = new ConcurrentHashMap<String, Future<SingleFileUploadResult>>(); 
private ConcurrentHashMap<String, SingleFileUploadResult> notUploadedFiles = new ConcurrentHashMap<String, SingleFileUploadResult>(); 
private int totalFilesToUpload = 0; 

public BatchUploader(...) { 
    executorService = Executors.newFixedThreadPool(MAX_THREADS_PER_BATCH); 
} 

public SingleBatchUploadResult call() { 
// do some validation 
    if (this is a correct ZIP file){ 
     String errorMessage = processZipArchive(threadName, contentName); 
     // the errorMessage will be not null if there were some exceptions that happened during the zip archive read: 
     // opening the ZIP archive, reading entries or thread interruption exceptions 
     if (errorMessage != null) { 
    ... 
      return errorBatchUploadResult;     
     } 
    }   
    // all tasks are submitted - stop the service from accepting new requests and shutdown when done 
    executorService.shutdown(); 

    // now wait until all tasks have finished - but only up to BATCH_UPLOAD_TIMEOUT_IN_SEC seconds 
    try { 
     executorService.awaitTermination(BATCH_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS); 
    } catch (InterruptedException e) { 
     // try to shutdown all running tasks and stop waiting tasks from being scheduled; 
     executorService.shutdownNow(); 
     // preserve interruption state of the thread 
     Thread.currentThread().interrupt(); 
     return errorBatchUploadResult; 
    } 

    // at this point, we either finished all tasks (awaitTermination finished before timeout), 
    // or we timed out waiting. Get the latest status of each task 
    List<String> successfullyUploadedFiles = new LinkedList<String>(); 
    for (Entry<String, Future<SingleFileUploadResult>> entry : uploadTaskFutures.entrySet()) { 
     String entryName = entry.getKey(); 
     Future<SingleFileUploadResult> future = entry.getValue(); 
     try { 
      if (future.isCancelled()) { 
       ... 
       notUploadedFiles.putIfAbsent(entryName, taskResult);     
      } else if (future.isDone()) { 
       // this task has finished 
       SingleFileUploadResult taskResult = future.get(); 
       if (taskResult.getUploadStatus().equals(UploadStatus.SUCCESS)) 
        successfullyUploadedFiles.add(entryName); 
       else 
        notUploadedFiles.putIfAbsent(entryName, taskResult);     
      } else { 
       // this task is either not started yet or not finished yet 
       … 
       notUploadedFiles.putIfAbsent(entryName, sometaskResult); 
      } 
     } catch (InterruptedException e){ 
      // this is a signal to stop processing 
      batchUploadResult.setTotalFilesToUpload(totalFilesToUpload); 
      batchUploadResult.setNotUploadedFiles(notUploadedFiles); 
      batchUploadResult.setSuccessfullyUploadedFiles(successfullyUploadedFiles); 
      batchUploadResult.setStatusMessage(statusMessage); 
      batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE); 
      // cancel/stop all executing/waiting SingleFileUpload tasks 
      executorService.shutdownNow(); 
      // preserve interruption state of the thread 
      Thread.currentThread().interrupt(); 
      return batchUploadResult; 
     } catch (ExecutionException e) { 
      // we do not know what the state of this task is 
      … 
      notUploadedFiles.putIfAbsent(entryName, sometaskResult); 
     }    
    } 
    ... 
    return batchUploadResult; 
} 

private String processZipArchive(String threadName, String zipName) { 
    // do all ZIP-reading work here 
     while (valid file found) 
     { 
      if (Thread.currentThread().isInterrupted()){ 
       // this batch uploader thread is being shut down - stop all SingleFileUpload tasks 
       executorService.shutdownNow(); 
       return errorMessage; 
      } 
      // do a try while processing individual files to be able to gather info about failed files but continue processing good ones 
      try { 
       // read the file and pass it for processing to SingleFileUploader 
       Future<SingleFileUploadResult> taskFuture = executorService.submit(new SingleFileUploader(uploadId, bytesContent, zipEntryName)); 
       uploadTaskFutures.put(zipEntryName, taskFuture); 
       ... 
      } catch (some exceptions) { 
        notUploadedFiles.put(zipEntryName, taskResult); 
      } 
     } 
return errorMessage; 
}  
} 

public class SingleFileUploader implements Callable<SingleFileUploadResult> { 
...  
@Override 
public SingleFileUploadResult call() { 
    // check if there was a cancellation request 
    if (Thread.currentThread().isInterrupted()){ 
     // this file uploader thread is being shut down - get out    
     return errorResult; 
    } 
    // do the real work here 
    return result; 
} 

}

All of this works just fine in the regular scenarios. However, I would still like to hear your opinions on whether there are better/more reliable ways to do what I want, especially in the following areas:

  1. I use a separate thread, BatchMonitor, to keep track of which uploads have completed and which have not, by periodically scanning the list of active Futures and moving them into a "successfully completed" or a "not completed [failed]" queue. I wonder if there is a better way to do this than adding items myself and trimming the queues to a specified maximum size?

  2. I use unbounded concurrent queues for this. I could not find a "bounded concurrent queue" in the standard JDK libraries, only unbounded ones. I was hoping I could use Guava's EvictingQueue, but it is bundled into release 15.0, which does not seem to be out yet... So I decided to bound the queue sizes myself, at the cost of calling the size() operation, which I know is a problem for a concurrent queue because it does a full scan of the queue... My reasoning is that it is probably fine if I keep the queues small - 100 in my case.

  3. Do I need concurrent queues at all? The only thread that modifies the queues is the BatchMonitor thread, and the only other thread that reads them is the BatchService thread. The only time I can get into a non-synchronized situation is when BatchService tries to get the status of a particular upload: that upload may already have been removed from the activeBatchFutures map but not yet placed into either the "completed" or "notCompleted" queue, because I deliberately do not synchronize reads/writes between the map and the queues, to avoid unnecessary locking. I can live, however, with an occasional "not found" status being returned for a particular upload - asking for the status a second time would get the correct result.
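If the occasional "not found" ever becomes a problem, the window can be closed without any locking just by ordering the two writes: publish the result to the queue first, and only then remove the Future from the map. A minimal sketch (class and method names here are hypothetical, not from the code above):

```java
import java.util.concurrent.*;

// Hypothetical sketch: write ordering that closes the "not found" window.
// If a reader misses the entry in the active map, the removal has already
// happened, which means the publish to the queue happened before it.
public class StatusRaceSketch {
    static final ConcurrentHashMap<String, Future<String>> activeBatchFutures = new ConcurrentHashMap<>();
    static final ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<>();

    // Monitor side: publish first, retire second.
    static void publishCompleted(String uploadId) {
        completedBatches.offer(uploadId);    // 1. make the result visible
        activeBatchFutures.remove(uploadId); // 2. only then retire the Future
    }

    // Reader side: check the active map first, then the completed queue.
    static String statusOf(String uploadId) {
        if (activeBatchFutures.containsKey(uploadId)) return "ACTIVE";
        if (completedBatches.contains(uploadId)) return "COMPLETED";
        return "UNKNOWN";
    }
}
```

With this ordering a status request can transiently see an upload as both active and completed, but never as missing, which is usually the friendlier failure mode.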

  4. BatchService is a singleton bean - which brings its own scalability problems, since all requests to this bean will be throttled. Another option could be to make each BatchUploader a Spring bean and limit the number of beans, but then how would I do the overall monitoring?
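One way to go in that direction, sketched below under assumed names: keep the BatchUploader instances independent (e.g. prototype-scoped beans), but have them all share a single Semaphore that bounds how many batches run concurrently, while a singleton registry bean still does the overall monitoring. The limit of 4 is an arbitrary placeholder:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: a shared permit pool bounding concurrent batches
// across independently created uploader instances.
public class BoundedUploaderSketch {
    // In Spring this would live in a shared singleton and be injected
    // into each prototype-scoped uploader. 4 is an assumed limit.
    static final Semaphore BATCH_PERMITS = new Semaphore(4);

    static <T> T runBounded(Supplier<T> batchWork) {
        BATCH_PERMITS.acquireUninterruptibly(); // block until a batch slot is free
        try {
            return batchWork.get();             // the actual upload work
        } finally {
            BATCH_PERMITS.release();            // always free the slot
        }
    }
}
```

The registry bean would then hold the map of active uploads, so monitoring stays centralized even though the uploaders themselves are no longer funneled through one singleton.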

  5. Handling timeouts and cancellations: I am trying to make this app bulletproof with respect to resource cleanup - I try to handle all thread-interruption cases and stop processing so that the threads can be killed. I rely on an InterruptedException being caught and handled in BatchUploader, which propagates the event to the individual SingleFileUploader tasks by calling batchPool.shutdownNow(). Can you see any potential cases where I might end up with runaway threads - when the JVM shuts down, when the app is redeployed in a web container, ...?
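For the "JVM shuts down" case specifically, a JVM shutdown hook can serve as a last line of defense in addition to the @PreDestroy callback (a container redeploy normally destroys the singleton bean and fires @PreDestroy; the hook additionally covers an orderly JVM termination, e.g. SIGTERM). A minimal sketch with hypothetical names:

```java
import java.util.concurrent.*;

// Hypothetical sketch: a shutdown hook that interrupts all upload tasks
// when the JVM terminates, complementing Spring's @PreDestroy callback.
public class ShutdownHookSketch {
    static void installHook(ExecutorService pool) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            pool.shutdownNow(); // interrupt running tasks, drop waiting ones
            try {
                // give tasks a short window to honour the interrupt
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }
}
```

Note that no hook runs on a SIGKILL, and in a shared web container shutdown hooks should be used sparingly; the @PreDestroy path remains the primary cleanup mechanism.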

Thanks!

Marina

Answer
  1. Use Guava's ListenableFuture instead of your BatchMonitor - a ListenableFuture can execute a callback as soon as its Future completes, which eliminates the need for a thread to monitor your Futures.

  2. Use an ArrayBlockingQueue, which is a bounded concurrent queue. Use take in the consumer thread to remove an item, blocking if the queue is empty, and offer(E e, long timeout, TimeUnit unit) in the producer thread to add an item, blocking (for timeout units) if the queue is full.

  3. If you use ListenableFutures then you should not need a BatchMonitor or the concurrent queues.

  4. I recommend that you check Thread.currentThread().isInterrupted() on each iteration of your for (String entryName : uploadTaskFutures.keySet()) loop, since you do not call a method that throws InterruptedException on all code paths (e.g. if you keep going through the else path, it may be a while before you notice that the interrupt flag is set).
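The callback idea in points 1 and 3 can be sketched as follows. This example uses the JDK's CompletableFuture (Java 8+), which plays the same role here as Guava's ListenableFuture: a completion handler runs as soon as the task finishes, so no polling thread is needed. The names and the success/failure model are illustrative only:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: route each finished upload into the right queue via a
// completion callback instead of a polling BatchMonitor thread.
public class CallbackSketch {
    static final ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<>();
    static final ConcurrentLinkedQueue<String> notCompletedBatches = new ConcurrentLinkedQueue<>();

    static CompletableFuture<String> submit(String uploadId, boolean succeed) {
        return CompletableFuture
            .supplyAsync(() -> {
                // stand-in for the real upload work
                if (!succeed) throw new IllegalStateException("upload failed");
                return uploadId;
            })
            .whenComplete((result, error) -> {
                // runs as soon as the task finishes - no monitor thread needed
                if (error == null) completedBatches.offer(result);
                else notCompletedBatches.offer(uploadId);
            });
    }
}
```

With Guava (pre-Java 8) the equivalent wiring is MoreExecutors.listeningDecorator(...) around the pool plus Futures.addCallback on the returned ListenableFuture.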


Thanks, zim-zam, for pointing me to ListenableFutures - I have not used them before. It looks like quite a different way of working, with Guava's executor services, listeners and futures. I usually try to stick to the standard JDK libraries as long as they do 90% of what I need, but in this case it seems worth considering - it provides very nice callback functionality and solves some of my problems. As for the ArrayBlockingQueue - I considered it, since it is bounded and concurrent, but unfortunately it is also blocking - and I want a "sliding" queue where only the most recent 100 items are kept. – Marina
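For what it's worth, an ArrayBlockingQueue can be given the "sliding" behavior described above by always using the non-blocking offer and evicting the head whenever the queue is full - a common pattern, sketched here with hypothetical names (under heavy contention this may occasionally evict one item more than strictly necessary, but the capacity bound always holds):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch: non-blocking "sliding" insert into a bounded queue,
// keeping only the most recent N items.
public class SlidingQueueSketch {
    static <T> void offerSliding(ArrayBlockingQueue<T> queue, T item) {
        while (!queue.offer(item)) { // offer() is non-blocking: returns false when full
            queue.poll();            // evict the oldest element and retry
        }
    }
}
```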


Also, thank you for the great catch on the missing isInterrupted check! I completely missed it. Thanks for reading through the code! – Marina


@Marina in that case you may want to maintain an AtomicInteger for the size of the queue in order to ensure a constant-time size –
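The commenter's suggestion might look like this sketch (hypothetical class): a ConcurrentLinkedQueue paired with an AtomicInteger that tracks the element count, so the bound check is O(1) instead of the O(n) traversal that ConcurrentLinkedQueue.size() performs:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a sliding queue with a constant-time size counter.
public class CountedQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int maxSize;

    public CountedQueueSketch(int maxSize) { this.maxSize = maxSize; }

    public void add(T item) {
        queue.offer(item);
        // evict the oldest entry when over capacity - keeps the queue "sliding"
        if (size.incrementAndGet() > maxSize) {
            if (queue.poll() != null) size.decrementAndGet();
        }
    }

    public int size() { return size.get(); }  // O(1), unlike ConcurrentLinkedQueue.size()
    public T oldest() { return queue.peek(); }
}
```

Under concurrent adds the counter and the eviction are only approximately in step, which seems acceptable here since the queues are advisory status buffers rather than exact records.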