2017-05-25 45 views
1

我有以下代码:收集期货从APScheduler

import pandas as pd 
from concurrent.futures import ThreadPoolExecutor, as_completed 
from datetime import datetime 
from apscheduler.schedulers.blocking import BlockingScheduler 


class FutureScheduler(object): 

    def __init__(): 
     self.futures = [] 
     self.scheduler = BlockingScheduler() 
     self.pool = ThreadPoolExecutor(5) 
     self.full_frame = pd.DataFrame() 

    def start(self): 
     job = self.scheduler.add_job(self.add_future, 'cron', day_of_week='mon-fri', hour='8-15', minute='*') 
     self.scheduler.start() 
     self.flush_csvs() 

    def add_future(self): 
     self.futures.append(self.pool.submit(self.long_running_task)) 

    def flush_csvs(self): 
     for future in as_completed(self.futures): 
      results = future.result() 
      self.full_frame = pd.concat((self.full_frame, results)) 
      self.full_frame.to_csv('results.csv') 
      print "flushed... Queue size: %s" % len(self.futures) 

    def long_running_task(self): 
     #takes a while may or may not return before the next one is kicked off 

所以我的问题是,flush_csvs循环的内部代码永远不会运行。我是否必须在as_completed被调用之前将所有期货添加到列表中?有没有办法让BlockingScheduler回归未来?我看到它返回一个Job,但在这种情况下,我希望它更像未来。

回答

1

这不起作用,因为调度程序阻止主线程继续。这可以防止flush_csvs被执行。

self.scheduler.start() 
self.flush_csvs() 

但是,这可能不是你想要的。 APScheduler在内部使用线程池,因此回调(self.long_running_task)已经在单独的线程中执行。

如果您需要多个内核(使用ProcessPoolExecutor而不是ThreadPoolExecutor)等,您可以通过APScheduler更改此线程池的配置,具体取决于您需要的工作器数量。您可能还可以将每个作业配置为做你想做的事。例如,配置每分钟运行一次的作业的策略以合并(仅运行一个),而不是在延迟情况下背靠背运行多次。

http://apscheduler.readthedocs.io/en/latest/userguide.html#choosing-the-right-scheduler-job-store-s-executor-s-and-trigger-s