我想读取一个csv文件并使用apache beam数据流将其写入BigQuery。为了做到这一点,我需要以字典的形式将数据呈现给BigQuery。我如何使用apache beam来转换数据以执行此操作?如何将csv转换为apache beam数据流中的字典
我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表。我知道如何在BigQuery中创建数据,这是直接的,我不知道如何将csv转换为字典。下面的代码是不正确的,但应该告诉我想要做什么。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
非常感谢巴勃罗,这个作品非常好!这里是一个代码片段,以防人们在寻找完整性(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write(beam .io.TextFileSink('./ greetings_solar'))) – user1753640
我正在尝试将结果写入BigQuery,但没有运气,表格被创建但没有数据。你能告诉发生了什么事吗?这里是一个片段(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write( beam.io.BigQuerySink( output_table , 模式= 'lumosity:INTEGER,时间:INTEGER', create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE))) – user1753640
@ user1753640:我有同样的问题,不得不在将数据存储到GBQ之前,使用匹配模式的字典。 – vdolez