-2

我需要为django queryset的每个条目生成PDF报告。将有介于3万到4万之间的条目。python - 异步处理HTTP请求

PDF是通过外部API生成的。由于当前是按需生成的,因此通过HTTP请求/响应同步处理。 这对于这个任务会有所不同,因为我认为我将使用django管理命令循环查询集并执行PDF生成。

这个任务应该遵循哪种方法?我想到了两种可能的解决方案,虽然是我从未使用过的技术:

1)Celery:向工作人员分配任务(具有不同有效负载的http请求),然后在完成后检索它。

2)request-futures:以非阻塞方式使用请求。

目标是同时使用API​​(例如,根据API可以处理多少个并发请求,同时发送10或100个http请求)。

任何人在这里处理类似的任务,并可以提供建议,如何继续这个? (:大部分代码的重复使用,而不是我自己写的,因为我把这个项目的所有权注。):

是第一次尝试,与multiprocessing提出以下

class Checker(object): 

    def __init__(self, *args, **kwargs): 
     # ... various setup 

    # other methods 
    # ..... 

    def run_single(self, uuid, verbose=False): 
     """ 
     run a single PDF generation and local download 
     """ 
     start = timer() 
     headers = self.headers 

     data, obj = self.get_review_data(uuid) 
     if verbose: 
      print("** Report: {} **".format(obj)) 
     response = requests.post(
      url=self.endpoint_url, 
      headers=headers, 
      data=json.dumps(data) 
     ) 
     if verbose: 
      print('POST - Response: {} \n {} \n {} secs'.format(
       response.status_code, 
       response.content, 
       response.elapsed.total_seconds()) 
      ) 
     run_url = self.check_progress(post_response=response, verbose=True) 
     if run_url: 
      self.get_file(run_url, obj, verbose=True) 
     print("*** Download {}in {} secs".format("(verbose) " if verbose else "", timer()-start)) 


    def run_all(self, uuids, verbose=True): 
     start = timer() 
     for obj_uuid in review_uuids: 
      self.run_single(obj_uuid, verbose=verbose) 
     print("\n\n### Downloaded {}{} reviews in {} secs".format(
      "(verbose) " if verbose else "", 
      len(uuids), 
      timer() - start) 
     ) 

    def run_all_multi(self, uuids, workers=4, verbose=True): 
     pool = Pool(processes=workers) 
     pool.map(self.run_single, uuids) 


    def check_progress(self, post_response, attempts_limit=10000, verbose=False): 
     """ 
     check the progress of PDF generation querying periodically the API endpoint 
     """ 
     if post_response.status_code != 200: 
      if verbose: print("POST response status code != 200 - exit") 
      return None 
     url = 'https://apidomain.com/{path}'.format(
      domain=self.domain, 
      path=post_response.json().get('links', {}).get('self', {}).get('href'), 
      headers = self.headers 
     ) 
     job_id = post_response.json().get('jobId', '') 
     status = 'Running' 
     attempt_counter = 0 
     start = timer() 
     if verbose: 
      print("GET - url: {}".format(url)) 
     while status == 'Running': 
      attempt_counter += 1 
      job_response = requests.get(
       url=url, 
       headers=self.headers, 
      ) 
      job_data = job_response.json() 
      status = job_data['status'] 
      message = job_data['message'] 
      progress = job_data['progress'] 
      if status == 'Error': 
       if verbose: 
        end = timer() 
        print(
         '{sc} - job_id: {job_id} - error_id: [{error_id}]: {message}'.format(
          sc=job_response.status_code, 
          job_id=job_id, 
          error_id=job_data['errorId'], 
          message=message 
         ), '{} secs'.format(end - start) 
        ) 
        print('Attempts: {} \n {}% progress'.format(attempt_counter, progress)) 
       return None 
      if status == 'Complete': 
       if verbose: 
        end = timer() 
        print('run_id: {run_id} - Complete - {secs} secs'.format(
         run_id=run_id, 
         secs=end - start) 
        ) 
        print('Attempts: {}'.format(attempt_counter)) 
        print('{url}/files/'.format(url=url)) 
       return '{url}/files/'.format(url=url) 
      if attempt_counter >= attempts_limit: 
       if verbose: 
        end = timer() 
        print('File failed to generate after {att_limit} retrieve attempts: ({progress}% progress)' \ 
          ' - {message}'.format(
           att_limit=attempts_limit, 
           progress=int(progress * 100), 
           message=message 
         ), '{} secs'.format(end-start)) 
       return None 
      if verbose: 
       print('{}% progress - attempts: {}'.format(progress, attempt_counter), end='\r') 
       sys.stdout.flush() 
      time.sleep(1) 
     if verbose: 
      end = timer() 
      print(status, 'message: {} - attempts: {} - {} secs'.format(message, attempt_counter, end - start)) 
     return None 

    def get_review_data(self, uuid, host=None, protocol=None): 
     review = get_object_or_404(MyModel, uuid) 
     internal_api_headers = { 
      'Authorization': 'Token {}'.format(
       review.employee.csod_profile.csod_user_token 
      ) 
     } 

     data = requests.get(
      url=a_local_url, 
      params={'format': 'json', 'indirect': 'true'}, 
      headers=internal_api_headers, 
     ).json() 
     return (data, review) 

    def get_file(self, runs_url, obj, verbose=False): 

     runs_files_response = requests.get(
      url=runs_url, 
      headers=self.headers, 
      stream=True, 
     ) 

     runs_files_data = runs_files_response.json() 


     file_path = runs_files_data['files'][0]['links']['file']['href'] # remote generated file URI 
     file_response_url = 'https://apidomain.com/{path}'.format(path=file_path) 
     file_response = requests.get(
      url=file_response_url, 
      headers=self.headers, 
      params={'userId': settings.CREDENTIALS['userId']}, 
      stream=True, 
     ) 
     if file_response.status_code != 200: 
      if verbose: 
       print('error in retrieving file for {r_id}\nurl: {url}\n'.format(
        r_id=obj.uuid, url=file_response_url) 
       ) 
     local_file_path = '{temp_dir}/{uuid}-{filename}-{employee}.pdf'.format(
      temp_dir=self.local_temp_dir, 
      uuid=obj.uuid, 
      employee=slugify(obj.employee.get_full_name()), 
      filename=slugify(obj.task.name) 
     ) 
     with open(local_file_path, 'wb') as f: 
      for block in file_response.iter_content(1024): 
       f.write(block) 
      if verbose: 
       print('\n --> {r} [{uuid}]'.format(r=review, uuid=obj.uuid)) 
       print('\n --> File downloaded: {path}'.format(path=local_file_path)) 

    @classmethod 
    def get_temp_directory(self): 
     """ 
     generate a local unique temporary directory 
     """ 
     return '{temp_dir}/'.format(
      temp_dir=mkdtemp(dir=TEMP_DIR_PREFIX), 
     ) 

if __name__ == "__main__": 
    uuids = #list or generator of objs uuids 
    checker = Checker() 
    checker.run_all_multi(uuids=uuids) 

不幸的是,跑checker.run_all_multi有以下效果

  • python shell freeze;
  • 不打印输出;
  • 没有生成文件;
  • 我不得不杀了命令行控制台,正常的键盘中断停止工作

,同时运行checker.run_all不正常工作(一一)。

任何有关为什么此代码不起作用的建议(而不是关于我可以用来代替多处理的)?

谢谢大家。

+0

您需要多久生成一次这些报告?这一代是手动还是自动触发的? –

+0

- 每年一次 - 手动 – Luke

+0

在那个频率我倾向于使用请求期货和避免需要设置rabbitmq等 – Anentropic

回答

1

随着你的频率,每年手动一次&。你不需要芹菜或请求期货。

创建像

def record_to_pdf(record): 
    # create pdf from record 

的方法然后创建与代码的管理命令(使用multiprocessing.Pool

from multiprocessing import Pool 
pool = Pool(processes=NUMBER_OF_CORES) 
pool.map(record_to_pdf, YOUR_QUERYSET) 

管理命令将不会被异步虽然。要使其异步,可以在后台运行它。另外,如果你的进程没有CPU绑定(比如,它只是调用一些API),那么@Anentropic建议你可以在创建池时尝试更多的进程。

+0

对于不是cpu-bound的任务,你也可以试验一些prosees> NUMBER_OF_CORES – Anentropic

+0

@Antropic你说得对,它的方法'record_to_pdf'只调用一些API,然后可以通过调用一些API来增加进程一个重要的数字(受网络速度和API速率限制)。 –

+0

试过。它不会向stdout输出任何东西,它不会将任何文件保存到目标目录,并冻结shell(我需要使用kill -9来杀死它)。 相同的代码没有多处理,按顺序处理每个项目。 我可以粘贴代码。有任何想法吗? – Luke