2016-08-04 30 views
0

以前,PCollection格式化过的结果;我用下面的代码在大查询插入行:从Dataflow插入BigQuery中的数据

    // OPTION 1 
PCollection<TableRow> formattedResults = .... 
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName) 
          .withSchema(tableSchema) 
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

而且所有的行直接插在BigQuery中,都好到这里。但现在我已经开始动态识别表名和行,以便我如下创建PCollection:(字符串将表名称,然后它作为值行)

PCollection<KV<String, TableRow>> tableRowMap // OPTION 2 

而且,我创建的行组这将在同一个表中去,因为:

PCollection<KV<String, Iterable<TableRow>>> groupedRows //OPTION 3 

其中key(字符串)是BQ表名和值在BQ要插入的行的列表。

使用选项1,我可以使用上面显示的代码轻松地在BQ中插入行,但相同的代码不能与OPTION 2或OPTION 3一起使用,因为在这种情况下,我的表名是映射中的键。有没有办法使用OPTION 2或OPTION 3在表格中插入行。任何链接或代码示例都会有很大的帮助。

回答

1

Dataflow正在向每个窗口的表写入最近的东西(并且您可以创建自己的BoundedWindow子类和WindowFn以在窗口中包含所需的任何数据)。为此,请使用

to(SerializableFunction<BoundedWindow,String> tableSpecFunction) 

on BigQueryIO.Write。

请注意,此功能使用BigQuery的流式上载功能,每个表限制为100MB/s。另外,上传不是原子的,因此失败的批处理作业可能只上传部分输出。

-1

您还可以选择创建自己的DoFn,它直接将数据插入bigquery,而不是依赖BigQueryIO.Write。 从技术上讲,您需要创建BigQueryTableInserter,您可以使用insertAll(TableReference ref, List<TableRow> rowList)将东西插入到所需的表格中。

您可以使用像创建TableReference: new TableReference().setProjectId("projectfoo").setDatasetId("datasetfoo").setTableId("tablefoo")

这是不是100%推荐BigQueryIO做一些不错的东西分裂是需要插入到最大化吞吐量行和正确处理重试。

相关问题