我正在写一个程序,需要从一个非常大的文件(400K +行)中读取行,并将每行中的数据发送到Web服务。我决定尝试线程化,并且看到我没有想到的一些行为,看起来像我的BufferedReader开始重用它在给它调用readline()时已经给出的行。从同一文件/奇怪行为读取多个线程
我的程序由两个类组成。一个“Main”类,它启动线程并保存对BufferedReader的静态引用,并具有静态同步的“readNextLine()”方法,线程可以使用该方法基本上调用BufferedReder上的readLine()。和“Runnable”类调用readNextLine()并使用来自每个readNextLine()调用的数据进行webservice调用。我创建了BufferedReader和readNextLine()静态,因为除了将主类的实例传递到线程之外,我只能想到线程共享读者的唯一方法,我不确定哪个更好。
大约5分钟后,我开始在我的web服务中看到错误,表示它正在处理它已经处理过的一行。我能够验证线路确实是多次分开发送的。
有没有人有任何想法,为什么BufferedReader似乎给线程它已经读取的线?我的印象是readline()是连续的,我所需要做的就是确保对readline()的调用是同步的。
我会在下面展示一些主类代码。 runnable本质上是一个while循环,它调用readNextLine()并处理每一行,直到没有剩下的行。
主要类:
//showing reader and thread creation
inputStream = sftp.get(path to file);
reader = new BufferedReader(new InputStreamReader(inputStream));
ExecutorService executor = Executors.newFixedThreadPool(threads);
Collection<Future> futures = new ArrayList<Future>();
for(int i=0;i<threads;i++){
MyRunnable runnable = new MyRunnable(i);
futures.add(executor.submit(runnable));
}
LOGGER.debug("futures.get()");
for(Future f:futures){
f.get(); //use to wait until all threads are done
}
public synchronized static String readNextLine(){
String results = null;
try{
if(reader!=null){
results = reader.readLine();
}
}catch(Exception e){
LOGGER.error("Error reading from file");
}
return results;
}
我认为你需要使用'RandomAccessFile'并让每个线程读取从一个不同的偏移量,尽管我会使用单个线程来读取文件的块和每个块读取,拆分多个线程来联系您的web服务与块的一部分。 –
其实我只是偶然发现了这个,http://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousFileChannel.html我认为它可能会做你想要做的事情,如果Java 7是一个选项。 –