0

我有一个LinkedBlockingQueue任意挑选的容量为10,并有1000行输入文件。据我所知,我在服务类的main方法中有一个ExecutorService类型变量,它使用Executors.newSingleThreadExecutor() - 一个单独的线程调用buffer.readline(),直到文件line == null,然后处理 - 在循环使用Executors.newSingleThreadExecutor() - 经常线程处理行并将它们写入输出文件,直到!queue.take().equals("Stop")。但是,在写入一些行到文件后,当我处于调试模式时,我看到队列的容量最终达到最大值(10),并且处理线程不执行queue.take()。所有线程都处于running状态,但进程在queue.put()后停止。什么会导致这个问题,并且是否可以使用线程池或多个处理器变量的组合来解决,而不是使用单个变量?One Producer十个消费者文件处理与Executors.newSingleThreadExecutor()

纲要在服务main方法的当前状态:

//app settings to get values for keys within a properties file 
AppSettings appSettings = new AppSettings(); 
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); 

maxProdThreads = 1; 
maxConsThreads = 10; 

ExecutorService execSvc = null; 

for (int i = 0; i < maxProdThreads; i++) { 

     execSvc = Executors.newSingleThreadExecutor(); 

     execSvc.submit(new ReadJSONMessage(appSettings,queue)); 

    } 

    for (int i = 0; i < maxConsThreads; i++) { 

     execSvc = Executors.newSingleThreadExecutor(); 

     execSvc.submit(new ProcessJSONMessage(appSettings,queue)); 

    } 

读取方法的代码:

buffer = new BufferedReader(new FileReader(inputFilePath)); 

    while((line = buffer.readLine()) != null){ 

      line = line.trim(); 

      queue.put(line); 

     } 

处理和写代码:

while(!(line=queue.take()).equals("Stop")){ 

     if(line.length() > 10) 
     { 

      try { 
       if(processMessage(line, outputFilePath) == true) 
       { 
        ++count; 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      }    
     } 
    } 


public boolean processMessage(String line, String outputFilePath){ 
    CustomObject cO = new CustomObject(); 
    cO.setText(line); 
    writeToFile1(cO,...); 
    writeToFile2(cO,...); 
} 

public void writeOutputAToFile(CustomObject cO,...){ 
    synchronized(cO){ 
     ... 
     org.apache.commons.io.FileUtils.writeStringToFile(...) 
    } 
} 


public void writeOutputBToFile(CustomObject cO,...){ 
     synchronized(cO){ 
      ... 
      org.apache.commons.io.FileUtils.writeStringToFile(...) 
     } 
    } 
+1

请张贴您的读写代码。我感觉到你以某种方式阻挡在那里。 –

+0

不是你的答案,但我建议在你的循环外部放置一个'Executors.newCachedThreadPool()'并使用它。这将允许您在完成后使用一次调用('exeSvc.shutdown()')关闭所有线程。 – teppic

+0

进行线程转储以查看线程被阻塞的位置。 – teppic

回答

0

在处理和编写代码..确保所有资源都正确关闭..可能是资源可能不能正常关闭,因为线程继续运行,ExecutorService找不到空闲线程...

相关问题