2017-10-13 48 views
2

是否有可能在不运行谷歌应用程序引擎webservice的情况下运行处理用户数据的脚本?没有webservice的大查询cron作业

对于较小的脚本它工作得很好,但是当我的脚本持续约40分钟我收到提示:DeadlineExceededError

我临时的解决办法是使用Windows在Windows VM调度器和命令行用python脚本

编辑:添加代码

jobs = [] 
jobs_status = [] 
jobs_error = [] 
# The project id whose datasets you'd like to list 
PROJECT_NUMBER = 'project' 
scope = ('https://www.googleapis.com/auth/bigquery', 
     'https://www.googleapis.com/auth/cloud-platform', 
     'https://www.googleapis.com/auth/drive', 
     'https://spreadsheets.google.com/feeds') 

credentials = ServiceAccountCredentials.from_json_keyfile_name('client_secrets.json', scope) 

# Create the bigquery api client 
service = googleapiclient.discovery.build('bigquery', 'v2', credentials=credentials) 

def load_logs(source): 
    body = {"rows": [ 
     {"json": source} 
    ]} 

    response = service.tabledata().insertAll(
     projectId=PROJECT_NUMBER, 
     datasetId='test', 
     tableId='test_log', 
     body=body).execute() 
    return response 

def job_status(): 
    for job in jobs: 
     _jobId = job['jobReference']['jobId'] 
     status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=_jobId).execute() 
     jobs_status.append(status['status']['state']) 
     if 'errors' in status['status'].keys(): 
      query = str(status['configuration']['query']['query']) 
      message = str(status['status']['errorResult']['message']) 
      jobs_error.append({"query": query, "message": message}) 
    return jobs_status 


def check_statues(): 
    while True: 
     if all('DONE' in job for job in job_status()): 
      return 


def insert(query, tableid, disposition): 
    job_body = { 
    "configuration": { 
     "query": { 
     "query": query, 
     "useLegacySql": True, 
     "destinationTable": { 
     "datasetId": "test", 
     "projectId": "project", 
     "tableId": tableid 
     }, 
     "writeDisposition": disposition 
     } 
    } 
    } 

    r = service.jobs().insert(
     projectId=PROJECT_NUMBER, 
     body=job_body).execute() 
    jobs.append(r) 
    return r 



class MainPage(webapp2.RequestHandler): 
    def get(self): 
     query = "SELECT * FROM [gdocs_users.user_empty]" 
     insert(query, 'users_data_p1', "WRITE_TRUNCATE") 
     check_statues() 
     query = "SELECT * FROM [gdocs_users.user_empty]" 
     insert(query, 'users_data_p2', "WRITE_TRUNCATE") 
     query = "SELECT * FROM [gdocs_users.user_%s]" 
     for i in range(1, 1000): 
      if i <= 600: 
       insert(query % str(i).zfill(4), 'users_data_p1', "WRITE_APPEND") 
      else: 
       insert(query % str(i).zfill(4), 'user_data_p2', "WRITE_APPEND") 
     for error in jobs_error: 
      load_logs(error) 


app = webapp2.WSGIApplication([ 
    ('/', MainPage), 
], debug=True) 

回答

2

默认情况下,App Engine服务使用automatic scaling,其中HTTP请求的限制时间为60秒,任务队列请求的限制时间为10分钟。如果您更改服务以使用基本或手动扩展,那么您的任务队列请求可以运行长达24小时。

听起来你可能只需要一个实例来完成这项工作,所以也许除了默认的服务之外,还可以创建第二个service。在子文件夹中创建一个bqservice文件夹使用基本缩放一个实例的最大值以下app.yaml设置:

# bqsservice/app.yaml 
# Possibly use a separate service for your BQ code than 
# the rest of your app: 
service: bqservice 
runtime: python27 
api_version: 1 
# Keep low memory/cost B1 class? 
instance_class: B1 
# Limit max services to 1 to keep costs down. There is an 
# 8 instance hour limit to the free tier. This option still 
# scales to 0 when not in use. 
basic_scaling: 
    max_instances: 1 

# Handlers: 
handlers: 
- url: /.* 
    script: main.app 

然后在相同的服务创建一个cron.yaml安排您的脚本运行。看上面我的示例配置干脆把话BigQuery的逻辑放到一个main.py文件,在其中定义的WSGI应用:

# bqservice/main.py 
import webapp2 

class CronHandler(webapp2.RequestHandler): 

    def post(self): 
     # Handle your cron work 
     # .... 

app = webapp2.WSGIApplication([ 
    #('/', MainPage), # If you needed other handlers 
    ('/mycron', CronHandler), 
], debug=True) 

你可以工作,所有到默认的服务这一点,如果你不使用计划App Engine应用程序的其他任何东西。如果您除了默认服务之外还需要这样做,则需要先将其部署到默认服务,即使它只是包含静态文件的简单app.yaml

+0

谢谢BrettJ!此解决方案可以工作 - 但请注意,App引擎在等待BigQuery返回时没有做任何工作。一个更好的方法是在工作排队后立即返回,然后再回来查看工作是否完成。 –

+0

@BrettJ谢谢你的帮助。我想补充一点如果有人会尝试,你将不得不在'cron.yaml'中添加'target:your_service'来决定你想要使用的服务 – Mat

0

大多数BigQuery操作可以异步运行。你能告诉我们你的代码吗?

例如,从Python的BigQuery文档:

def query(query): 
    client = bigquery.Client() 
    query_job = client.run_async_query(str(uuid.uuid4()), query) 

    query_job.begin() 
    query_job.result() # Wait for job to complete 

这是一个异步工作,并且代码是选择等待查询完成。而不是等待,在begin()之后得到工作id。您可以使用Task Queue将任务排入队列,以检查该作业的结果。

+0

我不得不创建服务,因为我有联合表(来自gsuit),它们不能与python API一起工作,正如你看到的,我通过插入作业异步运行它们。 – Mat

+0

是的 - 看看代码你不应该check_statues ()'。然后该函数将在插入作业后立即返回。然后再回来检查该作业ID的结果。 –

+0

但我在下面有一些查询,我在插入完成后运行它们。正如我在文档中读到的错误是在60s请求超时之后引发的,所以问题是应用程序引擎对于这类脚本来说是个好地方,还是对于小型后端来说呢? – Mat