0
我有一个包含数十万行的文件,其中每行需要经历相同的过程(计算协方差)。我要多线程,因为它需要很长的时间。但是,我所看到的所有示例/教程对于我想要做的事情都相当复杂。如果有人能指出我一个很好的教程,解释如何一起使用这两个模块,那将是很棒的。使用线程和队列模块的Python中的多线程
我有一个包含数十万行的文件,其中每行需要经历相同的过程(计算协方差)。我要多线程,因为它需要很长的时间。但是,我所看到的所有示例/教程对于我想要做的事情都相当复杂。如果有人能指出我一个很好的教程,解释如何一起使用这两个模块,那将是很棒的。使用线程和队列模块的Python中的多线程
每当我有并行处理的东西,我用类似这样的东西(我只是撕开了这一点,现有的脚本):
#!/usr/bin/env python2
# This Python file uses the following encoding: utf-8
import os, sys, time
from multiprocessing import Queue, Manager, Process, Value, Event, cpu_count
class ThreadedProcessor(object):
def __init__(self, parser, input_file, output_file, threads=cpu_count()):
self.parser = parser
self.num_processes = threads
self.input_file = input_file
self.output_file = output_file
self.shared_proxy = Manager()
self.input_queue = Queue()
self.output_queue = Queue()
self.input_process = Process(target=self.parse_input)
self.output_process = Process(target=self.write_output)
self.processes = [Process(target=self.process_row) for i in range(self.num_processes)]
self.input_process.start()
self.output_process.start()
for process in self.processes:
process.start()
self.input_process.join()
for process in self.processes:
process.join()
self.output_process.join()
def parse_input(self):
for index, row in enumerate(self.input_file):
self.input_queue.put([index, row])
for i in range(self.num_processes):
self.input_queue.put('STOP')
def process_row(self):
for index, row in iter(self.input_queue.get, 'STOP'):
self.output_queue.put([index, row[0], self.parser.parse(row[1])])
self.output_queue.put('STOP')
def write_output(self):
current = 0
buffer = {}
for works in range(self.num_processes):
for index, id, row in iter(self.output_queue.get, 'STOP'):
if index != current:
buffer[index] = [id] + row
else:
self.output_file.writerow([id] + row)
current += 1
while current in buffer:
self.output_file.writerow(buffer[current])
del buffer[current]
current += 1
基本上,你有两个过程管理的读/写文件。一个读取并解析输入,另一个读取“完成”队列并写入输出文件。其他进程产生(在这种情况下,数量等于CPU的总处理器内核数),它们都处理来自输入队列的元素。