0

我不知道如何通过多处理工作来计算坏像素我到目前为止没有多处理并分析需要分析的10张图片大约需要7分钟...通过在Python中使用多处理来计算死像素

import random 
import time 

from multiprocessing import Process, Queue, current_process, freeze_support 
from PIL import Image, ImageDraw 

image1 = Image.open('MA_HA1_drawing_0.png') 
image2 = Image.open('MA_HA1_drawing_1.png') 
image2 = Image.open('MA_HA1_drawing_2.png') 
image3 = Image.open('MA_HA1_drawing_3.png') 
image4 = Image.open('MA_HA1_drawing_4.png') 
image5 = Image.open('MA_HA1_drawing_5.png') 
image6 = Image.open('MA_HA1_drawing_6.png') 
image7 = Image.open('MA_HA1_drawing_7.png') 
image8 = Image.open('MA_HA1_drawing_8.png') 
image9 = Image.open('MA_HA1_drawing_9.png') 

def analyze_picture(image): 
    time.sleep(0.5*random.random()) 
    counter = 0 
    for x in range(616,6446): 
     for y in range(756,3712): 
      r,g,b = image.getpixel((x,y)) 

      if r != 1 and g != 1 and b != 1: 
       counter += 1 
    return counter 

def test(): 
    NUMBER_OF_PROCESSES = 4 
    TASKS1 = [(analyze_picture(image1))] 
    TASKS2 = [(analyze_picture(image2))] 
    TASKS3 = [(analyze_picture(image2))] 
    TASKS4 = [(analyze_picture(image3))] 
    TASKS5 = [(analyze_picture(image4))] 
    TASKS6 = [(analyze_picture(image5))] 
    TASKS7 = [(analyze_picture(image6))] 
    TASKS8 = [(analyze_picture(image7))] 
    TASKS9 = [(analyze_picture(image8))] 
    TASKS10 = [(analyze_picture(image9))] 

    print TASKS1 

if __name__ == '__main__': 
    freeze_support() 
    test() 

他们给了我们一些功能来理解多处理并将其用于我们的任务,但我不理解他们,也不知道如何使用它们。

def worker(input, output): 
    for func, args in iter(input.get, 'STOP'): 
     result = calculate(func, args) 
     output.put(result) 

def calculate(func, args): 
    result = func(*args) 
    return '%s says that %s%s = %s' % \ 
     (current_process().name, func.__name__, args, result) 

def mul(a, b): 
    time.sleep(0.5*random.random()) 
    return a * b 

def plus(a, b): 
    time.sleep(0.5*random.random()) 
    return a + b 

# Create queues 
    task_queue = Queue() 
    done_queue = Queue() 

    # Submit tasks 
    for task in TASKS1: 
     task_queue.put(task) 

    # Start worker processes 
    for i in range(NUMBER_OF_PROCESSES): 
     Process(target=worker, args=(task_queue, done_queue)).start() 
     print i 

    # Get and print results 
    print 'Unordered results:' 
    for i in range(len(TASKS1)): 
     print '\t', done_queue.get() 

    # Add more tasks using `put()` 
    for task in TASKS2: 
     task_queue.put(task) 

    # Get and print some more results 
    for i in range(len(TASKS2)): 
     print '\t', done_queue.get() 

    # Tell child processes to stop 
    for i in range(NUMBER_OF_PROCESSES): 
     task_queue.put('STOP') 
     print 'process ', i, ' is stopped' 

编辑:新代码

import random 
import time 

from multiprocessing import Process, Queue, current_process, freeze_support 
from PIL import Image, ImageDraw 


def worker(input, output): 
    for func, args in iter(input.get, 'STOP'): 
     result = calculate(func, args) 
     output.put(result) 

def calculate(func, args): 
    result = func(args) 
    return '%s says that %s%s has %s dead pixels\n' % \ 
     (current_process().name, func.__name__, args, result) 

def analyze_picture(image_name): 
    t1 = time.clock() 
    image = Image.open(image_name) 
    time.sleep(0.5*random.random()) 
    counter = 0 
    for x in range(616,6446): 
     for y in range(756,3712): 
      r,g,b = image.getpixel((x,y)) 

      if r != 1 and g != 1 and b != 1: 
       counter += 1 

    t2 = time.clock() 
    dt = t2 - t1 
    print '\tThe process takes ',dt,' seconds.\n Result:\n' 
    return counter 

def test(): 


    NUMBER_OF_PROCESSES = 4 

    TASKS1 = [(analyze_picture, image_names[i]) for i in range(10)] 

    print TASKS1 

    # Create queues 
    task_queue = Queue() 
    done_queue = Queue() 

    # Submit tasks 
    for task in TASKS1: 
     task_queue.put(task) 

    # Start worker processes 
    for i in range(NUMBER_OF_PROCESSES): 
     Process(target=worker, args=(task_queue, done_queue)).start() 
     print i 

    # Get and print results 
    print 'Unordered results:' 
    for i in range(len(TASKS1)): 
     print '\t', done_queue.get() 

    # Tell child processes to stop 
    for i in range(NUMBER_OF_PROCESSES): 
     task_queue.put('STOP') 
     print 'process ', i, ' is stopped' 

if __name__ == '__main__': 
    image_names =[('MA_HA1_drawing_'+str(i)+'.png') for i in range(10)] 
    freeze_support() 
    test() 

回答

0

背后多的想法:

  • 创建几个工人可以分布到不同的内核中并行执行。
  • 对于多进程这些工作人员是与他们单独的内存空间(与线程相反)的进程。
  • 由于独立的内存空间,它们无法通过内存进行通信(接收任务并发送结果)。因此,需要进行进程间通信的队列。
  • 现在,任务通过队列分配给工人。
  • 最后,收集工人通过队列发送的结果。

如果是强制使用的发布代码,你可以如下做到这一点:

  1. 创建队列
  2. 开始工作进程
  3. 提交任务
    这是非常导入提交任务工人创建。可能的是,队列的缓冲区已满并阻塞,直到从队列中取出某些内容为止,但只要没有工作人员,队列中也不会有任何内容 - > DEADLOCK。
    既然你想要并行执行所有的图像,你的TASKS1(承认复数)必须是[(analyze_picture, (analyze_picture(image1),), (analyze_picture, (analyze_picture(image2),), ...]worker需要一个函数元组和tuple本身作为一个元组)。
  4. 获取和打印效果
  5. 告诉子进程停止

也许这就是你在问什么。

毕竟,有提高性能(和代码的可读性)的另外三个方面:

  • 进程间的通信是相当昂贵的。因此,您应该尽量减少从工作人员转移到工作人员的数据。
    就你而言,这意味着只传递图像名称而不是整个图像。此外,这导致所有图像的并行读入,因为工作人员读取图像。
  • 所有劳动者的东西是在multiprocessing.Pool,从而降低了对多行代码两个已实现的:
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) result = pool.map(analyze_picture, [image1, image2, ...])
  • 最后但并非最不重要的,由像素迭代像素是相当缓慢的。随着NumPy(或更高级别的SciPy),你可以加速它很多。

最后,你的脚本可能看起来像以下,并将于7分钟快得多:

import multiprocessing as mp 
import numpy as np 
from scipy import misc 

def analyze_picture(imagename): 
    image = misc.imread(imagename) # image[y, x, r/g/b] 
    return len(np.argwhere((a[756:,616:,0]!=1) & (a[756:,616:,1]!=1) & (a[756:,616:,2]!=1))) 

def main(): 
    pool = mp.Pool()     # default: number of logical cores 
    result = pool.map(analyze_picture, ("MA_HA1_drawing_{}.png".format(i) 
              for i in range(10))) 
    print(result) 

if __name__ == '__main__': 
    mp.freeze_support() 
    main() 

我不知道你的图像看起来怎么样(该{r,g,b}!=1奇怪),但在reference of scipy.misc.imread你会发现适合你的图像模式。

+0

嗨@Chickenmarkus非常感谢你的详细答复。我很感激它,我试过它,然后你给我的答案,我得到它的工作,但现在我有另一个问题,结果是这样的https://pastebin.com/gsviTkLj ,但我想它像 该过程需要x秒 结果 图像X满足x坏点 而不是 过程耗时x秒 结果 这个过程需要x秒...... 我将编辑我的问题,用新的代码,所以你能看到,也许是问题 –

+0

的在'analyze_picture()'中打印时间,'test()'中结果的打印是由于多处理而异步运行的。工人通常不应该印刷一些东西(因为不确定的不同步,而且通常有很多工人)。让worker只返回值(或字符串),而只在主进程test()中打印它。 – Chickenmarkus

相关问题