2017-08-24 46 views
0

TL; DR - 消费者流程完成但是未加入,没有错误发生并且脚本无限地运行,在join陈述中陷入了僵局?多处理 - 进程将不会加入?

我的目标,以加快数据检索过程,但我不知道有多少“任务”(数据块检索)可能存在。所以,我提出毒丸法的修改版本,这样的任务时识别它不再检索信息,并触发毒丸if声明。

我都贴出了证明,这是我的毒药方法的工作示例和完整的脚本,它顾名思义是完整的脚本。 (两者都应该能够运行为是)

证明

import multiprocessing 


class Task: 
    def __init__(self, number): 
     self.number = number 

    def __call__(self): 
     """Find officer and company data and combine and save it""" 

     try: 
      # 'gather some data!' 
      self.result = self.number*2 
      print(self.number) 
      # 'fake' finding no data 
      if self.result >= 8: 
       raise NameError 
     except NameError: 
      # become poison pill once latest is done 
      self.result = None 

    def output(self): 
     return self.result 


class Consumer(multiprocessing.Process): 
    """Handle process and re-queue complete tasks""" 
    def __init__(self, waiting_queue, complete_queue): 
     multiprocessing.Process.__init__(self) 
     self.waiting_queue = waiting_queue 
     self.complete_queue = complete_queue 

    def run(self): 
     """process tasks until queue is empty""" 
     proc_name = self.name 
     while True: 
      current_task = self.waiting_queue.get() 
      current_task() 
      if current_task.output() is None: 
       print('{}: Exiting, poison pill reached'.format(proc_name)) 
       self.waiting_queue.task_done() 
       break 
      self.waiting_queue.task_done() 
      self.complete_queue.put(current_task) 
     print('{}: complete'.format(proc_name)) 


class Shepard: 
    """Handle life cycle of Consumers, Queues and Tasks""" 
    def __init__(self): 
     pass 

    def __call__(self, start_point): 

     # initialize queues 
     todo = multiprocessing.JoinableQueue() 
     finished = multiprocessing.JoinableQueue() 

     # start consumers 
     num_consumers = multiprocessing.cpu_count() * 2 
     consumers = [Consumer(todo, finished) for i in range(num_consumers)] 
     for q in consumers: 
      q.start() 

     # decide on (max) end limit (make much longer than suspected amount of data to be gathered 
     start = int(start_point) 
     max_record_range = 100 
     end = start + max_record_range 

     # Enqueue jobs 
     for i in range(start, end): 
      todo.put(Task(i)) 
     print('Processes joining') 
     # wait for processes to join 
     for p in consumers: 
      p.join() 
     print('Processes joined') 

     # process results - UNFINISHED 
     pass 

     # return results - UNFINISHED 
     return 'results!' 


if __name__ == '__main__': 

    # load start points: 
    start_points = {'cat1': 1, 'cat2': 3, 'cat3': 4} 


    master = Shepard() 
    cat1 = master(start_points['cat1']) 
    print('cat1 done') 
    cat2 = master(start_points['cat2']) 
    print('cat2 done') 
    cat3 = master(start_points['cat3']) 

因此,这里是完整的脚本

import time 
import requests 
import sys 
import json 
import pandas as pd 
import multiprocessing 
import queue 


class CompaniesHouseRequest: 
    """Retreive information from Companies House""" 
    def __init__(self, company, catagory_url=''): 
     """Example URL: '/officers'""" 
     self.company = str(company) 
     self.catagory_url = str(catagory_url) 

    def retrieve(self, key='Rn7RLDV9Tw9v4ShDCotjDtJFBgp1Lr4d-9GRYZMo'): 
     """retrieve data from Companies House""" 
     call = 'https://api.companieshouse.gov.uk/company/' + self.company + self.catagory_url 
     retrieve_complete = False 
     while retrieve_complete is False: 
      resp = requests.get(call, auth=requests.auth.HTTPBasicAuth(key, '')) 
      code = resp.status_code 
      if code == 404: 
       print(resp.status_code) 
       raise NameError('Company not found') 
      elif code == 200: 
       try: 
        self.data = json.loads(resp.content.decode('UTF8')) 
        retrieve_complete = True 
       except json.decoder.JSONDecodeError: 
        print('Decode Error in Officers!') 
      else: 
       print("Error:", sys.exc_info()[0]) 
       print('Retrying') 
       time.sleep(5) 
     return self.data 


class Company: 
    """Retrieve and hold company details""" 
    def __init__(self, company_number): 
     self.company_number = company_number 

    def __call__(self): 
     """Create request and process data""" 
     # make request 
     req = CompaniesHouseRequest(self.company_number) 
     data = req.retrieve() 
     # extract data 
     try: 
      line = [self.company_number, 
        data['company_name'], 
        data['registered_office_address'].get('premises', ''), 
        data['registered_office_address'].get('address_line_1', ''), 
        data['registered_office_address'].get('address_line_2', ''), 
        data['registered_office_address'].get('country', ''), 
        data['registered_office_address'].get('locality', ''), 
        data['registered_office_address'].get('postal_code', ''), 
        data['registered_office_address'].get('region', '')] 
     except KeyError: 
      line = ['' for i in range(0, 9)] 
     # save as pandas dataframe 
     return pd.DataFrame([line], columns=['company_number', 'company_name', 'company_address_premises', 
              'company_address_line_1', 'company_address_line_2', 
              'company_address_country', 'company_address_locality', 
              'company_address_postcode', 'company_address_region']) 


def name_splitter(name): 
    split = name.split(', ') 
    if len(split) > 2: 
     return [split[2], split[1], split[0]] 
    else: 
     return ['', split[1], split[0]] 


class Officers: 
    """Retrieve and hold officers details""" 
    def __init__(self, company_number): 
     self.company_number = company_number 

    def __call__(self): 
     """Create request and process data""" 
     # make request 
     req = CompaniesHouseRequest(self.company_number, '/officers') 
     data = req.retrieve() 
     # extract data 
     for officer in data['items']: 
      if officer['officer_role'] == 'director': 
       name = name_splitter(officer['name']) 
       line = [name[0], 
         name[1], 
         name[2], 
         officer.get('occupation'), 
         officer.get('country_of_residence'), 
         officer.get('nationality'), 
         officer.get('appointed_on', ''), 
         officer['address'].get('premises', ''), 
         officer['address'].get('address_line_1', ''), 
         officer['address'].get('address_line_2', ''), 
         officer['address'].get('country', ''), 
         officer['address'].get('locality', ''), 
         officer['address'].get('postal_code', ''), 
         officer['address'].get('region', '')] 
       break 
     director_count = sum(map(lambda x: x['officer_role'] == 'director', data['items'])) 
     if director_count > 1: 
      line += [True] 
     elif director_count == 1: 
      line += [False] 
     else: 
      line = ['no directors'] * 3 + [''] * 12 
     return pd.DataFrame([line], columns=['title', 'first_name', 'surname', 'occupation', 'country_of_residence', 
              'nationality', 'appointed_on', 
              'address_premises', 'address_line_1', 'address_line_2', 
              'address_country', 'address_locality', 'address_postcode', 
              'address_region', 'multi_director']) 


class Task: 
    def __init__(self, prefix, company_number): 
     self.prefix = prefix 
     self.company_number = company_number 

    def __call__(self): 
     """Find officer and company data and combine and save it""" 
     comp_id = self.prefix + str(self.company_number) 
     print(comp_id) 
     try: 
      # initialise company class 
      comp = Company(comp_id) 
      # initialise officer class 
      off = Officers(comp_id) 
      # retrieve and concatonate 
      self.result = pd.concat([comp(), off()], axis=1) 

     except NameError: 
      # become poison pill once latest is done 
      self.result = None 

    def output(self): 
     return self.result 


class Consumer(multiprocessing.Process): 
    """Handle process and re-queue complete tasks""" 
    def __init__(self, waiting_queue, complete_queue): 
     multiprocessing.Process.__init__(self) 
     self.waiting_queue = waiting_queue 
     self.complete_queue = complete_queue 

    def run(self): 
     """process tasks until queue is empty""" 
     proc_name = self.name 
     while True: 
      current_task = self.waiting_queue.get() 
      current_task() 
      if current_task.output() is None: 
       print('{}: Exiting, poison pill reached'.format(proc_name)) 
       self.waiting_queue.task_done() 
       break 
      self.waiting_queue.task_done() 
      self.complete_queue.put(current_task) 
     print('{}: complete'.format(proc_name)) 


class Shepard: 
    """Handle life of Consumers, Queues and Tasks""" 
    def __init__(self): 
     pass 

    def __call__(self, prefix, start_point): 

     # initialize queues 
     todo = multiprocessing.JoinableQueue() 
     finished = multiprocessing.JoinableQueue() 

     # start consumers 
     num_consumers = multiprocessing.cpu_count() * 2 
     consumers = [Consumer(todo, finished) for i in range(num_consumers)] 
     for q in consumers: 
      q.start() 

     # decide on (max) end limit 
     start = int(start_point) 
     max_record_range = 1000 
     end = start + max_record_range 

     # Enqueue jobs 
     for i in range(start, end): 
      todo.put(Task(prefix, i)) 
     print('Processes joining') 

     # wait for processes to join 
     for p in consumers: 
      p.join() 
     print('Processes joined') 

     # process results - UNFINISHED 
     pass 

     # return results - UNFINISHED 
     return 'results!' 


if __name__ == '__main__': 
    # paths to data 
    data_directory = r'C:\Users\hdewinton\OneDrive - Advanced Payment Solutions\Python\Corporate DM\data' 
    base = r'\base' 

    # load start points: 
    init = {"England": 10926071, "Scotland": 574309, "Ireland": 647561} 

    # gather data for each catagory 
    master = Shepard() 
    ireland = master('NI', init['Ireland']) 
    scotland = master('SC', init['Scotland']) 
    england = master('', init['England']) 
+0

你得到的错误是什么? –

+0

没有错误,消费者直到他们最后的印刷资料才运行,但未能加入 –

回答

0

TL; DR - 在后果(陷入无人过问,而消费者无法加入)可以是固定的,通过改变这样的:

finished = multiprocessing.JoinableQueue() 

这样:

mananger = multiprocessing.Manager() 
finished = mananger.Queue() 

详细 - “当一个对象被放在一个队列,该对象被酸洗,后台线程以后的酸洗数据刷新到下层管。这有一些令人惊讶的后果,但不应该造成任何实际困难 - 如果他们真的打扰你,那么你可以使用一个经理创建的队列。“来自documentation

第二个队列,成品,触发器上述令人惊讶consquences之一如果任务的一定数目的被添加到它。下面的限制不存在任何问题和上述后果发生的极限。这不会在伪因为第二队列发生,而目前,未使用。该限制取决于Task对象的规模和复杂性,所以我侦察这个事做腌数据仅出现的冲洗达到数据的某个卷后 - 数据量触发此后果

补遗 - 一旦修复程序已实现的另一错误也出现:作为todo队列的消费者队列 是空离开之前终止发生管错误管道内没有连接对象的队列对象发送数据。这会触发WinError 232。不用担心,管道错误可以通过在退出消费者之前清空队列来解决。 只需将其添加到消费者类的run方法:

while not self.waiting_queue.empty(): 
      try: 
       self.waiting_queue.get(timeout=0.001) 
      except: 
       pass 
     self.waiting_queue.close() 

这个从队列中删除的每一个元素,确保其主要while环路和管错误,因为消费者会前清空将排队不应发生后,终止。