1

我正在尝试使用数据流来读取pubsub消息并将其写入大查询。我获得了Google团队的Alpha访问权限,并获得了提供的示例,但现在我需要将其应用于我的场景。使用Python SDK进行数据流流式传输:将PubSub消息转换为BigQuery输出

PubSub的有效载荷:

Message { 
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1} 
    attributes: {} 
} 

大查询模式:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP', 

我的目标是简单地读取信息的有效载荷和插入的BigQuery。我正在努力解决转换问题,以及如何将键/值映射到大型查询模式。

我对此很新,所以任何帮助表示赞赏。

当前代码:https://codeshare.io/ayqX8w

谢谢!

回答

0

写入Python SDK的BigQuery接收器的数据应该是字典的形式,其中字典的每个键都给出BigQuery表的字段,并且相应的值给出要写入该字段的值。对于BigQuery RECORD类型,值本身应该是具有相应键值对的字典。

我提起JIRA以改善相应蟒模块的文档中梁:https://issues.apache.org/jira/browse/BEAM-3090

+0

感谢您的反馈意见。经过一些更多的实验后,似乎传入的pub/sub消息以字符串形式出现(显然)。我必须应用将线条对象转换为字典的转换。在数据流中遇到的错误消息是: **组中的输入类型提示违规:预期的元组[TypeVariable [K],TypeVariable [V]],获得 ** – glux

1

我能够通过定义它加载到JSON对象函数成功解析发布订阅串(见parse_pubsub())。我遇到的一个奇怪问题是我无法在全局范围内导入json。我收到“NameError:全球名称”JSON'未定义“错误。我不得不在函数中导入json。

见下面我的工作代码:

from __future__ import absolute_import 

import logging 
import argparse 
import apache_beam as beam 
import apache_beam.transforms.window as window 

'''Normalize pubsub string to json object''' 
# Lines look like this: 
    # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1} 
def parse_pubsub(line): 
    import json 
    record = json.loads(line) 
    return (record['mac']), (record['status']), (record['datetime']) 

def run(argv=None): 
    """Build and run the pipeline.""" 

    parser = argparse.ArgumentParser() 
    parser.add_argument(
     '--input_topic', required=True, 
     help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".') 
    parser.add_argument(
     '--output_table', required=True, 
     help= 
     ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE ' 
     'or DATASET.TABLE.')) 
    known_args, pipeline_args = parser.parse_known_args(argv) 

    with beam.Pipeline(argv=pipeline_args) as p: 
    # Read the pubsub topic into a PCollection. 
    lines = (p | beam.io.ReadStringsFromPubSub(known_args.input_topic) 
       | beam.Map(parse_pubsub) 
       | beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq}) 
       | beam.io.WriteToBigQuery(
        known_args.output_table, 
        schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP', 
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) 
      ) 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run() 
0

我也有类似的用例(读取行从PubSub的字符串,将它们转换为类型的字典,然后处理它们)。

我正在使用ast.literal_eval(),这似乎为我工作。该命令将评估字符串,但采用比eval()更安全的方式(请参阅here)。它应该返回一个字符串,其键是字符串,并且值被评估为最可能的类型(int,str,float ...)。您可能想确保这些值采用正确的类型。

这将使你喜欢这个

import ast 
lines = (p | beam.io.ReadStringsFromPubSub(known_args.input_topic) 
      | "JSON row to dict" >> beam.Map(
         lambda s: ast.literal_eval(s)) 
      | beam.io.WriteToBigQuery(...) 
     ) 
我还没有使用的BigQuery(还)

管道,所以我不能帮你的最后一行,但你写乍一看似乎是正确的。