2017-03-01 103 views
0

我已经使用了很多这个功能,并且找不到任何答案 - 因此我在问。并发线程正在等待任务

现在已经过去了一天,但我无法了解一些线程概念,这可能是为什么我的代码很混乱。

我生成3个主题。精细。

当线程2产生时,线程1“停止”,我假设它意味着它死亡。线程2和3相同。

我将这些线程放入活动池中。

我在努力的是保持所有3个线程同时运行并等待。我想有一个随机的时间间隔为任务分配线程的方法。

从我收集的,我的线程正在死亡的原因是因为我的工人类正在返回。然而,玩过它并把它放在一个循环中(while 1),我仍然无法获得任何工作。

任何想法?

import logging 
import random 
import threading 
import time 

logging.basicConfig(level = logging.DEBUG, format = '(%(threadName)-2s) %(message)s') 

class ActivePool(object): 
    def __init__(self): 
     super(ActivePool, self).__init__() 

     self.active = [] 
     self.lock = threading.Lock() 

    def activate(self, name): 
     with self.lock: 
      self.active.append(name) 
      logging.debug('Running wheel: %s', self.active) 
      self.move(name) 

    def move(self, name): 
     while name.is_alive(): 
      logging.debug('yes') 

    def inactive(self, name): 
     with self.lock: 
      self.active.remove(name) 
      logging.debug('Running wheel: %s', self.active) 

    def rtime(self): 
     self.rt = random.randint(5, 10) 
     t = threading.Timer(rt, self.revent) 

    def join(self): 
     for t in self.active: 
      t.join() 

    def check(self): 
     for t in self.active: 
      if t.is_alive(): 
       print t 

def worker(s, pool): 
    logging.debug('Wheel inactive') 

    with s: 
     #name = threading.currentThread().getName() 
     thread = threading.currentThread() 
     logging.debug('ACTIVATING') 
     pool.activate(thread) 
     #time.sleep(2) 
     #pool.inactive(thread) 

if __name__ == "__main__": 
    pool = ActivePool() 
    s = threading.Semaphore() 

    for i in range(0, 6): 
     t = threading.Thread(target = worker, name = str(i + 1), args = (s, pool)) 
     pool.activate(t) 
     t.start() 

    logging.debug('here') 
+0

即使在这里添加一些伪代码也会有所帮助。否则会非常难以诊断。 –

+0

@phyllisdiller新增 – popopret

+0

当我清理一些东西时,我会让线程1旋转起来而不会死亡。 但我不确定这个意图。你希望基本上有一个等待分配任务的线程池,对吗? 但是,您的主函数以及您的工作函数(线程要运行的东西)会激活线程。您不应该从线程内激活线程。 –

回答

0

好吧。我已经改变了一些东西。基本上你想要的是这个命令的顺序:

  • 构建一个ActivePool。
  • 将线程添加到您的ActivePool。
  • 调用ActivePool.start()开始线程。
  • 工作线程运行辅助函数,共享数据由信号量保护。
  • 主线程等待所有线程完成。

您不需要加入线程。

如果您确实添加了一个随机任务,您可以将其添加到某个列表(您必须锁定信号量),该列表将由工作人员函数从中抽取并执行。如果工作人员在列表中看到某些东西,它会将其从列表中拉出并执行相关操作。如果没有什么可做的,让线程睡觉。

您可能希望在您启动线程池之前将所有线程添加到线程池(即在ActivePool中创建一个列表,然后执行pool.activate()并依次激活每个线程)。

import logging 
import random 
import threading 
import time 

logger = logging.getLogger("thread_logger") 
logger.setLevel(logging.DEBUG) 

class ActivePool(object): 
    def __init__(self): 
     super(ActivePool, self).__init__() 

     self.active = [] 
     self.lock = threading.Lock() 

    def activate(self, name): 
     with self.lock: 
      self.active.append(name) 
      logger.debug('Running wheel: %s', self.active) 
      t.start() 

    def inactive(self, name): 
     with self.lock: 
      self.active.remove(name) 
      logger.debug('Running wheel: %s', self.active) 

    def rtime(self): 
     self.rt = random.randint(5, 10) 
     t = threading.Timer(rt, self.revent) 

    def join(self): 
     for t in self.active: 
      t.join() 

    def check(self): 
     for t in self.active: 
      if t.is_alive(): 
       return True 

def worker(s, pool): 

    logger.debug('Worker spinning up') 

    for x in range(0, 3): 
     with s: 
      logger.debug('Thread ID: ' + str(threading.currentThread().ident) + ' DO WORK: ' + str(x)) 
     time.sleep(2) 

if __name__ == "__main__": 

    pool = ActivePool() 
    s = threading.Semaphore() 

    for i in range(0, 2): 
     t = threading.Thread(target = worker, name = str(i + 1), args = (s, pool)) 
     pool.activate(t) 

    while(pool.check()): 
     print("Worker thread still workin yo.") 
     time.sleep(2) 
    logger.debug('Finito.')