2014-10-09 34 views
1

我想用芹菜插入大数据在我的mongodb,但问题是并发在MongoDB中。如果我一次发送多个任务到芹菜,一部分数据将被插入到mongodb中,而另一些数据则不会。我想这是因为mongodb在插入操作时锁定了数据库,但我需要一种解决方案来发送相同类型的多个任务以在数据库中插入数据。像检查数据库是否被锁定,如果它正在等待它解锁。这里是我的代码的一部分:如何使用芹菜插入数据到mongodb使用mongoengine

@celery.task(name='celery_tasks.add_book_product') 
def add_book_product(product_dict, store_id): 

    connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST) 

    store_obj = Store.objects.get(pk=store_id) 

    try: 
     book = Books.objects.get(pk=product_dict['RawBook']) 

     try: 
      product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book') 
      print("Product {} found for store {}".format(product_obj.id, store_obj.id)) 
      product_obj.count = int(product_dict['count']) 
      product_obj.buy_price = int(product_dict['buy_book']) 
      product_obj.sell_price = int(product_dict['sell_book']) 

      product_obj.save() 

     except (DoesNotExist, ValidationError): 
      product_obj = Product(store=store_obj, 
            related_book=book, 
            kind='book', 
            count=int(product_dict['count']), 
            buy_price=int(product_dict['buy_book']), 
            sell_price=int(product_dict['sell_book']), 
            name=book.name_fa) 

      product_obj.save() 

      print("Appending books to store obj...") 
      store_obj.products.append(product_obj) 
      store_obj.save() 
      print("Appending books to store obj done") 

     return "Product {} saved for store {}".format(product_obj.id, store_obj.id) 
    except (DoesNotExist, ValidationError): 
     traceback.print_exc() 
     return "Product with raw book {} does not exist.".format(product_dict['RawBook']) 

回答

2

默认情况下,多处理用于执行芹菜任务的并发执行。但有两种方法可以确保在任何给定时间只执行一项任务。

解决方案1:

当你开始

celery -A your_app worker -l info 

芹菜工人默认并发等于你的机器有内核的数量。因此,如果你开始这样的工作人员

celery -A your_app worker -l info -c 1 

它在任何给定的时间只运行一个任务。如果您还有其他一些必须执行的任务,您可以启动一个新的队列并分配一个工作人员来完成。

解决方案2:

这是有点复杂。你需要在你的任务中使用锁,就像这样。

if acquire_lock(): 
    try: 
     #do something 
    finally: 
     release_lock() 
    return 

你可以在Celery documentation了解更多。