2016-12-02 50 views
0

一旦列表的长度发生一定的变化,我可以触发某些函数的有效方法是什么?如何在满足条件时触发函数

我有一个嵌套列表,我每秒添加数据100次,并且我想在列表长度增加一些值时触发一个函数。我试图在while循环内使用if声明来执行此操作(请参见下面的my_loop())。这很有效,但这个看似简单的操作占用了我的一个CPU核心的100%。在我看来,不断查询列表大小是脚本的限制因素(将数据添加到while循环中的列表不是资源密集型的)。

这里是我到目前为止已经试过:

from threading import Event, Thread 
import time 

def add_indefinitely(list_, kill_signal): 
    """ 
    list_ : list 
     List to which data is added. 
    kill_signal : threading.Event 
    """ 
    while not kill_signal.is_set(): 
     list_.append([1] * 32) 
     time.sleep(0.01) # Equivalent to 100 Hz. 


def my_loop(buffer_len, kill_signal): 
    """ 
    buffer_len: int, float 
     Size of the data buffer in seconds. Gets converted to n_samples 
     by multiplying by the sampling frequency (i.e., 100). 
    kill_signal : threading.Event 
    """ 
    buffer_len *= 100 
    b0 = len(list_) 
    while not kill_signal.is_set(): 
     if len(list_) - b0 > buffer_len: 
      b0 = len(list_) 
      print("Len of list_ is {}".format(b0)) 


list_ = [] 
kill_signal = Event() 
buffer_len = 2 # Print something every 2 seconds. 


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal)) 
data_thread.start() 

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal)) 
loop_thread.start() 


def stop_all(): 
    """Stop appending to and querying the list. 
    SO users, call this function to clean up! 
    """ 
    kill_signal.set() 
    data_thread.join() 
    loop_thread.join() 

输出示例:

Len of list_ is 202 
Len of list_ is 403 
Len of list_ is 604 
Len of list_ is 805 
Len of list_ is 1006 
+1

Python的多线程不切换活动线程直到I/O需要地方或'time.sleep()'被称为当前正在运行的那个。这意味着你的代码大部分时间都在执行一个线程或另一个线程。 – martineau

+0

谢谢。即使当我使用1 ms的睡眠持续时间('time.sleep(0.001)')时,这也是有效的。 – Jakub

回答

1

这是not very safe从两个线程访问列表,所以我会建议线程间通信更安全的方式。在CPython中,您的代码不会损坏列表的内容,但每次处理批次时可能不会得到完全200个项目。如果您开始从my_loop()的列表中删除项目,则可能会遇到麻烦。如果您使用其他版本的Python而不使用GIL,则可能会遇到更多麻烦。

在此之前,尽管如此,这是我所能想到的解决所问问题的最小更改:CPU使用率。我只是增加了一个睡眠my_loop()和清理缓存的计算,因此现在报告一个大多持稳201,401,601。偶尔,我看到了1002

from threading import Event, Thread 
import time 

def add_indefinitely(list_, kill_signal): 
    """ 
    list_ : list 
     List to which data is added. 
    kill_signal : threading.Event 
    """ 
    while not kill_signal.is_set(): 
     list_.append([1] * 32) 
     time.sleep(0.01) # Equivalent to 100 Hz. 


def my_loop(buffer_len, kill_signal): 
    """ 
    buffer_len: int, float 
     Size of the data buffer in seconds. Gets converted to n_samples 
     by multiplying by the sampling frequency (i.e., 100). 
    kill_signal : threading.Event 
    """ 
    buffer_len *= 100 
    b0 = len(list_) 
    while not kill_signal.is_set(): 
     time.sleep(0.01) 
     if len(list_) - b0 >= buffer_len: 
      b0 += buffer_len 
      print("Len of list_ is {}".format(len(list_))) 


list_ = [] 
kill_signal = Event() 
buffer_len = 2 # Print something every 2 seconds. 


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal)) 
data_thread.start() 

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal)) 
loop_thread.start() 


def stop_all(): 
    """Stop appending to and querying the list. 
    SO users, call this function to clean up! 
    """ 
    kill_signal.set() 
    data_thread.join() 
    loop_thread.join() 

time.sleep(30) 
stop_all() 

现在,安全地做到这一点,我建议你使用queue。这将允许许多线程读取或写入队列,并且它将处理通信。如果一个线程试图从一个空队列中读取数据,它就会阻塞,直到某个其他线程将一个项目添加到队列中。

我不确定你想要做什么,所以我只是把它们放在一个列表中,留在那里。但是,现在列表只能由一个线程访问,在处理每批100个项目后清除它是安全的。

因为my_loop()现在被阻塞,所以当您设置kill信号时,它不一定会注意到。相反,我在请求队列中使用了None的Sentry值来告诉它关闭。如果这不起作用,则可以在从队列中获取项目时使用超时,检查终止信号,然后尝试再次获取项目。

from threading import Event, Thread 
from queue import Queue 
import time 

def add_indefinitely(request_queue, kill_signal): 
    """ 
    list_ : list 
     List to which data is added. 
    kill_signal : threading.Event 
    """ 
    while not kill_signal.is_set(): 
     request_queue.put([1] * 32) 
     time.sleep(0.01) # Equivalent to 100 Hz. 
    request_queue.put(None) # Signal to shut down 


def my_loop(buffer_len, kill_signal): 
    """ 
    buffer_len: int, float 
     Size of the data buffer in seconds. Gets converted to n_samples 
     by multiplying by the sampling frequency (i.e., 100). 
    kill_signal : threading.Event 
    """ 
    received_items = [] # replaces list_ 
    buffer_len *= 100 
    while True: 
     item = request_queue.get() 
     if item is None: 
      break 
     received_items.append(item) 
     if len(received_items) % buffer_len == 0: 
      print("Len of received_items is {}".format(len(received_items))) 


request_queue = Queue() 
kill_signal = Event() 
buffer_len = 2 # Print something every 2 seconds. 


data_thread = Thread(target=add_indefinitely, args=(request_queue, kill_signal)) 
data_thread.start() 

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal)) 
loop_thread.start() 


def stop_all(): 
    """Stop appending to and querying the list. 
    SO users, call this function to clean up! 
    """ 
    kill_signal.set() 
    data_thread.join() 
    loop_thread.join() 

time.sleep(30) 
stop_all() 
0

我不知道如果我完全得到你想要做什么,但这里是我的想法:

add_indefinitely()在MyThread类中实现,该类使用Manager类的实例作为参数parent。当遇到increment时,MyThread实例调用parenton_increment()方法做东西。

from threading import Event, Thread 
import time 


class Manager(object): 

    def on_increment(self, thread): 
     print thread.get_data_len() 


class MyThread(Thread): 

    def __init__(self, parent, increment, kill_signal): 
     super(MyThread, self).__init__() 
     self.parent = parent 
     self.data = [] 
     self.increment = increment 
     self.kill_signal = kill_signal 

    def run(self): 
     while not self.kill_signal.is_set(): 
      self.data.append([1] * 32) 
      time.sleep(0.01) 
      if len(self.data) % self.increment == 0: 
       self.parent.on_increment(self) 

    def get_data_len(self): 
     return len(self.data) 


kill_signal = Event() 
manager = Manager() 
thread = MyThread(manager, 200, kill_signal) 
thread.deamon = True 


try: 
    thread.start() 
    while True: 
     time.sleep(1) 
except (KeyboardInterrupt, SystemExit): 
    kill_signal.set() 
    thread.join() 
0

这将是一个很多更有效有一个单独的线程循环和检查列表的长度,而只是有一个阻塞,等待事件发生。下面是说明如何做这样的事情额外增加threading.Lock对象来控制全局变量或资源的并发访问(如打印到stdout)为例:

from threading import Event, Lock, Thread 
import time 

THRESHOLD = 100 # minimum number of items in list_ before event triggered 
list_ = [] 
list_lock = Lock() # to control access to global list_ 
length_signal = Event() 
print_lock = Lock() # to control concurrent printing 

def add_indefinitely(): 
    while True: 
     with list_lock: 
      list_.append([1] * 32) 
      if len(list_) >= THRESHOLD: 
       with print_lock: 
        print('setting length_signal') 
       length_signal.set() 
     time.sleep(.01) # give other threads a change to run 

def length_reached(): 
    """ 
    Waits until list_ has reached a certain length, and then print message 
    """ 
    with print_lock: 
     print('waiting for list_ to reach threshold length') 
    length_signal.wait() # blocks until event is set 
    with print_lock: 
     with list_lock: 
      print('list_ now contains at least {} items'.format(len(list_))) 

# first start thread that will wait for the length to be reached 
length_reached_thread = Thread(target=length_reached) 
length_reached_thread.start() 

data_thread = Thread(target=add_indefinitely) 
data_thread.daemon = True 
data_thread.start() 

length_reached_thread.join() 
with print_lock: 
    print('finished')