2013-08-31 53 views
0

我正在一个项目中,我将有不同的捆绑包/模型。我们举一个例子,假设我有4个捆绑包,每个捆绑包都有一个方法名称process线程安全问题,同时超时为每个包的线程

下面是事情,我应该做 -

  1. 我需要调用并行所有4个捆绑process method使用多线程和process method在每捆将返回我一个地图,然后写这个地图进入在同一个线程中的数据库或任何最好的方法(我不知道这是正确的方法)。
  2. 而且我想在线程级别启用某种超时功能。这意味着如果任何Bundle花费很多时间来执行,那么Bundle线程应该会超时并记录一个错误,指出这个特定的bundle已经超时了,因为它花了很多时间。

以下尝试我所做的很可能是有缺陷的,错误处理并不完整。任何人都可以指导我在错误处理案例中应该做什么?

以下是我的方法,它将以多线程方式调用所有软件包的process method

public void processEvents(final Map<String, Object> eventData) { 
    ExecutorService pool = Executors.newFixedThreadPool(5); 
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>(); 

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER); 

    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) { 
     ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs); 
     entries.add(processBundleHolderEntry); 
    } 

    try { 
     List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS); 
     for (int i = 0; i < futures.size(); i++) { 
      // This works since the list of future objects are in the 
      // same sequential order as the list of entries 
      Future<Object> future = futures.get(i); 
      ProcessBundleHolderEntry entry = entries.get(i); 
      if (!future.isDone()) { 
       // log error for this entry 
      } 
     } 
    } catch (InterruptedException e) { 
     // handle this exception! 
    } 
} 

其次,你的线程可调用的实现:

public class ProcessBundleHolderEntry implements Callable { 
    private BundleRegistration.BundlesHolderEntry entry; 
    private Map<String, String> outputs; 

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) { 
     this.entry = entry; 
     this.outputs = outputs; 
    } 

    public Object call() throws Exception { 
     final Map<String, String> response = entry.getPlugin().process(outputs); 
     // write to the database. 
     System.out.println(response); 
     return response; 
    } 
} 

谁能告诉我是否有上述做法有任何问题或有做同样的事情什么更好的和有效的方法?我不确定是否有任何线程安全问题。

任何帮助将不胜感激。

回答

1

代码中唯一的共享对象是eventData:只要它在该方法运行时未被修改(或者如果地图及其内容是线程安全的并且更改已安全地发布),您应该没问题。

关于你的任务异常处理,你通常做:

try { 
    future.get(); 
} catch (ExecutionException e) { 
    Throwable exceptionInFuture = e.getCause(); 
    //throw, log or whatever is appropriate 
} 

关于中断例外:它意味着你正在执行的方法已中断线程。你需要做什么取决于你的使用情况,但一般应该停止你正在做什么,所以像:

} catch (InterruptedException e) { 
    pool.shutdownNow(); //cancels the tasks 
    //restore interrupted flag and exit 
    Thread.currentThread.interrupt(); 
    //or rethrow the exception 
    throw e; 
} 

注:线程池的目的是为了重复使用 - 你应该申报的执行服务作为(私有最终)实例变量,而不是每次调用processEvents方法时创建一个。

+0

感谢assylias的建议。除此之外,我的代码还有其他问题,我应该担心吗?我想让你知道的一件事就是 - 每次都会调用processEvents方法,并且不会等待所有线程在'processEvents'方法中完成。含义'processEvents'方法每次都会被异步调用。所以不知道这是否会导致任何问题..有什么想法? – ferhan

+0

任何关于我上面的问题的想法..?谢谢。 – ferhan

+0

对于详细的代码审查,我建议http://codereview.stackexchange。com/- 从线程安全的角度来看,这完全取决于eventData是否在操作过程中被修改。如果不是,那么你很好。关于你的代码,'future.isDone()'意味着任务还没有完成 - 这并不意味着有错误。所以我建议使用'future.get()'代替 - 但这意味着该方法不会退出,直到所有的期货完成,这可能不是你想要的。 – assylias

相关问题