0

我在存储CSV文件,我想读它,并将其写入BigQuery资料表。这是我的CSV文件,其中第一行是标题:GCP Dataflow-从存储读取CSV文件,写至BigQuery

GroupName,Groupcode,GroupOwner,GroupCategoryID 
System Administrators,sysadmin,13456,100 
Independence High Teachers,HS Teachers,,101 
John Glenn Middle Teachers,MS Teachers,13458,102 
Liberty Elementary Teachers,Elem Teachers,13559,103 
1st Grade Teachers,1stgrade,,104 
2nd Grade Teachers,2nsgrade,13561,105 
3rd Grade Teachers,3rdgrade,13562,106 
Guidance Department,guidance,,107 
Independence Math Teachers,HS Math,13660,108 
Independence English Teachers,HS English,13661,109 
John Glenn 8th Grade Teachers,8thgrade,,110 
John Glenn 7th Grade Teachers,7thgrade,13452,111 
Elementary Parents,Elem Parents,,112 
Middle School Parents,MS Parents,18001,113 
High School Parents,HS Parents,18002,114 

这是我的代码:

public class StorgeBq { 

     public static class StringToRowConverter extends DoFn<String, TableRow> { 

      private String[] columnNames; 

      private boolean isFirstRow = true; 

      @ProcessElement 
      public void processElement(ProcessContext c) { 
       TableRow row = new TableRow(); 

       String[] parts = c.element().split(","); 

       if (isFirstRow) { 
        columnNames = Arrays.copyOf(parts, parts.length); 
        isFirstRow = false; 
       } else { 
        for (int i = 0; i < parts.length; i++) { 
         row.set(columnNames[i], parts[i]); 
        } 
        c.output(row); 
       } 
      } 
     } 

     public static void main(String[] args) { 

      DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
         .as(DataflowPipelineOptions.class); 
        options.setZone("europe-west1-c"); 
        options.setProject("mydata-dev"); 
        options.setRunner(DataflowRunner.class); 
        Pipeline p = Pipeline.create(options); 

      p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv")) 
      .apply("ConverToBqRow",ParDo.of(new StringToRowConverter())) 
      .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows() 
        .to("mydata-dev:DF_TEST.dataflow_table") 
        .withWriteDisposition(WriteDisposition.WRITE_APPEND) 
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)); 
      p.run().waitUntilFinish(); 
     } 

} 

存在一些问题: 1)作业开始执行的时候,我看到有一个名为“DropInputs”的进程,我没有在我的代码中定义!并在所有任务开始前运行,为什么? enter image description here

2)为什么pipline不与第一任务 “readlines方法” 开始? 3)在日志文件中,我看到在任务“WriteToBq”中试图找到其中一个数据为字段,例如“1st Grade Teachers”不是字段而是“GroupName”的数据:

"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.", 
+0

你有一个作业ID?我不认为DropInputs的东西应该显示在这里。 – jkff

回答

1

你在代码中遇到了一些问题。但首先,关于“DropInputs”阶段 - 你可以放心地忽略它。这是this错误报告的结果。我仍然不明白为什么需要展示它(我们的许多用户也感到困惑),而且我很乐意让Google员工对此表示赞赏。在我看来,它只是混乱。

权,到现在你的代码:

  1. 您假定第一行读取将是你的头。这是一个不正确的假设。数据流并行读取,所以标题行可以随时到达。而不是使用boolean标志进行检查,每次检查string值本身,例如每次在ParDoif (c.element.contains("GroupName") then..
  2. 你缺少大量查询表模式。您需要将withSchema(..)添加到您的BigQuery接收器。这是我的一个公共管道的example
+0

谢谢,但你能告诉我什么时候我想在现有表格中写入BigQuery,应该如何在不添加“withSchema(..)”的情况下编写代码?因为这是一个csv的例子,但我应该为几个CSV文件完成这项任务,每个CSV文件都有大约500个列,并且总是添加模式并不容易。顺便说一句,你有读取CSV和写入BigQuery的例子吗? – Majico

+0

我已成功将CSV文件写入BQ,唯一的问题是无需使用“withSchema”或简单的方式从BQ写入BQ,而不是将其写入代码,您是否有任何想法? – Majico

+1

请为此打开一个单独的问题。 – jkff