2016-06-08 25 views
0

我有一个包含大约5.2M行的大型CSV文件。我想解析这个文件并将数据插入到数据库中。我为此使用apache骆驼。使用Apache Camel插入大型CSV文件时的GC问题

路线是相当容易的(简化本示例)

from("file:work/customer/").id("customerDataRoute") 
.split(body().tokenize("\n")).streaming() 
.parallelProcessing() 
.unmarshal(bindyCustomer) 
.split(body()) 
.process(new CustomerProcessor()) 
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 

bindyCustomer是CSV文件和 CustomerProcessor一个BindyCsvDataFormat是返回Bindy客户对象的数据作为对象的阵列的处理器为SQL插入。实际的对象有39个字段(以上简化)。

这对第一个800.000到1.000.000行都可以,但是它会停下来。

我用JVisualVM和Visual GC插件监视了骆驼实例,我可以看到老一代填满了,当它达到最大值时,整个系统停止运行,但不会崩溃。 在这一点上,老一代已经满员了,伊甸园的空间几乎已经满了,两个幸存者空间都是空的(因为它不能将任何东西移动到我猜想的老一代)。

那么这里有什么问题?这看起来像是Camel SQL组件中的内存泄漏。 数据主要存储在ConcurrentHashMap对象中。

当我拿出SQL组件时,老一代几乎没有填充。

我正在使用骆驼2.15.1将尝试使用2.17.1看看是否解决了这个问题。

更新:我试过骆驼2.17.1(同样的问题),我试图用java.sql.Statement.executeUPdate在Java中插入插入。有了这个选项,我设法插入了大约2.6 M行,但随后它也停止了。有趣的是我没有收到内存错误。它只是停下来。

回答

1

好吧我想通了这里出了什么问题。基本上读取部分与插入部分相比太快。这个例子有点过于简单,因为在阅读和插入之间有一个seda队列(因为我必须对内容做出选择,而这个内容在示例中没有显示)。 但即使没有seda队列,它从来没有完成。我意识到当我杀死骆驼时发生了什么错误,并得到一条消息,说明还有几千条机上消息。

因此,当插入端无法跟上时,并行处理读取没有意义。

from("file:work/customer/").id("customerDataRoute") 
     .onCompletion().log("Customer data processing finished").end() 
     .log("Processing customer data ${file:name}") 
     .split(body().tokenize("\n")).streaming() //no more parallel processing 
     .choice() 
      .when(simple("${body} contains 'HEADER TEXT'")) //strip out the header if it exists 
      .log("Skipping first line") 
      .endChoice() 
     .otherwise() 
      .to("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true") 
      .endChoice(); 


from("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true") 
      .unmarshal(bindyCustomer) 
      .split(body()) 
      .process(new CustomerProcessor()).id("CustomProcessor") //converts one Notification into an array of values for the SQL insert 
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 

我在SEDA队列上定义了一个大小(默认情况下它不受限制),并在队列满时调用线程块。

seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true 

并行处理是通过在SEDA队列上使用20个并发消费者完成的。请注意,出于什么原因,您在调用路由时也必须指定队列大小(不仅在您定义它的位置)。

现在的内存消耗是最小的,它插入500万记录没有问题。

1

我没有测试你的代码,但是,我确实注意到你的第二个拆分语句不是流式。我建议尝试一下。如果你有太多并行的工作流,GC可能会在你释放资源之前填满。 SQL语句需要的时间可能是什么让GC得到了太多的建立时间,因为你正在对主要处理进行并行化。

from("file:work/customer/").id("customerDataRoute") 
    .split(body().tokenize("\n")).streaming().parallelProcessing() 
     .unmarshal(bindyCustomer) 
     .split(body()).streaming() //Add a streaming call here and see what happens 
      .process(new CustomerProcessor()) 
      .to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 
+0

感谢您的提示。我试过了,但并没有解决问题。 .unmarchal(bindyCustomer)只返回一个元素的数组,所以在这种情况下流应该不会有太大的区别。你能想到其他可能会出错的东西吗?我将尝试在Java中执行插入以查看是否解决了问题。 – Ben

+0

嗯,我有一些粗略的猜测。你能够添加id标签到你的路线,然后打开你的JConsole来确认所有线程是“挂起”的吗? –

+0

这条路线已经作为一个ID(customerDataRoute)或者你指的是别的吗? – Ben