2016-06-22 59 views
0

继承人的交易,我有我的数据库中的某些条目。我打电话一个Django:For循环python信息

variables = Variable.objects.order_by('foo').values('foo') 

然后,我有一个for语句上的每个变量执行发现:

for x in variables: 
    #doing something.... 

我的问题是,“做一些事情”是一项持续的任务...即它不停止。那么我怎么能够在第二个变量上运行for循环呢?

我认为这与池化有关,但这并不意味着我只能同时进行4个进程?如果我想要说50个单独的过程为每个50个变量运行,并且每个过程不会停止,直到某个时间或以前,我该怎么做?

甚至可以这样做。

这里是我的多码:

if __name__ == '__main__': 
x = Variable.objects.order_by('foo').values('foo') 
for t in x: 
    t = t.values() 
    foo = "".join(t) 
    info('Starting...') 
    p = Process(target=myfunction, args=(foo,)) 
    p.start() 
    p.join() 

myFunction是什么在一个无限循环运行...

@samuel:

# globals 
my_queue = multiprocessing.Manager().Queue() # queue to store our values 
stop_event = multiprocessing.Event() # flag which signals processes to stop 
my_pool = None 

def my_function(foo): 
    while not stop_event.is_set(): 
     print("starting %s" % foo) 
     try: 
      var = my_queue.get_nowait() # getting value from queue 
     except Queue.Empty: 
      print "No more items in queue" 
     # do you logic here 


# Since `t` could have unlimited size but do wan't to limit processes 
# we'll put all `t` value in queue 

x = Company.objects.order_by('ticker').values('ticker') 
for t in x: 
    foo = t.values() 
    my_queue.put(foo) 

MAX_PROCESSES = len(x) 
my_pool = multiprocessing.Pool(MAX_PROCESSES) 

for i in range(MAX_PROCESSES): 
    my_pool.apply_async(my_function, args=(foo,)) 
my_pool.close() 
my_pool.join() 
+1

['multithreading'(https://docs.python.org/2/library/threading.html) –

+0

我想这是一个他们。感谢您的快速响应 –

+0

@MoonCheesez - 对于50个并发任务,'threading'通常是一个糟糕的选择,因为python GIL只允许运行一次。 'multiprocessing'将是更好的选择。那么“多线程”应该是什么呢? – tdelaney

回答

0

这就是解决方案可以实现的方式使用多处理库。

我们将使用Poolapply_asyncQueue

# globals 
MAX_PROCESSES = 50 
my_queue = multiprocessing.Manager().Queue() # queue to store our values 
stop_event = multiprocessing.Event() # flag which signals processes to stop 
my_pool = None 

def my_function(proc_name, var): 
    while not stop_event.is_set(): 
     # do you logic here with var variable 


def var_scanner_process(): 
    # Since `t` could have unlimited size we'll put all `t` value in queue 
    while not stop_event.is_set(): # forever scan `values` for new items 
     x = Variable.objects.order_by('foo').values('foo') 
     for t in x: 
      t = t.values() 
      my_queue.put(t) 
     time.sleep(10) 

try: 
    var_scanner_process = Process(target=var_scanner) 
    var_scanner_process.start() 
    my_pool = multiprocessing.Pool(MAX_PROCESSES) 

    while not stop_event.is_set(): 
     try: # if queue isn't empty, get value from queue and create new process 
      var = my_queue.get_nowait() # getting value from queue 
      p = Process(target=my_function, args=("process-%d" % i, var)) 
      p.start() 
     exception Queue.Empty: 
      print "No more items in queue" 

except KeyboardInterrupt as stop_test_exception: 
    print(" CTRL+C pressed. Stopping test....") 
    stop_event.set() 
+0

我真的很感谢帮助的人。我可以澄清一下吗?首先找到变量,然后放入“队列”中,然后加入?那么对于我在范围内(“50”)它适用于该函数的异步并传递“进程ID”?什么是进程ID?以及如何将“t”变为“my_function” –

+0

异常Queue.Empty:是一种无效的语法。你能帮助我吗? –

+0

@DenisAngell你必须导入'Queue'模块来处理'Queue.Empty'异常 – Samuel