Python - 异步处理 HTTP 请求:我需要为 Django queryset 的每个条目生成 PDF 报告,条目数在 3 万到 4 万之间。
PDF是通过外部API生成的。由于当前是按需生成的,因此通过HTTP请求/响应同步处理。 这对于这个任务会有所不同,因为我认为我将使用django管理命令循环查询集并执行PDF生成。
这个任务应该遵循哪种方法?我想到了两种可能的解决方案,虽然是我从未使用过的技术:
1)Celery:把任务(带不同负载的 HTTP 请求)分发给 worker 进程,完成后再取回结果。
2)request-futures:以非阻塞方式使用请求。
目标是同时使用API(例如,根据API可以处理多少个并发请求,同时发送10或100个http请求)。
有人处理过类似的任务、能给出一些建议吗?(说明:这些代码大部分是我接手该项目后复用的,并非我本人所写。)
第一次尝试时,我用 multiprocessing 写出了如下代码:
class Checker(object):
    """
    Generate and download PDF reports for review objects via an external
    HTTP API: POST the review data to start a job, poll the job endpoint
    until it completes, then stream the finished PDF to a local directory.

    Expected instance attributes (set up in __init__):
        headers        -- auth headers for the external API
        endpoint_url   -- POST endpoint that starts a PDF generation job
        domain         -- API domain used when building URLs
        local_temp_dir -- directory the downloaded PDFs are written into
    """

    def __init__(self, *args, **kwargs):
        # ... various setup (credentials, endpoint URLs, temp dir, ...)
        # NOTE(review): the original body was a bare comment, which is a
        # SyntaxError in Python; `pass` keeps the snippet importable.
        pass

    # other methods
    # .....

    def run_single(self, uuid, verbose=False):
        """
        Run a single PDF generation and download the result locally.

        uuid    -- uuid of the review object to generate a report for
        verbose -- print per-request progress information
        """
        start = timer()
        data, obj = self.get_review_data(uuid)
        if verbose:
            print("** Report: {} **".format(obj))
        response = requests.post(
            url=self.endpoint_url,
            headers=self.headers,
            data=json.dumps(data)
        )
        if verbose:
            print('POST - Response: {} \n {} \n {} secs'.format(
                response.status_code,
                response.content,
                response.elapsed.total_seconds())
            )
        run_url = self.check_progress(post_response=response, verbose=True)
        if run_url:
            self.get_file(run_url, obj, verbose=True)
        print("*** Download {}in {} secs".format(
            "(verbose) " if verbose else "", timer() - start))

    def run_all(self, uuids, verbose=True):
        """Generate and download all reports sequentially, one per uuid."""
        start = timer()
        # BUG FIX: the original iterated over the undefined name
        # `review_uuids` (NameError); iterate the `uuids` argument.
        for obj_uuid in uuids:
            self.run_single(obj_uuid, verbose=verbose)
        print("\n\n### Downloaded {}{} reviews in {} secs".format(
            "(verbose) " if verbose else "",
            len(uuids),
            timer() - start)
        )

    def run_all_multi(self, uuids, workers=4, verbose=True):
        """
        Generate and download all reports using a pool of workers.

        NOTE(review): this workload is I/O-bound (HTTP requests), so a
        thread pool (multiprocessing.dummy.Pool or ThreadPoolExecutor)
        would sidestep process-pool pitfalls -- pickling bound methods and
        forked Django DB connections are likely causes of the observed
        freeze. Kept as a process Pool to preserve the interface.
        """
        pool = Pool(processes=workers)
        try:
            pool.map(self.run_single, uuids)
        finally:
            # BUG FIX: the original never shut the pool down, leaving
            # worker processes alive (part of the reported hang).
            pool.close()
            pool.join()

    def check_progress(self, post_response, attempts_limit=10000, verbose=False):
        """
        Poll the API endpoint periodically to track PDF generation.

        post_response  -- requests.Response from the initial POST
        attempts_limit -- give up after this many polling attempts
        verbose        -- print per-poll progress information

        Returns the '<job url>/files/' URL when the job completes, or
        None on error, non-200 POST response, or exhausted attempts.
        """
        if post_response.status_code != 200:
            if verbose:
                print("POST response status code != 200 - exit")
            return None
        # BUG FIX: the original also passed `domain=` and `headers=` as
        # str.format() keyword arguments; they were silently ignored --
        # only `path` belongs in the template.
        url = 'https://apidomain.com/{path}'.format(
            path=post_response.json().get('links', {}).get('self', {}).get('href'),
        )
        job_id = post_response.json().get('jobId', '')
        status = 'Running'
        attempt_counter = 0
        start = timer()
        if verbose:
            print("GET - url: {}".format(url))
        while status == 'Running':
            attempt_counter += 1
            job_response = requests.get(
                url=url,
                headers=self.headers,
            )
            job_data = job_response.json()
            status = job_data['status']
            message = job_data['message']
            progress = job_data['progress']
            if status == 'Error':
                if verbose:
                    end = timer()
                    print(
                        '{sc} - job_id: {job_id} - error_id: [{error_id}]: {message}'.format(
                            sc=job_response.status_code,
                            job_id=job_id,
                            error_id=job_data['errorId'],
                            message=message
                        ), '{} secs'.format(end - start)
                    )
                    print('Attempts: {} \n {}% progress'.format(attempt_counter, progress))
                return None
            if status == 'Complete':
                if verbose:
                    end = timer()
                    # BUG FIX: `run_id` was an undefined name (NameError
                    # on the success path); report the job id instead.
                    print('job_id: {job_id} - Complete - {secs} secs'.format(
                        job_id=job_id,
                        secs=end - start)
                    )
                    print('Attempts: {}'.format(attempt_counter))
                    print('{url}/files/'.format(url=url))
                return '{url}/files/'.format(url=url)
            if attempt_counter >= attempts_limit:
                if verbose:
                    end = timer()
                    print('File failed to generate after {att_limit} retrieve attempts: ({progress}% progress)' \
                          ' - {message}'.format(
                              att_limit=attempts_limit,
                              progress=int(progress * 100),
                              message=message
                          ), '{} secs'.format(end - start))
                return None
            if verbose:
                print('{}% progress - attempts: {}'.format(progress, attempt_counter), end='\r')
                sys.stdout.flush()
            time.sleep(1)
        # Job left the 'Running' state with an unexpected status value.
        if verbose:
            end = timer()
            print(status, 'message: {} - attempts: {} - {} secs'.format(
                message, attempt_counter, end - start))
        return None

    def get_review_data(self, uuid, host=None, protocol=None):
        """
        Fetch the review object and its serialized JSON payload.

        Returns (data, review): `data` is the JSON payload later POSTed
        to the PDF API, `review` is the MyModel instance.
        """
        # BUG FIX: extra positional args to get_object_or_404 must be Q
        # objects; look the object up by its uuid field instead.
        review = get_object_or_404(MyModel, uuid=uuid)
        internal_api_headers = {
            'Authorization': 'Token {}'.format(
                review.employee.csod_profile.csod_user_token
            )
        }
        # NOTE(review): `a_local_url` is undefined in this snippet --
        # presumably built from `host`/`protocol`; confirm with the
        # original project source.
        data = requests.get(
            url=a_local_url,
            params={'format': 'json', 'indirect': 'true'},
            headers=internal_api_headers,
        ).json()
        return (data, review)

    def get_file(self, runs_url, obj, verbose=False):
        """
        Download the generated PDF for `obj` into the local temp directory.

        runs_url -- the '<job url>/files/' URL returned by check_progress
        obj      -- the review model instance the report belongs to
        """
        runs_files_response = requests.get(
            url=runs_url,
            headers=self.headers,
            stream=True,
        )
        runs_files_data = runs_files_response.json()
        file_path = runs_files_data['files'][0]['links']['file']['href']  # remote generated file URI
        file_response_url = 'https://apidomain.com/{path}'.format(path=file_path)
        file_response = requests.get(
            url=file_response_url,
            headers=self.headers,
            params={'userId': settings.CREDENTIALS['userId']},
            stream=True,
        )
        if file_response.status_code != 200:
            if verbose:
                print('error in retrieving file for {r_id}\nurl: {url}\n'.format(
                    r_id=obj.uuid, url=file_response_url)
                )
            # BUG FIX: the original fell through and wrote the error body
            # to disk as if it were the PDF; bail out instead.
            return None
        # BUG FIX: the template had a mangled placeholder while the
        # `filename` format kwarg went unused; interpolate it.
        local_file_path = '{temp_dir}/{uuid}-{filename}-{employee}.pdf'.format(
            temp_dir=self.local_temp_dir,
            uuid=obj.uuid,
            employee=slugify(obj.employee.get_full_name()),
            filename=slugify(obj.task.name)
        )
        with open(local_file_path, 'wb') as f:
            for block in file_response.iter_content(1024):
                f.write(block)
        if verbose:
            # BUG FIX: `review` was undefined here (NameError); use `obj`.
            print('\n --> {r} [{uuid}]'.format(r=obj, uuid=obj.uuid))
            print('\n --> File downloaded: {path}'.format(path=local_file_path))

    @classmethod
    def get_temp_directory(cls):
        """
        Generate a local unique temporary directory.

        BUG FIX: a @classmethod receives the class as its first argument;
        it was misleadingly named `self`.
        """
        return '{temp_dir}/'.format(
            temp_dir=mkdtemp(dir=TEMP_DIR_PREFIX),
        )
if __name__ == "__main__":
    # BUG FIX: `uuids = #list...` left the assignment with no right-hand
    # side (SyntaxError); use an explicit placeholder instead.
    uuids = []  # TODO: fill with a list or generator of object uuids
    checker = Checker()
    checker.run_all_multi(uuids=uuids)
不幸的是,跑checker.run_all_multi
有以下效果
- python shell freeze;
- 不打印输出;
- 没有生成文件;
- 我不得不杀了命令行控制台,正常的键盘中断停止工作
而运行 checker.run_all 则一切正常(逐一处理)。
任何有关为什么此代码不起作用的建议(而不是关于我可以用来代替多处理的)?
谢谢大家。
您需要多久生成一次这些报告?这一代是手动还是自动触发的? –
- 每年一次 - 手动 – Luke
在那个频率我倾向于使用请求期货和避免需要设置rabbitmq等 – Anentropic