2017-06-19 23 views
0

我需要根据时间戳将SCollection元素保存到不同的每小时BigQuery表中。我尝试以下方法 -如何将SCollection元素保存到不同的BigQuery表中?

  1. 通过(TableName, Iterable[TableRow])族元素,然后使用BigQueryClient实例保存每个Iterable[TableRow]到它们各自的表中。这不起作用,因为BigQueryClient不可序列化。

  2. 创建一个SCollection[TableName, PCollection[TableRow]],然后使用BigQueryIO.Write将每个PCollection[TableRow]保存到其各自的BigQuery表。要创建PCollection[TableRow]对象,我使用.map(s => (s._1, sc.pipeline.apply(Create.of(s._2.toList.asJava)))),其中sc是ScioContext的一个实例。这不起作用,因为ScioContext不可序列化。

有没有办法将插入元素流到不同的BigQuery表中?

回答

0

在Beam中,BigQuery IO转换提供several methods用于根据当前窗口选择表格。我相信Dataflow 1.9对窗口相关目标具有相似的方法。数据流2.0还包括DynamicDestinations。请参阅Javadoc以获取基于每个元素内的用户ID选择表的示例。

我对Scio并不熟悉,但它看起来像从BigQuery中暴露底层方法,IO将是最简单的方法来完成这一点。

1

要使用Scio执行此操作,您可以创建自定义输出转换,该转换将写入由DynamicDestinations对象(Apache Beam)指定的目标。该表由输入元素的某些特征动态确定,在此情况下为元素的事件时间(小时)。

定制输出变换的BigQuery:

import com.google.api.services.bigquery.model.TableSchema 
import com.spotify.scio.bigquery.BigQueryUtil 
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition 
import org.apache.beam.sdk.io.gcp.bigquery._ 
import org.apache.beam.sdk.transforms.PTransform 
import org.apache.beam.sdk.values.{PCollection, PDone, ValueInSingleWindow} 


def saveAsBigQuery(tblPrefix: String, 
        tblSchema: String, 
        writeDisposition: WriteDisposition): 
    PTransform[PCollection[TableRow], PDone] = { 

    BigQueryIO.writeTableRows() 
    .to(new DynamicDestinations[TableRow, String] { 

     override def getTable(tblSuffix: String): TableDestination = { 
     // TODO: construct table name 
     val tblName = "%s_%s".format(tblPrefix, tblSuffix) 
     new TableDestination(tblName, null) 
     } 

     override def getDestination(tblRow: ValueInSingleWindow[TableRow]): String = { 
     // TODO: determine hourly table suffix based on event time in tblRow object 
     } 

     override def getSchema(destination: String): TableSchema = { 
     BigQueryUtil.parseSchema(tblSchema) 
     } 
    }) 
    .withWriteDisposition(writeDisposition) 
    .asInstanceOf[PTransform[PCollection[TableRow], PDone]] 
} 

应用自定义输出变换使用上面的函数:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write 


val tblPrefix = "table_prefix" 
val tblSchema = "table_schema" // TODO: this needs to be in valid BigQuery schema format 
val writeDisposition = Write.WriteDisposition.WRITE_APPEND 

val bqTransform = saveAsBigQuery(
    tblPrefix, 
    tblSchema, 
    writeDisposition) 

// assuming tblRows is an SCollection[TableRow] 
tblRows.saveAsCustomOutput("saveAsBigQuery", bqTransform) 
相关问题