2017-03-03 50 views
0

我想在小批量(每个csv 6000行)中将大量数据加载到PostgreSQL服务器(总共4000万行)中的一个表中。我认为HikariCP将会是这个目标的理想选择。Postgresql与HikariCP性能问题

这是我通过使用 Java 8(1.8.0_65),Postgres JDBC驱动程序9.4.1211和HikariCP 2.4.3从我的数据插入中获得的吞吐量。

6000行4分42秒。

我在做什么错,我该如何提高插入速度?

很少有关于我的设置更多的话:

  • 计划在身后CORP网络我的笔记本电脑上运行。
  • Postgres服务器9.4是具有db.m4.large和50 GB SSD的Amazon RDS。
  • 尚未在表格上创建明确的索引或主键。
  • 计划异步插入每一行与大线程池来容纳请求如下:

    private static ExecutorService executorService = new ThreadPoolExecutor(5, 1000, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000)); 
    

数据源配置为:

 private DataSource getDataSource() { 
       if (datasource == null) { 
        LOG.info("Establishing dataSource"); 
        HikariConfig config = new HikariConfig(); 
        config.setJdbcUrl(url); 
        config.setUsername(userName); 
        config.setPassword(password); 
        config.setMaximumPoolSize(600);// M4.large 648 connections tops 
        config.setAutoCommit(true); //I tried autoCommit=false and manually committed every 1000 rows but it only increased 2 minute and half for 6000 rows 
        config.addDataSourceProperty("dataSourceClassName","org.postgresql.ds.PGSimpleDataSource"); 
        config.addDataSourceProperty("dataSource.logWriter", new PrintWriter(System.out)); 
        config.addDataSourceProperty("cachePrepStmts", "true"); 
        config.addDataSourceProperty("prepStmtCacheSize", "1000"); 
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); 
        config.setConnectionTimeout(1000); 

        datasource = new HikariDataSource(config); 
       } 
       return datasource; 
      } 

这在我读源数据:

private void readMetadata(String inputMetadata, String source) { 
      BufferedReader br = null; 
      FileReader fr = null; 
      try { 
       br = new BufferedReader(new FileReader(inputMetadata)); 
       String sCurrentLine = br.readLine();// skip header; 
       if (!sCurrentLine.startsWith("xxx") && !sCurrentLine.startsWith("yyy")) { 
        callAsyncInsert(sCurrentLine, source); 
       } 
       while ((sCurrentLine = br.readLine()) != null) { 
        callAsyncInsert(sCurrentLine, source); 
       } 
      } catch (IOException e) { 
       LOG.error(ExceptionUtils.getStackTrace(e)); 
      } finally { 
       try { 
        if (br != null) 
         br.close(); 

        if (fr != null) 
         fr.close(); 

       } catch (IOException ex) { 
        LOG.error(ExceptionUtils.getStackTrace(ex)); 
       } 
      } 
    } 

我插入数据异步(或试图使用JDBC!):

  private void callAsyncInsert(final String line, String source) { 
        Future<?> future = executorService.submit(new Runnable() { 
         public void run() { 
          try { 
           dataLoader.insertRow(line, source); 
          } catch (SQLException e) { 
           LOG.error(ExceptionUtils.getStackTrace(e)); 
           try { 
            errorBufferedWriter.write(line); 
            errorBufferedWriter.newLine(); 
            errorBufferedWriter.flush(); 
           } catch (IOException e1) { 
            LOG.error(ExceptionUtils.getStackTrace(e1)); 
           } 
          } 
         } 
        }); 
        try { 
         if (future.get() != null) { 
          LOG.info("$$$$$$$$" + future.get().getClass().getName()); 
         } 
        } catch (InterruptedException e) { 
         LOG.error(ExceptionUtils.getStackTrace(e)); 
        } catch (ExecutionException e) { 
         LOG.error(ExceptionUtils.getStackTrace(e)); 
        } 
       } 

我DataLoader.insertRow低于:

  public void insertRow(String row, String source) throws SQLException { 
        String[] splits = getRowStrings(row); 
        Connection conn = null; 
        PreparedStatement preparedStatement = null; 
        try { 
         if (splits.length == 15) { 
          String ... = splits[0]; 
          //blah blah blah 

          String insertTableSQL = "insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "; 
          conn = getConnection(); 
          preparedStatement = conn.prepareStatement(insertTableSQL); 
          preparedStatement.setString(1, column1); 
          //blah blah blah 
          preparedStatement.executeUpdate(); 
          counter.incrementAndGet(); 
          //if (counter.get() % 1000 == 0) { 
           //conn.commit(); 
          //} 
         } else { 
          LOG.error("Invalid row:" + row); 
         } 
        } finally { 
         /*if (conn != null) { 
          conn.close(); //Do preparedStatement.close(); rather connection.close 
         }*/ 
         if (preparedStatement != null) { 
          preparedStatement.close(); 
         } 
        } 
       } 

当pgAdmin4监测,我注意到几件事情:

  • 数最高每秒交易次数接近50.
  • 活动数据库会话只有一个,会话总数为15个。
  • 太多块I/O(打500左右,不知道这应该是一个问题)

screenshot from pgAdmin

+0

减少连接池的大小和使用的线程的数量:更多连接(和更多线程)不一定会带来更好的性能,甚至有一点(这可能低于您当前的设置),更多连接(和线程)实际上会导致性能和吞吐量的下降。此外,你应该**关闭你的方法中的连接,并将它返回到连接池以供重用。 –

+0

另外,你真的检查过,是否与异步插入瓶颈,也许问题是你没有显示的代码(它调用'callAsyncInsert')。 –

+0

感谢您的回应: – bkrish

回答

2

你绝对要使用批量插入,用语句正在准备循环,并自动关闭。在伪代码中:

PreparedStatement stmt = conn.prepareStatement("insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") 
while (<data>) { 
    stmt.setString(1, column1); 
    //blah blah blah 
    stmt.addBatch(); 
} 
stmt.executeBatch(); 
conn.commit(); 

即使单个连接上的单个线程也应该能够插入> 5000行/秒。

更新:如果要多线程它,连接数应该是数据库CPU核心数x1.5或2。处理线程的数量应该与此相匹配,并且每个处理线程应使用上述模式处理一个CSV文件。但是,您可能会发现许多并发插入到同一个表中的操作会在数据库中创建太多的锁定争用,在这种情况下,您需要退回处理线程的数量,直到找到最佳并发性。

正确大小的池和并发应该很容易达到> 20K行/秒。

此外,请升级到HikariCP v2.6.0。

+0

多线程导入的线程数量不仅取决于服务器上的CPU数量,还取决于该服务器上的硬盘数量。 –

+0

@a_horse_with_no_name尽管如此,但使用Amazon RDS时无法知道该数字。 – brettw

+0

好的。我已经修改了每个建议的程序。升级到2.6.0。增加了批量插入并仅使用连接来加载数据。现在我看到两种不同类型的数据集有很大的不同。数据集#1是一个csv文件中的500K行(准确地说是499951) - 00:02:08.670分钟。数据集#2是83个CSV文件中的498K,每个6K行占用00:02:09.674分钟。所以我能够获得3840ish/sec的吞吐量。如果我没有宏观日志记录,错误处理等重型框架,我可能会得到更多,但我对此感到满意。非常感谢Woolridge先生为这个框架和马克提供帮助。 – bkrish