2015-09-03 81 views
6

我有一个简单的要求。我正在运行apscheduler作为一个单独的过程。我有另一个作业生产者脚本,我想将作业添加到调度程序并运行它。APScheduler如何在调度程序外添加作业?

这是我的调度代码,

# appsched.py 
from apscheduler.schedulers.blocking import BlockingScheduler 
scheduler = BlockingScheduler() 
scheduler.start() 

这是我的工作制片脚本,

# jobproducer.py 
from appsched import scheduler 

def say_hello_job(): 
    print "Hello" 

scheduler.add_job(say_hello_job, 'interval', minutes=1) 

不用说,这没有奏效。有没有办法通过使用jobstore来完成这项工作? 如何与多个不同的作业生产者共享一个调度器?

回答

1

我有一个类似的问题,我的调度程序进程是一个uWSGI MULE进程,并有一个单独的应用程序,我想添加新的作业。

望着BaseScheduleradd_job()功能:

with self._jobstores_lock: 
if not self.running: 
    self._pending_jobs.append((job, jobstore, replace_existing)) 
    self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts') 
else: 
    self._real_add_job(job, jobstore, replace_existing, True) 

你可以看到这个问题:调度创造了就业,只有当它已经启动。

的解决方案是幸运的很简单,我们应该定义我们自己的“附加作业只”计划:

class JobAddScheduler(BlockingScheduler): 
    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, 
       coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', 
       executor='default', replace_existing=False, **trigger_args): 

    job_kwargs = { 
     'trigger': self._create_trigger(trigger, trigger_args), 
     'executor': executor, 
     'func': func, 
     'args': tuple(args) if args is not None else(), 
     'kwargs': dict(kwargs) if kwargs is not None else {}, 
     'id': id, 
     'name': name, 
     'misfire_grace_time': misfire_grace_time, 
     'coalesce': coalesce, 
     'max_instances': max_instances, 
     'next_run_time': next_run_time 
    } 
    job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined) 
    job = Job(self, **job_kwargs) 

    # Add jobs to job store 
    with self._jobstores_lock: 
     self._real_add_job(job, jobstore, replace_existing, True) 

    return job 

    def start(self): 
    pass 

    def shutdown(self, wait=True): 
    pass 

    def _main_loop(self): 
    pass 

    def wakeup(self): 
    pass 

然后,我们可以添加cron作业瞬间:

jobscheduler = JobAddScheduler() 
jobscheduler.add_job(...) 

唐不要忘记配置调度程序!在我来说,我使用的SQLAlchemy-MySQL后端存储工作:

jobstores=dict(default=SQLAlchemyJobStore(url='mysql+pymsql://USER:[email protected]/DATABASE')) 
jobscheduler.configure(jobstores=jobstores) 

我不知道其他的作业存储,但之后我添加了一个新的工作,我只好打电话给独立的调度过程的wakeup()功能以“积极”工作。我使用uWSGI的信号系统实现了这一点。

+0

我很确定'self._jobstores_lock'实际上并没有做正确的事情,因为它在一个单独的进程中。 – scribu