2015-12-08 56 views
2

我没有收到谷歌的工作,例如使用的BigQuery连接器,星火

https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example PySpark

有在代码中犯了一些错误,我认为,这样的:

“#输出参数 'mapred.bq.project.id': '',

应该是: 'mapred.bq.output.project.id': '',

'#将数据写回到新的BigQuery表中。
'#BigQueryOutputFormat丢弃键,因此将键设置为无。
(word_counts
.MAP(拉姆达对:无,json.dumps(对))
.saveAsNewAPIHadoopDataset(CONF))

会给出错误消息。如果我将其更改为:
(word_counts
.MAP(拉姆达对:(无,json.dumps(对)))
.saveAsNewAPIHadoopDataset(CONF))

我得到的错误信息:
org.apache.hadoop.io.Text不能转换为com.google.gson.JsonObject

而且无论我尝试什么,我都无法完成这项工作。
有BigQuery中有我给它的后面有个“_hadoop_temporary_job_201512081419_0008”
“的conf”的名字创建了一个数据集和表与上月底“_attempt_201512081419_0008_r_000000_0”创建。但总是空的

任何人都可以帮助我吗?
谢谢

回答

2

我们正在努力更新the documentation,因为如您所述,在这种情况下文档不正确。对于那个很抱歉!虽然我们正在努力更新文档,但我希望尽快给您答复。

铸造问题

你提到最重要的问题是铸造的问题。不幸的是,PySpark无法使用BigQueryOutputFormat来创建Java GSON对象。解决方案(解决方法)是将输出数据保存到Google Cloud Storage(GCS)中,然后使用bq命令手动加载。

代码示例

这里是一个代码示例,其出口到GCS并加载数据至BigQuery。您也可以使用subprocess和Python以编程方式执行bq命令。

#!/usr/bin/python 
"""BigQuery I/O PySpark example.""" 
import json 
import pprint 
import pyspark 

sc = pyspark.SparkContext() 

# Use the Google Cloud Storage bucket for temporary BigQuery export data used 
# by the InputFormat. This assumes the Google Cloud Storage connector for 
# Hadoop is configured. 
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket') 
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id') 
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket) 

conf = { 
    # Input Parameters 
    'mapred.bq.project.id': project, 
    'mapred.bq.gcs.bucket': bucket, 
    'mapred.bq.temp.gcs.path': input_directory, 
    'mapred.bq.input.project.id': 'publicdata', 
    'mapred.bq.input.dataset.id': 'samples', 
    'mapred.bq.input.table.id': 'shakespeare', 
} 

# Load data in from BigQuery. 
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat', 
    'org.apache.hadoop.io.LongWritable', 
    'com.google.gson.JsonObject', 
    conf=conf) 

# Perform word count. 
word_counts = (
    table_data 
    .map(lambda (_, record): json.loads(record)) 
    .map(lambda x: (x['word'].lower(), int(x['word_count']))) 
    .reduceByKey(lambda x, y: x + y)) 

# Display 10 results. 
pprint.pprint(word_counts.take(10)) 

# Stage data formatted as newline delimited json in Google Cloud Storage. 
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket) 
partitions = range(word_counts.getNumPartitions()) 
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions] 

(word_counts 
.map(lambda (w, c): json.dumps({'word': w, 'word_count': c})) 
.saveAsTextFile(output_directory)) 

# Manually clean up the input_directory, otherwise there will be BigQuery export 
# files left over indefinitely. 
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory) 
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True) 

print """ 
########################################################################### 
# Finish uploading data to BigQuery using a client e.g. 
bq load --source_format NEWLINE_DELIMITED_JSON \ 
    --schema 'word:STRING,word_count:INTEGER' \ 
    wordcount_dataset.wordcount_table {files} 
# Clean up the output 
gsutil -m rm -r {output_directory} 
########################################################################### 
""".format(
    files=','.join(output_files), 
    output_directory=output_directory)