2017-07-14 40 views
2

我正在尝试对每日运行的芹菜任务运行单元测试。
我试过导入函数并在我的测试中调用它,但这不起作用。是否可以对芹菜周期性任务运行单元测试?

的任务是:

@shared_task 

def create_a_notification_if_a_product_is_in_or_out_of_season(): 
    """ 
    Send a notification if a product is now in or out of season 
    """ 
    julian_date = date.today().timetuple().tm_yday + 1 
    active_products = Product.objects.filter(status='ACTIVE') 

    for products in active_products: 
     in_season_prd = ProductDescription.objects.filter(
      product=products, 
      early_start_julian=julian_date 
     ) 
     for prd in in_season_prd: 
      notification = Notification() 
      notification.type = notification_choices.PRODUCT_IN_SEASON 
      notification.description = str(prd.product.name) + " will be in season from tomorrow." 
      notification.save() 

,这里是我的测试中的一个例子:

def test_when_product_is_about_to_come_in_to_seasonality(self): 
    """ 
    Make a notification when a product is due to come in to seasonality tomorrow 
    """ 
    p = Product.objects.first() 
    p.status = "ACTIVE" 
    today = date.today().timetuple().tm_yday 
    p.early_start_julian = today + 1 
    create_a_notification_if_a_product_is_in_or_out_of_season() 
    updated_notifications = Notification.objects.all().count() 
    self.assertNotEqual(self.current_notifications, updated_notifications) 

任何帮助,将不胜感激!

感谢

回答

0

您可以apply()你的芹菜任务同步执行它:

def test_when_product_is_about_to_come_in_to_seasonality(self): 
    """ 
    Make a notification when a product is due to come in to seasonality tomorrow 
    """ 
    p = Product.objects.first() 
    p.status = "ACTIVE" 
    today = date.today().timetuple().tm_yday 
    p.early_start_julian = today + 1 
    create_a_notification_if_a_product_is_in_or_out_of_season.apply() 
    updated_notifications = Notification.objects.all().count() 
    self.assertNotEqual(self.current_notifications, updated_notifications) 
+0

我试过这个,我有相同的结果,任务不运行应该创建一个通知,因此当我比较自我。 current_notifications与updated_notification他们仍然相等,并且测试失败 –

+0

我意识到,这仍然是失败的原因是由于我建立了工厂,它创建了产品来运行测试。将.apply()添加到导入任务的末尾仍然有效,但任务正在触发。非常感谢! –

+0

高兴地帮忙@BenCurrie :) –

0

我认为你正在寻找CELERY_ALWAYS_EAGER设置。如果设置为True它将同步运行您的任务。你可以在你的测试设置中设置它,或者你可以装饰只有该测试@override_settings(CELERY_ALWAYS_EAGER=True)

+0

我刚试过这个,但它似乎仍然没有运行任务,因为没有通知被创建,并且测试失败。不过谢谢 –