
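When I run my pipeline from my local machine, I can update a table that resides in a Cloud SQL instance. However, when I move it to run with DataflowRunner, the same pipeline fails with the exception below. How do I run a Beam pipeline on Dataflow that accesses a Google Cloud SQL instance?

When connecting from Eclipse, I create the data source configuration as .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance > :3306/mydb")

When running with the Dataflow runner, I changed it to .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db")

  1. Should I prefix the instance name with its region information?

The exception I get when I run this is as follows: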

Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f... 
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write 
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process 
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully. 

SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver' 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261) 
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55) 
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43) 
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105) 
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver' 
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) 
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source) 
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65) 
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47) 
    at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100) 
    at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365) 
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278) 
    ... 14 more 

Any help resolving this is really appreciated. This is my first attempt at running a Beam pipeline as a Dataflow job.

PipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 

    ((DataflowPipelineOptions) options).setNumWorkers(2); 
    ((DataflowPipelineOptions)options).setProject("xxxxx"); 
    ((DataflowPipelineOptions)options).setStagingLocation("gs://xxxx/staging"); 
    ((DataflowPipelineOptions)options).setRunner(DataflowRunner.class); 
    ((DataflowPipelineOptions)options).setStreaming(false); 
    options.setTempLocation("gs://xxxx/tempbucket"); 
    options.setJobName("sqlpipeline"); 
PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read() 
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration 
        .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db") 
        .withUsername("root").withPassword("root")) 
      .withQuery(
        "select account_id,account_parent,account_description,account_type,account_rollup,Custom_Members from account") 
      .withCoder(AvroCoder.of(Account.class)).withStatementPreparator(new JdbcIO.StatementPreparator() { 
       public void setParameters(PreparedStatement preparedStatement) throws Exception { 
        preparedStatement.setFetchSize(1); 
        preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD); 

       } 
      }).withRowMapper(new JdbcIO.RowMapper<Account>() { 
       public Account mapRow(ResultSet resultSet) throws Exception { 
        Account account = new Account(); 
        account.setAccount_id(resultSet.getInt("account_id")); 
        account.setAccount_parent(resultSet.getInt("account_parent")); 
        account.setAccount_description(resultSet.getString("account_description")); 
        account.setAccount_type(resultSet.getString("account_type")); 
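        // Read the column values from the result set rather than storing the column names as literals.
        account.setAccount_rollup(resultSet.getString("account_rollup")); 
        account.setCustom_Members(resultSet.getString("Custom_Members")); 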
        return account; 
       } 
      })); 

Thanks, Pablo, for reformatting. (Y) – Balu

Answers


Are you correctly pulling in the com.google.cloud.sql/mysql-socket-factory Maven dependency? It looks like the driver class is not being loaded.

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
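If the mysql-socket-factory library is used, the connection is normally made with the standard MySQL driver class and a JDBC URL that names the socket factory, rather than with the App Engine GoogleDriver. A minimal sketch of building such a URL follows. It assumes the URL format documented in the library's README (check it against the version you depend on), and the project, region, instance, and database names are hypothetical placeholders. In this format the instance is identified by its connection name, which includes the region.

```java
public class CloudSqlUrl {
    /**
     * Builds a JDBC URL for use with the Cloud SQL MySQL socket factory.
     * Assumption: the "jdbc:mysql://google/<db>?cloudSqlInstance=...&socketFactory=..."
     * format from the mysql-socket-factory README applies to your library version.
     */
    public static String build(String database, String instanceConnectionName) {
        return "jdbc:mysql://google/" + database
            + "?cloudSqlInstance=" + instanceConnectionName
            + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory";
    }

    public static void main(String[] args) {
        // Hypothetical names; the connection name is <project>:<region>:<instance>.
        String url = build("mydb", "my-project:us-central1:my-instance");
        System.out.println(url);
    }
}
```

The resulting string would be passed as the second argument to JdbcIO.DataSourceConfiguration.create, with "com.mysql.jdbc.Driver" as the first.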


I am using the following version: mysql-socket-factory 1.0.2. My code previously used an older version, but even after updating to 1.0.2 I get the same error: "Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'" – Balu


Hi Balu, could you check your build output to see whether you have a jar that contains the com.mysql.jdbc.GoogleDriver class? You can also print the Maven dependency tree to see whether there are any conflicts: https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html http://maven.apache.org/plugins/maven-dependency-plugin/tree-mojo.html –
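One quick way to test whether a driver class is reachable is to try loading it by name. The sketch below uses only the standard Class.forName call; the class name DriverCheck is made up for illustration. Note that it only reports on the classpath of the JVM it runs in, so a local run says nothing definite about the jars staged to the Dataflow workers.

```java
public class DriverCheck {
    /** Returns true if the named class can be found by this class's class loader. */
    public static boolean isLoadable(String className) {
        try {
            // 'false' skips static initialization; we only care whether the class is found.
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The two driver class names mentioned in the question.
        String[] candidates = {"com.mysql.jdbc.GoogleDriver", "com.mysql.jdbc.Driver"};
        for (String name : candidates) {
            System.out.println(name + " loadable: " + isLoadable(name));
        }
    }
}
```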


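Hi, I think it is better to stay with "com.mysql.jdbc.Driver", because the Google driver is meant for App Engine deployments.

For reference, this is what my pipeline configuration looks like, and it works fine for me: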

PCollection<KV<Double, Double>> exchangeRates = p.apply(JdbcIO.<KV<Double, Double>>read() 
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8")) 
    .withQuery("SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE") 
    .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of())) 
    .withRowMapper(new JdbcIO.RowMapper<KV<Double, Double>>() { 
     @Override 
     public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception { 
      LOG.info(resultSet.getDouble(1) + "Came"); 
      return KV.of(resultSet.getDouble(1), resultSet.getDouble(2)); 
     } 
    })); 

Hope this helps.
