2017-09-20 106 views
0

我的用例相当简单,至少我觉得它是来自Node的......但我无法完全理解这个问题。我在这个问题的编码方面需要的帮助更少,更多的只是一个解释:我想同时运行一个异步进程(来自aws-encryption-sdk的解密和加密函数)多次(尽可能多次)。我想我的主要问题是,做这件事的最好方法是什么?Python中的异步性和线程池

后续问题......在我看来,做这件事的最好方法是处理解密和加密的异步函数(asyncio格式),由线程池处理而不是单个处理。有时候,我需要尽可能快地运行这个函数2000次(不一定完全平行,显然)。但是,即使这是正确的,我也不确定如何做到这一点。

非常感谢任何能够帮助我的人。为了给你一个关于我在哪个曲目的想法,我非常考虑使用这个线索的答案:https://stackoverflow.com/a/29280606/5335646。但是,有些消息人士表示,这更多地用于阻止函数,而不是真正用于真正异步的,就像我的情况(稍后返回的AWS KMS调用)。

+0

我认为你需要更好地理解'并发'和'并行'。并行意味着一切都在同一时间在不同的操作系统级别的线程或进程中发生。并发意味着,只要有东西阻塞,另一个任务就会异步运行。在python中,你不会访问操作系统级别的线程,因为GIL会阻止你这样做。因为你的代码将被CPU绑定(加密/解密需要计算),所以你应该采取并行的方法。如果你的代码是IO绑定的(网络IO,磁盘IO等),最好使用asyncio。 –

+0

作为你给出的答案之一,你应该在不同的过程中完成这个任务。 –

+0

我认为你大部分都是正确的,但你有点误会 - 这个过程实际上是网络IO绑定,我相信 - 我的过程实际上并没有执行加密,它是一个将加密/解密过程发送到AWS的模块KMS。这意味着实际的加密和解密发生在我的机器上,而我只是在等待响应。 –

回答

0

如果您使用的是python3.5或更新版本,则可以使用async/await语句。

import asyncio 
import multiprocessing 
import random 
import time 
from concurrent.futures import ProcessPoolExecutor 

count_received = 0 
tests_count = 16 


async def AWS_KMS_call(i, text, action='encrypt'): 
    print(i, multiprocessing.current_process().name, action) 
    await asyncio.sleep(random.random()) # emulate some work 
    return text[::-1] 


async def encrypt(i, text): 
    encrypted = await AWS_KMS_call(i, text, action='encrypt') 
    print(i, multiprocessing.current_process().name, encrypted, 'encrypted') 
    return encrypted 


async def decrypt(i, text): 
    encrypted = await encrypt(i, text) 
    decrypted = await AWS_KMS_call(i, encrypted, action='decrypt') 
    print(i, multiprocessing.current_process().name, encrypted, 'decrypted') 
    return decrypted 


def pool_func(i, text): 
    loop = asyncio.new_event_loop() 
    asyncio.set_event_loop(loop) 
    decrypted = loop.run_until_complete(decrypt(i, text)) 
    print(i, multiprocessing.current_process().name, decrypted, 'pool_func exit') 
    return i, decrypted 


def got_result(future): 
    global count_received 
    global tests_count 
    count_received += 1 
    i, text = future.result() 
    print(i, multiprocessing.current_process().name, text, 'done') 

    if count_received == tests_count: 
     loop.stop() 


if __name__ == '__main__': 
    text = 'some_data_to_encrypt' 
    executor = ProcessPoolExecutor(16) 

    loop = asyncio.get_event_loop() 

    for i in range(tests_count): 
     task = asyncio.ensure_future(loop.run_in_executor(executor, pool_func, i, text)) 
     task.add_done_callback(got_result) 

    try: 
     loop.run_forever() 
    finally: 
     loop.close() 

我添加了一些对打印清晰度和输出看起来像:

0 Process-1 encrypt 
1 Process-3 encrypt 
2 Process-5 encrypt 
4 Process-2 encrypt 
5 Process-4 encrypt 
6 Process-6 encrypt 
7 Process-9 encrypt 
8 Process-7 encrypt 
9 Process-10 encrypt 
10 Process-8 encrypt 
3 Process-15 encrypt 
11 Process-11 encrypt 
12 Process-12 encrypt 
13 Process-14 encrypt 
15 Process-13 encrypt 
14 Process-16 encrypt 
1 Process-3 tpyrcne_ot_atad_emos encrypted 
1 Process-3 decrypt 
6 Process-6 tpyrcne_ot_atad_emos encrypted 
6 Process-6 decrypt 
15 Process-13 tpyrcne_ot_atad_emos encrypted 
15 Process-13 decrypt 
5 Process-4 tpyrcne_ot_atad_emos encrypted 
5 Process-4 decrypt 
3 Process-15 tpyrcne_ot_atad_emos encrypted 
3 Process-15 decrypt 
9 Process-10 tpyrcne_ot_atad_emos encrypted 
9 Process-10 decrypt 
1 Process-3 tpyrcne_ot_atad_emos decrypted 
1 Process-3 some_data_to_encrypt pool_func exit 
1 MainProcess some_data_to_encrypt done 
4 Process-2 tpyrcne_ot_atad_emos encrypted 
4 Process-2 decrypt 
11 Process-11 tpyrcne_ot_atad_emos encrypted 
11 Process-11 decrypt 
10 Process-8 tpyrcne_ot_atad_emos encrypted 
10 Process-8 decrypt 
10 Process-8 tpyrcne_ot_atad_emos decrypted 
10 Process-8 some_data_to_encrypt pool_func exit 
10 MainProcess some_data_to_encrypt done 
6 Process-6 tpyrcne_ot_atad_emos decrypted 
6 Process-6 some_data_to_encrypt pool_func exit 
6 MainProcess some_data_to_encrypt done 
8 Process-7 tpyrcne_ot_atad_emos encrypted 
8 Process-7 decrypt 
12 Process-12 tpyrcne_ot_atad_emos encrypted 
12 Process-12 decrypt 
2 Process-5 tpyrcne_ot_atad_emos encrypted 
2 Process-5 decrypt 
15 Process-13 tpyrcne_ot_atad_emos decrypted 
15 Process-13 some_data_to_encrypt pool_func exit 
15 MainProcess some_data_to_encrypt done 
11 Process-11 tpyrcne_ot_atad_emos decrypted 
11 Process-11 some_data_to_encrypt pool_func exit 
11 MainProcess some_data_to_encrypt done 
13 Process-14 tpyrcne_ot_atad_emos encrypted 
13 Process-14 decrypt 
4 Process-2 tpyrcne_ot_atad_emos decrypted 
4 Process-2 some_data_to_encrypt pool_func exit 
4 MainProcess some_data_to_encrypt done 
7 Process-9 tpyrcne_ot_atad_emos encrypted 
7 Process-9 decrypt 
0 Process-1 tpyrcne_ot_atad_emos encrypted 
0 Process-1 decrypt 
14 Process-16 tpyrcne_ot_atad_emos encrypted 
14 Process-16 decrypt 
8 Process-7 tpyrcne_ot_atad_emos decrypted 
8 Process-7 some_data_to_encrypt pool_func exit 
8 MainProcess some_data_to_encrypt done 
5 Process-4 tpyrcne_ot_atad_emos decrypted 
5 Process-4 some_data_to_encrypt pool_func exit 
5 MainProcess some_data_to_encrypt done 
3 Process-15 tpyrcne_ot_atad_emos decrypted 
3 Process-15 some_data_to_encrypt pool_func exit 
3 MainProcess some_data_to_encrypt done 
9 Process-10 tpyrcne_ot_atad_emos decrypted 
9 Process-10 some_data_to_encrypt pool_func exit 
9 MainProcess some_data_to_encrypt done 
13 Process-14 tpyrcne_ot_atad_emos decrypted 
13 Process-14 some_data_to_encrypt pool_func exit 
13 MainProcess some_data_to_encrypt done 
2 Process-5 tpyrcne_ot_atad_emos decrypted 
2 Process-5 some_data_to_encrypt pool_func exit 
2 MainProcess some_data_to_encrypt done 
0 Process-1 tpyrcne_ot_atad_emos decrypted 
0 Process-1 some_data_to_encrypt pool_func exit 
0 MainProcess some_data_to_encrypt done 
12 Process-12 tpyrcne_ot_atad_emos decrypted 
12 Process-12 some_data_to_encrypt pool_func exit 
12 MainProcess some_data_to_encrypt done 
14 Process-16 tpyrcne_ot_atad_emos decrypted 
14 Process-16 some_data_to_encrypt pool_func exit 
14 MainProcess some_data_to_encrypt done 
7 Process-9 tpyrcne_ot_atad_emos decrypted 
7 Process-9 some_data_to_encrypt pool_func exit 
7 MainProcess some_data_to_encrypt done 

另外,我建议你寻找到celery库。也许它会更适合你的要求

+0

如果可以,请在主要问题中查看我最近的评论,看看您的答案是否仍然相关。 –

+0

另外,请记住(我应该提到这一点),这个函数是在API调用中发生的 - 我担心新流程的建立会使这种方法显着减慢,考虑到这一点。但我绝对可能是错的。 –

+0

@RyanSpicer我更新了我的答案,抱歉花了这么长时间。至于产卵新过程 - 嗯,我认为你应该尝试 –