下面是一些示例代码展示了如何做几个被问及的事情。这使用多线程而不是多处理,并显示了一些使用队列,启动/停止工作线程以及用附加数据更新matplotlib图的例子。
(部分代码来自回答其他问题,包括this one和this one)
的代码示出了可能的实现异步工人中,向其中的数据可以用于随后的处理被发送。工作人员使用内部队列来缓冲数据,并且从队列中读取数据的内部线程(循环)进行一些处理并发送结果以进行显示。
还显示了一个异步绘图仪的实现。结果可以从多名工作人员发送给该绘图员。 (这也使用内部队列进行缓冲;这样做是为了允许主程序线程本身调用更新图的函数,这似乎是matplotlib的要求。)
NB这是为Python 2.7编写的在OSX上。希望其中的一些可能有用。
import time
import threading
import Queue
import math
import matplotlib.pyplot as plt
class AsynchronousPlotter:
"""
Updates a matplotlib data plot asynchronously.
Uses an internal queue to buffer results passed for plotting in x, y pairs.
NB the output_queued_results() function is intended be called periodically
from the main program thread, to update the plot with any waiting results.
"""
def output_queued_results(self):
"""
Plots any waiting results. Should be called from main program thread.
Items for display are x, y pairs
"""
while not self.queue.empty():
item = self.queue.get()
x, y = item
self.add_point(x, y)
self.queue.task_done()
def queue_result_for_output(self, x, y):
"""
Queues an x, y pair for display. Called from worker threads, so intended
to be thread safe.
"""
self.lock.acquire(True)
self.queue.put([x, y])
self.lock.release()
def redraw(self):
self.ax.relim()
self.ax.autoscale_view()
self.fig.canvas.draw()
plt.pause(0.001)
def add_point(self, x, y):
self.xdata.append(x)
self.ydata.append(y)
self.lines.set_xdata(self.xdata)
self.lines.set_ydata(self.ydata)
self.redraw()
def __init__(self):
self.xdata=[]
self.ydata=[]
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111)
self.lines, = self.ax.plot(self.xdata, self.ydata, 'o')
self.ax.set_autoscalex_on(True)
self.ax.set_autoscaley_on(True)
plt.ion()
plt.show()
self.lock = threading.Lock()
self.queue = Queue.Queue()
class AsynchronousWorker:
"""
Processes data asynchronously.
Uses an internal queue and internal thread to handle data passed in.
Does some processing on the data in the internal thread, and then
sends result to an asynchronous plotter for display
"""
def queue_data_for_processing(self, raw_data):
"""
Queues data for processing by the internal thread.
"""
self.queue.put(raw_data)
def _worker_loop(self):
"""
The internal thread loop. Runs until the exit signal is set.
Processes the supplied raw data into something ready
for display.
"""
while True:
try:
# check for any data waiting in the queue
raw_data = self.queue.get(True, 1)
# process the raw data, and send for display
# in this trivial example, change circle radius -> area
x, y = raw_data
y = y**2 * math.pi
self.ap.queue_result_for_output(x, y)
self.queue.task_done()
except Queue.Empty:
pass
finally:
if self.esig.is_set():
return
def hang_up(self):
self.esig.set() # set the exit signal...
self.loop.join() # ... and wait for thread to exit
def __init__(self, ident, ap):
self.ident = ident
self.ap = ap
self.esig = threading.Event()
self.queue = Queue.Queue()
self.loop = threading.Thread(target=self._worker_loop)
self.loop.start()
if __name__ == "__main__":
ap = AsynchronousPlotter()
num_workers = 5 # use this many workers
# create some workers. Give each worker some ID and tell it
# where it can find the output plotter
workers = []
for worker_number in range (num_workers):
workers.append(AsynchronousWorker(worker_number, ap))
# supply some data to the workers
for worker_number in range (num_workers):
circle_number = worker_number
circle_radius = worker_number * 4
workers[worker_number].queue_data_for_processing([circle_number, circle_radius])
# wait for workers to finish then tell the plotter to plot the results
# in a longer-running example we would update the plot every few seconds
time.sleep(2)
ap.output_queued_results();
# Wait for user to hit return, and clean up workers
raw_input("Hit Return...")
for worker in workers:
worker.hang_up()
考虑使用['multiprocessing'](https://docs.python.org/3.6/library/multiprocessing.html#module-multiprocessing)模块,而不是线程。它创建了多个进程,而不是同一进程上的多个线程,这使得它对于CPU密集型任务(比如matplotlib所做的那种)更高效,因为它可以利用多个处理器。 – tavnab
谢谢,我会研究多处理,这似乎更好。我基本上使用线程,因为这是我听说的唯一模块。 – user169808
看看[使用工作人员池](https://docs.python.org/3.6/library/multiprocessing.html#using-a-pool-of-workers)示例,这应该是一个好地方学习如何使用自己的参数启动多个并行函数调用,并将其扩展到您的问题。如果您在工作中遇到任何问题,请更新您的答案,我会尽力帮助您。 – tavnab