0

我使用线程池执行器产生5个线程并行执行5个不同的命令。每个线程完成后,我将以threadid的条目作为键更新并发散列映射,并将其作为值终止。但是我的线程池并没有更新成功完成命令执行的hashmap。线程池执行器不更新并发哈希映射

主类:

package com.cisco.executor; 

import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.List; 
import java.util.Map; 

public class MainExecutor { 

    static String element; 
    static ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>(); 
    static Integer array[] = { 1, 2, 3, 4, 5 }; 
// static Integer array[] = { 1 }; 
    static List<Integer> threadid = Arrays.asList(array); 
    static String SQOOP_XXCCS_DS_SAHDR_CORE = ReadProperties.getInstance().getProperty("SQOOP_XXCCS_DS_SAHDR_CORE"); 
    static String SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL = ReadProperties.getInstance() 
      .getProperty("SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL"); 
    static String SQOOP_XXCCS_DS_INSTANCE_DETAIL = ReadProperties.getInstance() 
      .getProperty("SQOOP_XXCCS_DS_INSTANCE_DETAIL"); 
    static String SQOOP_XXCCS_SCDC_PRODUCT_PROFILE = ReadProperties.getInstance() 
      .getProperty("SQOOP_XXCCS_SCDC_PRODUCT_PROFILE"); 
    static String SQOOP_MTL_SYSTEM_ITEMS_B = ReadProperties.getInstance().getProperty("SQOOP_MTL_SYSTEM_ITEMS_B"); 

    public static void main(String[] args) { 

     ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); 
//  ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 

     System.out.println("at executors step"); 
     List<String> getlist = getList(); 
     Iterator<Integer> itr2 = threadid.iterator(); 

     for (Iterator<String> itr = getlist.iterator(); itr.hasNext() && itr2.hasNext();) { 
      String element = (String) itr.next(); 
      int thread_id = itr2.next(); 
      String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" }; 
      System.out.println("the command is as below "); 
      System.out.println(Arrays.toString(command)); 
      System.out.println("inside the iterator"); 
      ParallelExecutor pe = new ParallelExecutor(command, thread_id, map); 
      executors.execute(pe); 
     } 
     // executors.shutdown(); 
     for(Map.Entry<Integer, String> entry: map.entrySet()) 
     { 
      Integer key = entry.getKey(); 
      String value = entry.getValue();    
      System.out.println("The key is " + key + " The value is " + value); 
      System.out.println("Thread " + key + " is terminated"); 
     } 

    } 

    public static List<String> getList() { 
     List<String> commandlist = new ArrayList<String>(); 
     System.out.println("inside getList"); 
     commandlist.add(SQOOP_XXCCS_DS_SAHDR_CORE); 
     commandlist.add(SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL); 
     commandlist.add(SQOOP_XXCCS_DS_INSTANCE_DETAIL); 
     commandlist.add(SQOOP_XXCCS_SCDC_PRODUCT_PROFILE); 
     commandlist.add(SQOOP_MTL_SYSTEM_ITEMS_B); 
     return commandlist; 
    } 

} 

运行的类:

package com.cisco.executor; 

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.concurrent.ConcurrentHashMap; 

import org.apache.log4j.Logger; 

public class ParallelExecutor implements Runnable { 

    private static Logger LOGGER = Logger.getLogger(ParallelExecutor.class); 

    String[] command; 
    int threadid; 
    ConcurrentHashMap<Integer, String> map; 

    public ParallelExecutor(String[] command, int threadid, ConcurrentHashMap<Integer, String> map) { 
     this.command = command; 
     this.threadid = threadid; 
     this.map = map; 
    } 

    @Override 
    public void run() { 
     ProcessBuilder processbuilder = new ProcessBuilder(command); 
     LOGGER.info(command); 
     try { 
      Process process = processbuilder.inheritIO().start(); 
      System.out.println("inside process builder "); 
      process.waitFor(); 
      BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); 
      String readline; 
      while ((readline = reader.readLine()) != null) { 
       LOGGER.info(readline); 
      } 
      // getting the thread state and adding it to a collection 
      Thread.State state = Thread.currentThread().getState(); 
      if (state == Thread.State.TERMINATED) { 
       map.put(threadid, "TERMINATED"); 
      } 
     } catch (Exception e) { 
      LOGGER.error(e.getMessage()); 
     } 
    } 

} 

是我的错误执行。有人可以帮我实施。

+0

您预计何时会有线程说它已终止?如果你死了,你能告诉我你已经死了吗? –

+0

是的,你是绝对正确的。我错过了这个逻辑。那么我应该抓住状态并做一个入口? – dataEnthusiast

+0

我想要做的就是捕获线程ID和执行线程后的状态,并将其放入地图中以轮询地图。 – dataEnthusiast

回答

0

ThreadPoolExecutor不会终止,直到它被要求这样做。 所以,首先你必须调用

// executors.shutdown(); 

,你不停的评论。 第二,你需要等待线程正常终止。对于添加一个循环,之前(Map.Entry的条目:map.entrySet())

while (!es.isTerminated()) { 
     } 

但是,因为一个线程可能会运行许多可运行,如果我让你正确地要更新一次的CHM一个Runnable完成它的执行。

要做到这一点,你必须使用CustomThread类。扩展线程 并覆盖只有1个方法,afterExecute()从你需要把代码更新CHM与Runnable的ID和终止状态。但请记住,这意味着完成传递的Runnables run()方法,而不是底层的Thread的终止。

1

而不是试图捕获线程中的线程结果(尤其是在抛出异常/错误时容易出错)我建议您保留Future对象并检查它们。

ExecutorService exec = Executors.newFixedThreadPool(5); 

    System.out.println("at executors step"); 
    Map<String, Future<?>> results = new HashMap<>(); 
    for (String element : getList()) { 
     String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" }; 
     results.put(element, exec.submit(new ParallelExecutor(command, thread_id, map))); 
    } 
    for(Map.Entry<String, Future<?>> entry: map.entrySet()) { 
     try { 
      entry.getValue().get(); 
      System.out.println(entry.getKey()+ " is complete"); 
     } catch (ExecutionException e) { 
      System.out.println(entry.getKey()+ " failed with"); 
      e.getCause().printStackTrace(System.out); 
     } 
    } 
+0

我会尝试实现这一点。感谢您的片段! – dataEnthusiast

+1

@dataEnthusiast:如Peter Lawrey所建议的,保留Future Object并检查结果。我也喜欢同样的解决方案。 –

+0

当然,我会做。 – dataEnthusiast