
I can't seem to load any data from Socrata into BigQuery. I get "BigQuery error in load operation: Could not connect with BigQuery server." At first I was hitting the limit of 0 allowed bad records (maxBadRecords), so I have now cut the CSV file down to a single data row, and I still get the connection error. My log output, code, and CSV are below.

My log output:

INFO:root:--request-start-- 
INFO:root:-headers-start- 
INFO:root:content-type: application/json 
INFO:root:accept-encoding: gzip, deflate 
INFO:root:accept: application/json 
INFO:root:user-agent: google-api-python-client/1.3.1 (gzip) 
INFO:root:-headers-end- 
INFO:root:-path-parameters-start- 
INFO:root:projectId: tokyo-unity-87516 
INFO:root:-path-parameters-end- 
INFO:root:body: {"configuration": {"load": {"projectionFields": [], "skipLeadingRows": 1, "destinationTable": {"projectId": "tokyo-unity-87516", "tableId": "data_seattle_gov_pu5n_trf42", "datasetId": "fromsocrata"}, "maxBadRecords": 0, "schema": {"fields": [{"type": "string", "name": "socrata__computed_region_2day_rhn5", "mode": "nullable"}, {"type": "string", "name": "socrata__computed_region_cyqu_gs94", "mode": "nullable"}, {"type": "string", "name": "socrata__computed_region_kuhn_3gp2", "mode": "nullable"}, {"type": "string", "name": "socrata__computed_region_q256_3sug", "mode": "nullable"}, {"type": "string", "name": "socrata_created_at", "mode": "nullable"}, {"type": "string", "name": "socrata_id", "mode": "nullable"}, {"type": "string", "name": "socrata_updated_at", "mode": "nullable"}, {"type": "string", "name": "socrata_version", "mode": "nullable"}, {"type": "string", "name": "at_scene_time", "mode": "nullable"}, {"type": "string", "name": "cad_cdw_id", "mode": "nullable"}, {"type": "string", "name": "cad_event_number", "mode": "nullable"}, {"type": "string", "name": "census_tract", "mode": "nullable"}, {"type": "string", "name": "district_sector", "mode": "nullable"}, {"type": "string", "name": "event_clearance_code", "mode": "nullable"}, {"type": "string", "name": "event_clearance_date", "mode": "nullable"}, {"type": "string", "name": "event_clearance_description", "mode": "nullable"}, {"type": "string", "name": "event_clearance_group", "mode": "nullable"}, {"type": "string", "name": "event_clearance_subgroup", "mode": "nullable"}, {"type": "string", "name": "general_offense_number", "mode": "nullable"}, {"type": "string", "name": "hundred_block_location", "mode": "nullable"}, {"type": "string", "name": "incident_location", "mode": "nullable"}, {"type": "string", "name": "incident_location_address", "mode": "nullable"}, {"type": "string", "name": "incident_location_city", "mode": "nullable"}, {"type": "string", "name": "incident_location_state", "mode": "nullable"}, {"type": "string", "name": "incident_location_zip", "mode": "nullable"}, {"type": "string", "name": "initial_type_description", "mode": "nullable"}, {"type": "string", "name": "initial_type_group", "mode": "nullable"}, {"type": "string", "name": "initial_type_subgroup", "mode": "nullable"}, {"type": "string", "name": "latitude", "mode": "nullable"}, {"type": "string", "name": "longitude", "mode": "nullable"}, {"type": "string", "name": "zone_beat", "mode": "nullable"}]}}}, "jobReference": {"projectId": "tokyo-unity-87516", "jobId": "bqjob_r1ad3a0e093a97474_000001545d58a62c_1"}} 
INFO:root:query: ?alt=json 
INFO:root:--request-end-- 
INFO:apiclient.discovery:URL being requested: POST https://www.googleapis.com/upload/bigquery/v2/projects/tokyo-unity-87516/jobs?uploadType=resumable&alt=json 
INFO:root:--response-start-- 
INFO:root:status: 400 
INFO:root:alternate-protocol: 443:quic 
INFO:root:content-length: 37 
INFO:root:server: UploadServer 
INFO:root:x-guploader-uploadid: AEnB2Uo0-JZcSyKgFHlFD6MkWm1XfD6n-nhcVk4TPxgqqdjWkQzNCXzNPuTxwxvDrffWyYlxX_oS8ZvaKJjc0YKJmK0yTTFQtQ 
INFO:root:date: Thu, 28 Apr 2016 14:49:31 GMT 
INFO:root:alt-svc: quic=":443"; ma=2592000; v="33,32,31,30,29,28,27,26,25" 
INFO:root:content-type: text/html; charset=UTF-8 
INFO:root:Failed to parse Content-Range header. 
INFO:root:--response-end-- 
BigQuery error in load operation: Could not connect with BigQuery server. 
Http response status: 400 
Http response content: 
Failed to parse Content-Range header. 

My code:

import rethinkdb as r
import os
import json
import requests

r.connect("localhost", 28015).repl()

# One-time setup: create the database, tables, and the credential/dataset rows.
# On later runs the "already exists" errors are swallowed by the bare except.
try:
    r.db_create('queryplayground').run()
    r.db('queryplayground').table_create('socrata_datasets').run()
    r.db('queryplayground').table_create('third_party_creds').run()
    socrata_app_token = raw_input('Your Socrata app token\n')
    r.db('queryplayground').table('third_party_creds').insert({"id": "socrata", "app_token": socrata_app_token}).run()
    r.db('queryplayground').table('socrata_datasets').insert({"id": "data_seattle_gov_pu5n_trf4", "domain": "data.seattle.gov", "datasetid": "pu5n-trf4"}).run()
except:
    pass

r.db('queryplayground').table('socrata_datasets').replace({"id": "data_seattle_gov_pu5n_trf4", "domain": "data.seattle.gov", "datasetid": "pu5n-trf4"}).run()

for dataset in r.db('queryplayground').table('socrata_datasets').run():
    app_token = r.db('queryplayground').table('third_party_creds').get('socrata').run()['app_token']
    if 'socrata_created_at' not in dataset:
        # Download the dataset as CSV, limited to one row while debugging.
        local_filename = dataset['id'] + '.csv'
        url = 'https://%s/resource/%s.csv?$select=:*,*&$limit=1' % (dataset['domain'], dataset['datasetid'])
        req = requests.get(url, stream=True)  # NOTE the stream=True parameter
        with open(local_filename, 'wb') as f:
            for chunk in req.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)

        # Record the dataset's :created_at timestamp in RethinkDB.
        url = 'https://%s/resource/%s.json?' % (dataset['domain'], dataset['datasetid'])
        url += '$order=:created_at DESC&$limit=1&$select=:created_at&$$app_token=' + app_token
        print url
        dataset['socrata_created_at'] = requests.get(url).json()[0][':created_at']
        r.db('queryplayground').table('socrata_datasets').update(dataset).run()

        # Rewrite the header row so the column names are valid BigQuery identifiers.
        source_fp = open(local_filename, 'r')
        target_fp = open('2' + local_filename, 'w')
        first_row = True
        for row in source_fp:
            if first_row:
                row = row.replace(':', 'socrata_').replace('@', '_')
                headers = row.strip().split(',')
                first_row = False
            target_fp.write(row)

        # Build an all-string schema from the header row.
        schema = []
        for col in headers:
            schema.append({"name": col.strip('"'), "type": "string", "mode": "nullable"})
        with open('schema.json', 'w') as f:
            f.write(json.dumps(schema))

        # Load the rewritten CSV into BigQuery with the bq CLI.
        cmd = 'bq load --apilog=- --schema=schema.json --skip_leading_rows=1 fromsocrata.%s %s' % (dataset['id'] + '2', '2' + local_filename)
        print cmd
        os.system(cmd)

My CSV file:

"socrata__computed_region_2day_rhn5","socrata__computed_region_cyqu_gs94","socrata__computed_region_kuhn_3gp2","socrata__computed_region_q256_3sug","socrata_created_at","socrata_id","socrata_updated_at","socrata_version","at_scene_time","cad_cdw_id","cad_event_number","census_tract","district_sector","event_clearance_code","event_clearance_date","event_clearance_description","event_clearance_group","event_clearance_subgroup","general_offense_number","hundred_block_location","incident_location","incident_location_address","incident_location_city","incident_location_state","incident_location_zip","initial_type_description","initial_type_group","initial_type_subgroup","latitude","longitude","zone_beat" 
,,"30","18081","2016-02-04T00:25:02.285Z","row-xkda.347u.ym5x","2016-04-04T19:09:32.730Z","rv-7pyt_iydi~nc25",,"1001512","12000437316","8100.1000","M","161","2012-12-23T13:59:00.000","TRESPASS","TRESPASS","TRESPASS","2012437316","1XX BLOCK OF PINE ST","POINT (-122.340367 47.610046)",,,,,,,,"47.610046276","-122.340367253","M3" 

Answer


I finally figured it out: the problem was that I wasn't closing the file I had created before trying to upload it with the bq tool.
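In case it helps anyone else, here is a minimal sketch of the fix, using the variable names from my code above. My reading (an assumption, not something the error message states outright) is that the last write was still sitting in Python's output buffer, so bq was uploading an incomplete file and sending a Content-Range that didn't match what was on disk. Closing the handles flushes the buffer before the upload starts:

# Close (and thereby flush) both handles before shelling out to bq,
# so the complete CSV is on disk when the upload begins.
source_fp.close()
target_fp.close()

cmd = 'bq load --apilog=- --schema=schema.json --skip_leading_rows=1 fromsocrata.%s %s' % (dataset['id'] + '2', '2' + local_filename)
print cmd
os.system(cmd)

A with-block, like the ones I already use for the download and for schema.json, would close the files automatically and avoid this class of bug entirely.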