2014-03-12 220 views
1

我有一段Java代码(很旧的遗留代码),它会创建很多线程。进程运行时,有些线程会在没有留下任何痕迹的情况下被终止。看了代码之后,我觉得异常处理是正确的,但我不确定为什么有些线程会被终止。请问论坛里的专家能否帮我看看,这个类在线程处理方面是否有遗漏?Java线程死亡

代码如下:

/**
 * Owner of the FTP worker threads.
 *
 * Creates the shared thread group the workers run in, registers one JVM
 * shutdown hook, and tracks how many workers are currently busy. The transition
 * from 0 to 1 busy workers acquires {@code m_mutex}; the transition back to 0
 * releases it.
 */
public class WorkerGroup {

    // Number of workers that are currently not idling (changed under this object's monitor).
    private volatile int m_numbersOfActiveWorkingWorkers = 0;
    // Number of outstanding requests by which a single worker asks to starve the other workers.
    private volatile int m_starveRequests = 0;
    // Thread group that every worker is created in. It is static, so it is shared by all
    // WorkerGroup instances. It is volatile because it is written under a class-level lock
    // but read without one (see activeCount()).
    protected static volatile ThreadGroup s_workGroup = null;
    // Work group name.
    private final String m_workGroupName;
    // Shutdown hook registered with the JVM runtime; registered at most once per JVM.
    private static FtpWorkerShutdownHook s_shutDownHook = null;
    private final Log m_log = Log.create(WorkerGroup.class);

    /**
     * Creates the worker group and, on first construction in this JVM, registers the
     * FTP worker shutdown hook.
     *
     * @param workGroupName name of the work group (used in the thread-group name)
     */
    public WorkerGroup(final String workGroupName) {
        m_workGroupName = workGroupName;
        // Class-level lock: without it two concurrent constructors could both see null
        // and register two hooks.
        synchronized (WorkerGroup.class) {
            if (s_shutDownHook == null) {
                s_shutDownHook = new FtpWorkerShutdownHook();
                Runtime.getRuntime().addShutdownHook(s_shutDownHook);
            }
        }
    }

    /**
     * Creates the shared thread group and starts the workers. Only the first call
     * has any effect. Later calls, and calls with a negative count, return immediately.
     *
     * @param numberOfWorkers number of workers to start; values above 16 are capped at 16
     */
    public void startWorkers(int numberOfWorkers) {
        // Cannot use a negative count.
        if (numberOfWorkers < 0) {
            return;
        }
        synchronized (WorkerGroup.class) {
            // The work group is already assigned; run once only. The check and the
            // assignment happen under one lock so two callers cannot both pass.
            if (s_workGroup != null) {
                return;
            }
            // Calling setDaemon(true) on a ThreadGroup only makes the group destroy
            // itself when its last thread ends (the flag is deprecated in recent JDKs).
            // Each worker's daemon status comes from startWorker(). Daemon threads do
            // NOT keep the JVM alive.
            final ThreadGroup group = new ThreadGroup("Workers of " + m_workGroupName);
            group.setDaemon(true);
            s_workGroup = group;
        }

        // Maximize the number of workers.
        if (numberOfWorkers > 16) {
            numberOfWorkers = 16;
        }
        // Create and start all workers.
        for (int i = 0; i < numberOfWorkers; i++) {
            startWorker("FtpWorker" + String.valueOf(i + 1));
        }
    }

    /**
     * Creates one FtpWorker bound to this group, marks it daemon, and starts it.
     *
     * @param workerName thread name for the new worker
     */
    public void startWorker(final String workerName) {
        final FtpWorker worker = new FtpWorker(workerName, this);
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Explicit shutdown procedure. The normal case is to kill the process.
     *
     * Runs a fresh FtpWorkerShutdownHook on its own thread and waits for it to finish.
     * If the caller is interrupted while waiting, the interrupt status is restored.
     */
    public void shutDown() {
        final Thread thread = new FtpWorkerShutdownHook();
        thread.start();
        try {
            thread.join();
            m_log.event("ftp work group shutdown thread terminated");
        } catch (InterruptedException e) {
            // Preserve the interrupt so the caller can still react to it.
            Thread.currentThread().interrupt();
            m_log.critical("interrupted while waiting for ftp work group shutdown thread: " + e);
        }
    }

    /**
     * Returns the number of running (non-terminated) threads assigned to the thread group.
     *
     * @return number of active threads, or 0 if startWorkers() has not created the group yet
     */
    protected int activeCount() {
        // Read the static field once so the null check and the use see the same group.
        final ThreadGroup group = s_workGroup;
        if (group == null) {
            return 0;
        }
        synchronized (group) {
            return group.activeCount();
        }
    }

    /**
     * Returns whether the worker group is active. This method may be overridden.
     *
     * @return true if the thread group has at least one active thread
     */
    public synchronized boolean runnable() {
        return activeCount() > 0;
    }

    // **************************************************************************
    // Mutex operation
    // **************************************************************************

    private final Mutex m_mutex = new Mutex();

    /**
     * Marks one more worker as busy. The first busy worker acquires m_mutex.
     * If that acquisition is interrupted, the count is still incremented (unchanged
     * behavior) and the thread's interrupt status is restored.
     */
    public synchronized void incrementRunningWorkers() {
        if (m_numbersOfActiveWorkingWorkers == 0) {
            try {
                m_mutex.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                m_log.critical("interrupted while acquiring worker mutex: " + e);
            }
        }
        ++m_numbersOfActiveWorkingWorkers;
    }

    /**
     * Marks one worker as no longer busy. The last busy worker releases m_mutex.
     */
    public synchronized void decrementRunningWorkers() {
        --m_numbersOfActiveWorkingWorkers;
        if (m_numbersOfActiveWorkingWorkers == 0) {
            m_mutex.release();
        }
    }

    /** Registers one starve request (counter guarded by m_mutex's monitor). */
    protected void incrementStarveRequest() {
        synchronized (m_mutex) {
            m_starveRequests++;
        }
    }

    /** Withdraws one starve request (counter guarded by m_mutex's monitor). */
    protected void decrementStarveRequest() {
        synchronized (m_mutex) {
            m_starveRequests--;
        }
    }

    /**
     * Acquires m_mutex on behalf of the caller. On interrupt, the call returns without
     * the mutex and the interrupt status is restored.
     */
    public void acquire() {
        try {
            // NOTE(review): this blocks inside m_mutex's monitor, and release() needs the
            // same monitor. That is only safe if Mutex.acquire() waits via wait() on
            // itself. Confirm against the Mutex implementation.
            synchronized (m_mutex) {
                m_mutex.acquire();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            m_log.critical("interrupted while acquiring worker mutex: " + e);
        }
    }

    /** Releases m_mutex. */
    public void release() {
        synchronized (m_mutex) {
            m_mutex.release();
        }
    }

    // --------------------------------------------------------------------------
    // Basic Test getter methods
    // --------------------------------------------------------------------------

    /** @return the current number of busy workers */
    protected final int getWorkingWorkersCount() {
        return m_numbersOfActiveWorkingWorkers;
    }

    /** @return true while at least one starve request is outstanding */
    protected final boolean workersIsStarving() {
        return m_starveRequests != 0;
    }
}

FtpWorker类是:

public class FtpWorker extends Thread{ 


    // This instance will use log facility for debug and for CIF log. 
    private final Log m_log = Log.create(FtpWorker.class); 

    // pointer to onwer of this worker 
    protected FtpWorkerGroup m_workerGroup = null; 

    // setting m_running to false cause worker to terminate 
    protected boolean m_running = true; 
    private static boolean isRetryList = false; 

    // default sleeping time in mSec 
    protected int m_workerSleep = 2000; 

    protected EAServer m_eaServer = EAServer.createInstance(); 

    private final NeHandler m_neManager = NeHandler.getHandler(); 

    protected List<String> fileNotf = new Vector<String>(); 

    protected List<String> retrylist = Collections.synchronizedList(new ArrayList<String>()); 

    public FtpWorker(final String name, final FtpWorkerGroup workerGroup) { 
    super(FtpWorkerGroup.s_workGroup,name); 

    } 

    public void run() { 
    idle(); 
    shutDownEvent(); 
    } 

    private String getTask() { 
    String notification=null; 
    String nename; 
    NeInfo ne=null; 
    int delim; 
    if(retrylist.size()>0){ 
     m_log.trace("============ RETRY LIST===== " + Thread.currentThread().getName()); 
     m_log.trace("RETRY LIST ::" + retrylist) ; 

    if((retrylist.isEmpty())){ 
     m_log.trace("RETRY LIST IS EMPTY "); 

     return null; 
    } 
     notification= retrylist.remove(0); 
     m_log.trace("Thread :: " + Thread.currentThread().getName() + " scanning list for Notification[RETRY] :: " + notification); 
     delim = notification.indexOf("$"); 
     nename= notification.substring(0, delim); 
     try { 
     ne = m_neManager.getNetworkElement(nename); 
     }catch (NeNotFoundException e1) { 
     m_log.critical("Unable to get properties for Network Elemenet " 
      + nename + " Reason: " + e1.getMessage()+ "The Network Element is of Unsupported Version");  
     } 
    if((ne.getValidState()== NeInfo.NE_IS_STOPPED) || (ne.getValidState()== NeInfo.NE_IS_STOPPING)){ 
     if((retrylist.isEmpty())){ 
     m_log.trace("RETRY LIST IS EMPTY "); 
     FtpWorker.isRetryList = false; 
     return null; 
     } 
     else{ 
     int size = retrylist.size(); 
     for(Iterator it = retrylist.iterator();it.hasNext();){ 
      String test = (String)it.next(); 
      // m_log.trace(" RETRY LIST files to be deleted, test is "+test); 
      if(test.contains(ne.getName())){ 
      //retrylist.remove(test); 
      it.remove(); 
      m_log.trace("file"+ test+ "is deleted from list"); 
      m_log.trace(" NEW RETRY LIST FILE NOTIFICATION LIST ::" + retrylist) ; 
      } 
     } 
     if((retrylist.isEmpty())){ 
      m_log.trace("RETRY LIST IS EMPTY "); 
      FtpWorker.isRetryList = false; 
     } 
     return null; 
     } 
     } 
     if(!ne.getisInFtp()){ 
     m_log.trace("FTP Not Set[RETRY] " +notification); 
     ne.setisInFtp(); 
     m_log.trace("FETCH FILE USING NOTIFICATION[RETRY] " +notification); 
     FtpWorker.isRetryList = false; 
     m_log.trace("============Returning from RETRY LIST===== " + Thread.currentThread().getName()); 
     return notification; 
     } 
     else{ 
     m_log.trace("FTP Set. ADDING NOTIFICATION BACK TO RETRY QUEUE:::: " +notification); 
     FtpWorker.isRetryList = true; 
     retrylist.add(notification); 
     m_log.trace("============Returning NULL RETRY LIST===== " + Thread.currentThread().getName()); 
     return null; 
     } 
    } 
    if(EAServer.fileNotification!=null && (EAServer.fileNotification.size())>0 && !(isRetryList)){ 
     m_log.trace("====ORIGINAL FILE NOTIFICATION LIST== " + Thread.currentThread().getName()); 
     try{ 

     m_log.trace("ORIGINAL FILE NOTIFICATION LIST ::" + EAServer.fileNotification + " for Thread " + Thread.currentThread().getName()) ; 

     if((EAServer.fileNotification.isEmpty())){ 
     m_log.trace("ORIGINAL LIST IS EMPTY "); 
     return null; 
     } 


     notification = (String)EAServer.fileNotification.remove(0); 

     m_log.trace("Thread :: " + Thread.currentThread().getName() + " scanning list for Notification[ORIGINAL] :: " + notification); 
     delim = notification.indexOf("$"); 
     nename= notification.substring(0, delim); 
     m_log.trace("NOTIFICATION FOUND ::: " +notification); 
     try { 
     ne = m_neManager.getNetworkElement(nename); 
     }catch (NeNotFoundException e1) { 
     m_log.critical("Unable to get properties for Network Elemenet " 
      + nename + " Reason: " + e1.getMessage()+ "The Network Element is of Unsupported Version");  
     } 
     }catch(Exception e){ 
     m_log.trace("Exception caught while reading fileNotification " + e.getMessage() + " for Thread " + Thread.currentThread().getName()); 
     return null; 
     } 
     if((ne.getValidState()== NeInfo.NE_IS_STOPPED) || (ne.getValidState()== NeInfo.NE_IS_STOPPING)){ 
     try { 
      ne.setisNotInFtp(); 
      ne.deleteFileWithoutCollecting(ne.getFilePath()); 
      ne.setValidState(NeInfo.NE_IS_STOPPED); 
     } 
     catch (CDMException e) { 
      m_log.trace("Could not delete file for " + ne.getName() 
       + " because " + e.getMessage()); 
     } 
     if((EAServer.fileNotification.isEmpty())){ 
      m_log.trace("ORIGINAL LIST IS EMPTY NOW"); 
      return null; 
     } 
     else{ 
     try { 
     m_log.trace(" DELETING for NE Since Node is Stopped "+ ne.getName()); 
     int size = EAServer.fileNotification.size(); 
     m_log.trace("Size of ORIGINAL NOTIFICATION LIST " + size); 
     for(Iterator it = EAServer.fileNotification.iterator();it.hasNext();){ 
      String test = (String)it.next(); 
      if(test.contains(ne.getName())){ 
      it.remove(); 
      m_log.trace("file"+ test+ "is deleted from list"); 
      m_log.trace(" NEW ORIGINAL FILE NOTIFICATION LIST ::" + EAServer.fileNotification) ; 
      } 
     } 
     return null; 
     } 
     catch(Exception e){ 
      m_log.trace(" CCCCCC IN exception" + e.getMessage()); 
      return null; 
     } 
     } 


     } 
     if(!ne.getisInFtp()){ 
     /* if((ne.getValidState() == NeInfo.NE_IS_STOPPED) || (ne.getValidState() == NeInfo.NE_IS_STOPPING)){ 
      return null; 
     }*/ 
     m_log.trace("FTP Not Set[ORIGINAL] " +notification); 
     ne.setisInFtp(); 
    /* //Remove 
     try{ 
      m_log.trace("Thread is going to sleep"); 
     Thread.sleep(120000); 
     } 
     catch(Exception e){ 
     //Do nothing 
     } 
     //Remve end */ 
     m_log.trace("FETCH FILE USING NOTIFICATION[ORIGINAL] " +notification); 
     m_log.trace("====Returning from ORIGINAL LIST===== " + Thread.currentThread().getName()); 
     return notification; 
     } 
     else{ 
     m_log.trace("FTP Set.ADDING NOTIFICATION BACK TO RETRY QUEUE:::: ::: "+notification); 
//  EAServer.fileNotification.add(notification); 
     FtpWorker.isRetryList = true; 
     retrylist.add(notification); 
     m_log.trace("===+Returning NULL from ORIGINAL LIST===== " + Thread.currentThread().getName()); 
     return null; 
     } 
    } 

    return null; 
    } 

    private void idle() { 
    while (m_running) { 
    // m_log.trace("ftp wroker" + Thread.currentThread().getName() + " is active"); 
     final String task = getTask(); 
     if (task != null) { 
     // if a task is found increment working threads and run work as long 
     // there exists tasks. 
     work(task); 
     } 
    /* else{ 
     m_log.trace("task is null which means no file notifications received yet"); 
     }*/ 
     try { 
     //m_log.trace("No TASK.SLEEPING" + Thread.currentThread().getName()); 
     sleep(m_workerSleep); 
     } 
     catch (InterruptedException e) { 
     interruptEvent(e); 
     } 
    } 
    } 

    private void work(final String t) { 
    String task = t; 
    do { 
     m_eaServer.execute(task); 
     m_log.trace("TASK EXECUTED :::: " + task + " by Thread " + Thread.currentThread().getName()); 
     yield(); 
     task = getTask(); 

    } 
    while ((task != null) && m_running); 
    } 

    public void shutDown() { 
    m_running = false; 
    } 

    /** 
    * Generates and logs an event. Should be called when the the worker 
    * shutdowns. 
    */ 
    protected void shutDownEvent() { 
    m_log.event("worker " + super.getName() + " has stopped.", 
     LogEventType.KERNEL_WORKER_EVENT); 
    } 

    /** 
    * Generates and logs an event from the given Exception. Should be called when 
    * the worker has been interupted. 
    * 
    * @param e the exception. 
    */ 
    protected void interruptEvent(final Exception e) { 
    m_log.event("worker " + super.getName() + " has interrupted, cause by " 
     + e.getMessage() + ".", LogEventType.KERNEL_WORKER_EVENT); 
    } 


} 
+0

什么是'FtpWorker'?你怎么知道他们被杀了?有可能ftp(?)不再“工作”,并且该线程静静地终止? – PeterMmm

+0

你的FTPWorker类是什么样的? – DaveH

回答

0

如果线程退出时没有记录关闭事件,那么我认为是有运行时异常被抛出了。首先,我会尝试用下面的代码看看抛出的是什么:

public void run() { 
    try { 
    idle(); 
    shutDownEvent(); 
    } catch (Throwable e) { 
    m_log.critical(e.getMessage(); 
    } 
}