如果您使用的是python3.5或更新版本,则可以使用async/await语句。
import asyncio
import multiprocessing
import random
import time
from concurrent.futures import ProcessPoolExecutor
count_received = 0
tests_count = 16
async def AWS_KMS_call(i, text, action='encrypt'):
print(i, multiprocessing.current_process().name, action)
await asyncio.sleep(random.random()) # emulate some work
return text[::-1]
async def encrypt(i, text):
encrypted = await AWS_KMS_call(i, text, action='encrypt')
print(i, multiprocessing.current_process().name, encrypted, 'encrypted')
return encrypted
async def decrypt(i, text):
encrypted = await encrypt(i, text)
decrypted = await AWS_KMS_call(i, encrypted, action='decrypt')
print(i, multiprocessing.current_process().name, encrypted, 'decrypted')
return decrypted
def pool_func(i, text):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
decrypted = loop.run_until_complete(decrypt(i, text))
print(i, multiprocessing.current_process().name, decrypted, 'pool_func exit')
return i, decrypted
def got_result(future):
global count_received
global tests_count
count_received += 1
i, text = future.result()
print(i, multiprocessing.current_process().name, text, 'done')
if count_received == tests_count:
loop.stop()
if __name__ == '__main__':
text = 'some_data_to_encrypt'
executor = ProcessPoolExecutor(16)
loop = asyncio.get_event_loop()
for i in range(tests_count):
task = asyncio.ensure_future(loop.run_in_executor(executor, pool_func, i, text))
task.add_done_callback(got_result)
try:
loop.run_forever()
finally:
loop.close()
我添加了一些对打印清晰度和输出看起来像:
0 Process-1 encrypt
1 Process-3 encrypt
2 Process-5 encrypt
4 Process-2 encrypt
5 Process-4 encrypt
6 Process-6 encrypt
7 Process-9 encrypt
8 Process-7 encrypt
9 Process-10 encrypt
10 Process-8 encrypt
3 Process-15 encrypt
11 Process-11 encrypt
12 Process-12 encrypt
13 Process-14 encrypt
15 Process-13 encrypt
14 Process-16 encrypt
1 Process-3 tpyrcne_ot_atad_emos encrypted
1 Process-3 decrypt
6 Process-6 tpyrcne_ot_atad_emos encrypted
6 Process-6 decrypt
15 Process-13 tpyrcne_ot_atad_emos encrypted
15 Process-13 decrypt
5 Process-4 tpyrcne_ot_atad_emos encrypted
5 Process-4 decrypt
3 Process-15 tpyrcne_ot_atad_emos encrypted
3 Process-15 decrypt
9 Process-10 tpyrcne_ot_atad_emos encrypted
9 Process-10 decrypt
1 Process-3 tpyrcne_ot_atad_emos decrypted
1 Process-3 some_data_to_encrypt pool_func exit
1 MainProcess some_data_to_encrypt done
4 Process-2 tpyrcne_ot_atad_emos encrypted
4 Process-2 decrypt
11 Process-11 tpyrcne_ot_atad_emos encrypted
11 Process-11 decrypt
10 Process-8 tpyrcne_ot_atad_emos encrypted
10 Process-8 decrypt
10 Process-8 tpyrcne_ot_atad_emos decrypted
10 Process-8 some_data_to_encrypt pool_func exit
10 MainProcess some_data_to_encrypt done
6 Process-6 tpyrcne_ot_atad_emos decrypted
6 Process-6 some_data_to_encrypt pool_func exit
6 MainProcess some_data_to_encrypt done
8 Process-7 tpyrcne_ot_atad_emos encrypted
8 Process-7 decrypt
12 Process-12 tpyrcne_ot_atad_emos encrypted
12 Process-12 decrypt
2 Process-5 tpyrcne_ot_atad_emos encrypted
2 Process-5 decrypt
15 Process-13 tpyrcne_ot_atad_emos decrypted
15 Process-13 some_data_to_encrypt pool_func exit
15 MainProcess some_data_to_encrypt done
11 Process-11 tpyrcne_ot_atad_emos decrypted
11 Process-11 some_data_to_encrypt pool_func exit
11 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos encrypted
13 Process-14 decrypt
4 Process-2 tpyrcne_ot_atad_emos decrypted
4 Process-2 some_data_to_encrypt pool_func exit
4 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos encrypted
7 Process-9 decrypt
0 Process-1 tpyrcne_ot_atad_emos encrypted
0 Process-1 decrypt
14 Process-16 tpyrcne_ot_atad_emos encrypted
14 Process-16 decrypt
8 Process-7 tpyrcne_ot_atad_emos decrypted
8 Process-7 some_data_to_encrypt pool_func exit
8 MainProcess some_data_to_encrypt done
5 Process-4 tpyrcne_ot_atad_emos decrypted
5 Process-4 some_data_to_encrypt pool_func exit
5 MainProcess some_data_to_encrypt done
3 Process-15 tpyrcne_ot_atad_emos decrypted
3 Process-15 some_data_to_encrypt pool_func exit
3 MainProcess some_data_to_encrypt done
9 Process-10 tpyrcne_ot_atad_emos decrypted
9 Process-10 some_data_to_encrypt pool_func exit
9 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos decrypted
13 Process-14 some_data_to_encrypt pool_func exit
13 MainProcess some_data_to_encrypt done
2 Process-5 tpyrcne_ot_atad_emos decrypted
2 Process-5 some_data_to_encrypt pool_func exit
2 MainProcess some_data_to_encrypt done
0 Process-1 tpyrcne_ot_atad_emos decrypted
0 Process-1 some_data_to_encrypt pool_func exit
0 MainProcess some_data_to_encrypt done
12 Process-12 tpyrcne_ot_atad_emos decrypted
12 Process-12 some_data_to_encrypt pool_func exit
12 MainProcess some_data_to_encrypt done
14 Process-16 tpyrcne_ot_atad_emos decrypted
14 Process-16 some_data_to_encrypt pool_func exit
14 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos decrypted
7 Process-9 some_data_to_encrypt pool_func exit
7 MainProcess some_data_to_encrypt done
另外,我建议你寻找到celery库。也许它会更适合你的要求
我认为你需要更好地理解'并发'和'并行'。并行意味着一切都在同一时间在不同的操作系统级别的线程或进程中发生。并发意味着,只要有东西阻塞,另一个任务就会异步运行。在python中,你不会访问操作系统级别的线程,因为GIL会阻止你这样做。因为你的代码将被CPU绑定(加密/解密需要计算),所以你应该采取并行的方法。如果你的代码是IO绑定的(网络IO,磁盘IO等),最好使用asyncio。 –
作为你给出的答案之一,你应该在不同的过程中完成这个任务。 –
我认为你大部分都是正确的,但你有点误会 - 这个过程实际上是网络IO绑定,我相信 - 我的过程实际上并没有执行加密,它是一个将加密/解密过程发送到AWS的模块KMS。这意味着实际的加密和解密发生在我的机器上,而我只是在等待响应。 –