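Basically, I have to process a large CSV file containing nearly 1 million records using multiple threads. My ExecutorService-based multithreading does not work correctly when run normally, but it works in debug mode.
I created a class IngestionCallerThread: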
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IngestionCallerThread {
    public static void main(String[] args) {
        try {
            int count = 0;
            InputStream ios = IngestionCallerThread.class.getClassLoader().getResourceAsStream("aa10.csv");
            byte[] buff = new byte[8000];
            int bytesRead = 0;
            ByteArrayOutputStream bao = new ByteArrayOutputStream();
            while ((bytesRead = ios.read(buff)) != -1) {
                bao.write(buff, 0, bytesRead);
            }
            byte[] data = bao.toByteArray();
            ByteArrayInputStream bin = new ByteArrayInputStream(data);
            BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(bin));
            while ((fileInputStreamBufferedReader.readLine()) != null) {
                count++;
            }
            bin.reset();
            int numberOfThreads = 12;
            int rowsForEachThread = count / numberOfThreads;
            int remRows = count % numberOfThreads;
            int startPosition = 0;
            System.out.println(count);
            ExecutorService es = Executors.newCachedThreadPool();
            for (int i = 0; i < numberOfThreads && startPosition < count; i++) {
                if (remRows > 0 && i + 1 >= numberOfThreads)
                    rowsForEachThread = remRows;
                IngestionThread ingThread = new IngestionThread(bin, startPosition, rowsForEachThread);
                es.execute(ingThread);
                startPosition = (startPosition + rowsForEachThread);
            }
            es.shutdown();
            if (es.isTerminated()) {
                System.out.println("Completed");
            }
            // t2.start();
        } catch (IOException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
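A likely problem in the caller above: `es.isTerminated()` is checked immediately after `es.shutdown()`. `shutdown()` only stops the pool from accepting new tasks and does not wait for running ones, so at that moment `isTerminated()` is almost always `false` and "Completed" is never printed (the workers still run afterwards because pool threads are non-daemon). A minimal sketch that blocks with `awaitTermination` instead; the class name `AwaitShutdownSketch`, the counter task, and the one-minute timeout are illustrative assumptions, not part of the original code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitShutdownSketch {

    // Hypothetical helper: submits `tasks` trivial jobs, shuts the pool down,
    // and blocks until they finish. Returns how many tasks actually ran.
    static int runAndWait(int tasks) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            es.execute(done::incrementAndGet); // stand-in for IngestionThread
        }
        // Stop accepting new tasks; already-submitted tasks keep running.
        es.shutdown();
        // Block until every task has finished or the (assumed) timeout expires.
        if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
            es.shutdownNow();
            throw new IllegalStateException("tasks did not finish in time");
        }
        System.out.println("Completed");
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait(12));
    }
}
```

Only after `awaitTermination` returns `true` is it safe to report completion; checking `isTerminated()` without waiting just samples the pool's state at one instant.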
The caller submits instances of the following worker class:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class IngestionThread implements Runnable {
    InputStream is;
    long startPosition;
    long length;

    public IngestionThread(InputStream targetStream, long position, long length) {
        this.is = targetStream;
        this.startPosition = position;
        this.length = length;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        int currentPosition = 0;
        try {
            is.reset();
        } catch (IOException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(is));
        if (startPosition != 0) {
            String line;
            try {
                while (((line = fileInputStreamBufferedReader.readLine())) != null) {
                    if (currentPosition + 1 == startPosition)
                        break;
                    currentPosition++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        try {
            int execLength = 0;
            String line;
            while ((line = fileInputStreamBufferedReader.readLine()) != null && execLength < length) {
                System.out.println(line);
                execLength++;
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
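A likely cause of the varying counts: all twelve `IngestionThread` instances wrap the same `ByteArrayInputStream` (`bin`), so they share a single read position. Each worker calls `is.reset()` at an arbitrary moment, rewinding the stream for every other worker, and each `BufferedReader` reads ahead in large chunks (8192 characters by default), so for a 20-line file whichever reader fills its buffer first can swallow the whole file. The worker also reads a line before testing `execLength < length`, consuming one line past its slice. Which lines each thread sees therefore depends on scheduling, which fits the observation that debug mode (where breakpoints effectively serialize the threads) behaves differently. A minimal sketch, assuming the whole file is already in a `byte[]` as in the caller: each task builds its own stream over the shared, read-only bytes. `IsolatedIngestion`, `Task`, `ingest`, and the `sink` list are hypothetical names, and the list replaces `System.out` only so the result can be checked:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IsolatedIngestion {

    // Worker: handles lines [startPosition, startPosition + length) using its OWN stream.
    static final class Task implements Runnable {
        private final byte[] data;        // whole file, shared read-only by all tasks
        private final long startPosition; // index of the first line in this slice
        private final long length;        // number of lines in this slice
        private final List<String> sink;  // thread-safe collector (stands in for System.out)

        Task(byte[] data, long startPosition, long length, List<String> sink) {
            this.data = data;
            this.startPosition = startPosition;
            this.length = length;
            this.sink = sink;
        }

        @Override
        public void run() {
            // A fresh ByteArrayInputStream per task: no other thread can move or reset it.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new ByteArrayInputStream(data), StandardCharsets.UTF_8))) {
                for (long skipped = 0; skipped < startPosition; skipped++) {
                    if (reader.readLine() == null) {
                        return; // file has fewer lines than expected
                    }
                }
                String line;
                long read = 0;
                // Test the count first so no line beyond this slice is consumed.
                while (read < length && (line = reader.readLine()) != null) {
                    sink.add(line);
                    read++;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Splits `count` lines into ceiling-sized slices, runs one task per slice, and waits.
    static List<String> ingest(byte[] data, int count, int numberOfThreads)
            throws InterruptedException {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
        int chunk = (count + numberOfThreads - 1) / numberOfThreads;
        for (int start = 0; start < count; start += chunk) {
            es.execute(new Task(data, start, Math.min(chunk, count - start), sink));
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        return sink;
    }

    public static void main(String[] args) throws InterruptedException {
        StringBuilder csv = new StringBuilder();
        for (int i = 0; i < 20; i++) {
            csv.append("row").append(i).append('\n');
        }
        byte[] data = csv.toString().getBytes(StandardCharsets.UTF_8);
        System.out.println(ingest(data, 20, 12).size());
    }
}
```

Each task still re-reads and discards the lines before its slice, which costs O(n) per thread; that is tolerable for a test file but adds up at a million rows.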
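I am testing with a small CSV file of 20 records, and I have implemented a separate Runnable class. The problem is that when I debug the class, almost all of the records are printed, but when I run it normally, sometimes 15 records are read and sometimes 12. I'm not sure what the problem is. Any help would be greatly appreciated. Thanks in advance.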
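The row split in IngestionCallerThread has a separate off-by-remainder issue: the last thread receives only `remRows` lines instead of `rowsForEachThread + remRows`. With 20 rows and 12 threads that yields eleven one-row slices (rows 0 to 10) plus a final eight-row slice covering rows 11 to 18, so the twentieth row is never assigned to any thread, which may be why even the debug run prints "almost all" records rather than all of them. A sketch of an alternative, assuming the file fits in memory (the caller already loads it into a `byte[]`): read the lines once, then give each worker a `List.subList` view so no thread re-reads or skips lines. `LinePartitioner` and `partition` are hypothetical names, and the generated rows stand in for `aa10.csv`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LinePartitioner {

    // Splits `lines` into at most `parts` contiguous slices. The first
    // (size % parts) slices get one extra element, so every line is assigned
    // to exactly one slice and none is dropped.
    static <T> List<List<T>> partition(List<T> lines, int parts) {
        List<List<T>> slices = new ArrayList<>();
        int base = lines.size() / parts;
        int remainder = lines.size() % parts;
        int start = 0;
        for (int i = 0; i < parts && start < lines.size(); i++) {
            int size = base + (i < remainder ? 1 : 0);
            slices.add(lines.subList(start, start + size)); // view into `lines`, no copy
            start += size;
        }
        return slices;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the lines of the CSV, read once up front.
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            lines.add("row" + i);
        }
        List<List<String>> slices = partition(lines, 12);
        ExecutorService es = Executors.newFixedThreadPool(Math.max(1, slices.size()));
        for (List<String> slice : slices) {
            // Each task only reads its own slice; `lines` is never modified from here on.
            es.execute(() -> slice.forEach(System.out::println));
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

In a real run the list could be filled with `BufferedReader.lines()`; the sublist views are safe to read concurrently only because the backing list is not modified after the tasks are submitted.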
Fixed the problem, and it works like a charm. Thanks a lot! :)