2012-06-26 163 views
2

我有超过700K行的巨大csv行数。我必须解析那些csv的行和do ops。我想通过使用线程来做到这一点。我第一次尝试做的很简单。每个线程都应该有唯一的csv行。我只有有限的行数只能读到3000。我创建了3个线程。每个线程应读取一行csv。以下是代码:java读取巨大csv的行数

import java.io.*; 

class CSVOps implements Runnable 
{ 
    static int lineCount = 1; 
    static int limit = 3000; 
    BufferedReader CSVBufferedReader; 
    public CSVOps(){} // default constructor 
    public CSVOps(BufferedReader br){ 
     this.CSVBufferedReader = br; 
    } 
    private synchronized void readCSV(){ 
     System.out.println("Current thread "+Thread.currentThread().getName()); 
     String line; 
     try { 
      while((line = CSVBufferedReader.readLine()) != null){ 
       System.out.println(line); 
       lineCount ++; 
       if(lineCount >= limit){ 
        break; 
       } 
      } 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    }  
    public void run() { 
     readCSV(); 
    } 
} 

class CSVResourceHandler 
{ 
    String CSVPath; 

    public CSVResourceHandler(){ }// default constructor 

    public CSVResourceHandler(String path){ 
     File f = new File(path); 
     if(f.exists()){ 
      CSVPath = path; 
     }else{ 
      System.out.println("Wrong file path! You gave: "+path); 
     } 
    } 
    public BufferedReader getCSVFileHandler(){ 
     BufferedReader br = null; 
     try{ 
      FileReader is = new FileReader(CSVPath); 
      br = new BufferedReader(is); 
     }catch(Exception e){ 

     } 
     return br; 
    } 
} 
public class invalidRefererCheck 
{ 
    public static void main(String [] args) throws InterruptedException 
    { 
     String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/invalid_domain_kw_site_wise_click_rev2.csv"; 
     CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV); 
     CSVOps ops = new CSVOps(csvResHandler.getCSVFileHandler()); 
     Thread t1 = new Thread(ops); 
     t1.setName("T1"); 
     Thread t2 = new Thread(ops); 
     t1.setName("T2"); 
     Thread t3 = new Thread(ops); 
     t1.setName("T3"); 
     t1.start(); 
     t2.start(); 
     t3.start(); 
    } 
} 

类CSVResourceHandler简单的发现,如果通过文件存在,然后创建一个BufferedReader,并赋予它。此阅读器传递给CSVOps类。它有一个方法readCSV,它读取一行csv并打印出来。有一个限制设置为3000.

现在线程为no来搞乱计数,我声明这些限制和计数变量都为静态。当我运行这个程序时,我得到了奇怪的输出。我只有约1000条记录,有时候我得到了1500条。它们是随机排列的。在输出结束时,我得到2行csv和当前线程名称出来主要!

我是线程中的新手。我想要的是阅读这个csv应该变快。请建议可以做些什么

+6

使用多个阅读线程将无济于事。阻塞点不是CPU而是IO。 –

+2

我建议你用一个线程工作,并在读取它时将每一行传递给一个可由第二个线程使用的队列。这确保了订单得以保留。 –

+0

@PeterLawrey:很酷的主意! – Shades88

回答

0

我建议大块阅读文件。分配一个大的缓冲区对象,读取一个块,从结尾解析出来找到最后一个EOL字符,将缓冲区的最后一位复制到一个临时字符串中,在EOL + 1处将一个空移入缓冲区,从缓冲区排队引用,立即创建一个新的,首先复制临时字符串,然后填充缓冲区的其余部分,并重复,直到EOF。重复,直到完成。使用线程池来分析/处理缓冲区。

你必须排队整个有效行的块。排队单行将导致线程通信花费的时间超过解析。

请注意,此操作和类似操作可能会导致池中的线程“乱序”处理块。如果必须保留顺序(例如,对输入文件进行排序并且输出进入另一个必须保持排序的文件),则可以让chunk-assembler线程在每个块对象中插入一个序列号。池线程然后可以将处理后的缓冲区传递到另一个线程(或任务),该线程保留无序块的列表,直到所有先前的块已经进入。

多线程不一定非常困难/危险/无效。如果您使用队列/池/任务,请避免同步/连接,不要连续创建/终止/销毁线程,并且只能围绕一次只能有一个线程工作的大缓冲区对象排队,您应该看到一个很好的加速几乎不存在死锁,虚假分享等可能性

这种加速的下一步将是预先分配一个缓冲池池以消除缓冲区和相关GC的连续创建/删除,并在开始时使用[L1缓存大小]“死区”的每个缓冲区来完全消除缓存共享。 在多核盒子上(尤其是使用SSD),这将大大快速。

编辑 - 哦,Java,对。我对我的答案中的'CplusPlus-iness'表示歉意,并带有空终止符。尽管如此,其余的观点都是可以的。这应该是一个语言不可知的答案:)

+0

感谢您的详细解答。 '这应该是一个语言不可知的答案'。是的,我会记住这一点,看看我是否可以在Java中适应:) – Shades88

2

好吧,首先,不要使用多个线程从单个机械磁盘执行并行I/O。它实际上会降低性能,因为每当线程有机会运行时,机械头需要寻找下一个读取位置。因此,您不必要地在磁盘的磁头周围弹跳,这是一项昂贵的操作。

使用单个生产者多个消费者模型来读取使用单个线程的行并使用工作者池来处理它们。

在你的问题:

你不应该实际上是等待线程退出主前完成?

public class invalidRefererCheck 
{ 
    public static void main(String [] args) throws InterruptedException 
    { 
     ... 
     t1.start(); 
     t2.start(); 
     t3.start(); 

     t1.join(); 
     t2.join(); 
     t3.join(); 
    } 
} 
+0

默认情况下,java线程不是守护进程,所以没有他不必。 – Voo

+0

'使用单个生产者多个消费者模型来读取使用单个线程的行并使用一组工作者来处理它们。谢谢你的提示,我刚刚上了它。而join()实际上是将一个线程的处理排队在另一个线程之后,这样在所有线程完成之前不会暂停主线程,对吧?我的意思是理论上是这样的:O – Shades88

+0

@ Shades88:实际上没有。在所有线程启动后,它们将与主线程并行运行。 “加入”只是保证主线程在孩子完成之前不会进展,但孩子们仍然会相互平行地工作。 – Tudor