2014-03-28 64 views
0

我正在写一个程序,需要从一个非常大的文件(400K +行)中读取行,并将每行中的数据发送到Web服务。我决定尝试线程化,并且看到我没有想到的一些行为,看起来像我的BufferedReader开始重用它在给它调用readline()时已经给出的行。从同一文件/奇怪行为读取多个线程

我的程序由两个类组成。一个“Main”类,它启动线程并保存对BufferedReader的静态引用,并具有静态同步的“readNextLine()”方法,线程可以使用该方法基本上调用BufferedReder上的readLine()。和“Runnable”类调用readNextLine()并使用来自每个readNextLine()调用的数据进行webservice调用。我创建了BufferedReader和readNextLine()静态,因为除了将主类的实例传递到线程之外,我只能想到线程共享读者的唯一方法,我不确定哪个更好。

大约5分钟后,我开始在我的web服务中看到错误,表示它正在处理它已经处理过的一行。我能够验证线路确实是多次分开发送的。

有没有人有任何想法,为什么BufferedReader似乎给线程它已经读取的线?我的印象是readline()是连续的,我所需要做的就是确保对readline()的调用是同步的。

我会在下面展示一些主类代码。 runnable本质上是一个while循环,它调用readNextLine()并处理每一行,直到没有剩下的行。

主要类:

//showing reader and thread creation 
inputStream = sftp.get(path to file); 
reader = new BufferedReader(new InputStreamReader(inputStream)); 

ExecutorService executor = Executors.newFixedThreadPool(threads); 
     Collection<Future> futures = new ArrayList<Future>(); 
     for(int i=0;i<threads;i++){ 
     MyRunnable runnable = new MyRunnable(i); 
      futures.add(executor.submit(runnable)); 
     } 
     LOGGER.debug("futures.get()"); 
     for(Future f:futures){ 
      f.get(); //use to wait until all threads are done 
     } 

public synchronized static String readNextLine(){ 
    String results = null; 
    try{ 
     if(reader!=null){ 
     results = reader.readLine(); 
     } 
    }catch(Exception e){ 
     LOGGER.error("Error reading from file"); 
    } 

    return results; 
} 
+1

我认为你需要使用'RandomAccessFile'并让每个线程读取从一个不同的偏移量,尽管我会使用单个线程来读取文件的块和每个块读取,拆分多个线程来联系您的web服务与块的一部分。 –

+0

其实我只是偶然发现了这个,http://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousFileChannel.html我认为它可能会做你想要做的事情,如果Java 7是一个选项。 –

回答

0

我测试你说什么,但我发现你在readNextLine()方法得到一个错误的逻辑,怎么能reader.readLine()被调用的结果是null和if条件是不是null?

现在,我完成了我的演示,而且似乎效果很好,以下是演示,无需重新读取线发生了:

static BufferedReader reader; 

public static void main(String[] args) throws FileNotFoundException, ExecutionException, InterruptedException { 
    reader = new BufferedReader(new FileReader("test.txt")); 
    ExecutorService service = Executors.newFixedThreadPool(3); 
    List<Future> results = new ArrayList<Future>(); 
    for (int i = 0; i < 3; i++) { 
     results.add(service.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        String line = null; 
        while ((line = readNextLine()) != null) { 
         System.out.println(line); 
        } 
       } catch (IOException e) { 
        e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
       } 
      } 
     })); 
    } 
} 

public synchronized static String readNextLine() throws IOException { 
    return reader.readLine(); 
}