2014-01-12

Python multiprocessing Manager.list(): how do I access the data correctly?

I'm building a class that starts in its own process and pushes data to my database in batches. The class uses a Manager.list() to receive its data. I'd have thought this would be a common pattern, pushing to a database from a separate process, but I couldn't find a suitable library, so I figured I'd roll my own.

Internally I use a threading.Timer to wake my database worker and check the shared queue. However, when it wakes up there is nothing in the queue (even though items have been put there). Am I using Manager.list() incorrectly?

Source code:

import random 
from threading import Timer 
import threading 

from sqlalchemy import * 
from multiprocessing import Process, Manager 
from util.config import get_connection 


def __convert_to_key(connection, table): 
    return "{}.{}".format(connection.name, table.name) 


class ConnectionWorker(): 
    __batch_size = 1000 
    __batch_insert_queue = None 
    __manager = Manager() 
    __wait_interval = 5.0 
    __finish = False 
    __connection = None 
    __table = None 
    __timer = None 
    finished = False 

    def __init__(self, connection, table): 
     self.__lock = threading.RLock() 
     self.__connection = connection 
     self.__table = table 
     p = Process(target=self.__insert_data) 
     p.start() 

    def get_batch_insert_queue(self): 
     self.__lock.acquire() 
     try: 
      if self.__batch_insert_queue is None: 
       self.__batch_insert_queue = self.__manager.list() 
      return self.__batch_insert_queue 
     finally: 
      self.__lock.release() 

    def __insert_data(self): 
     print("__insert_data, the queue is {}".format(len(self.get_batch_insert_queue()))) 
     q = self.get_batch_insert_queue() 

     #push everything now if we have been told to finish 
     if self.__finish: 
      print("__finish flag has been set") 
      self.__connection.execute(self.__table.insert().values(q)) 
      self.finished = True 
      return 

     #if there is nothing to do then just sleep 
     if len(q) == 0: 
      print("The queue is empty, sleeping") 
      self.__timer = Timer(self.__wait_interval, self.__insert_data) 
      self.__timer.start() 
      self.__timer.join() 

     values_to_insert = [] 
     while len(q) > 0 and len(values_to_insert) < self.__batch_size: 
      values_to_insert.append(q.pop()) 
     print("Inserting {} values".format(len(values_to_insert))) 
     self.__connection.execute(self.__table.insert().values(values_to_insert)) 

     #don't sleep if the queue has more work to do 
     if len(q) >= self.__batch_size: 
      print("Not sleeping, there is more work to be done, {} items".format(len(q))) 
      self.__insert_data() 
     else: 
      print("Sleeping") 
      self.__timer = Timer(self.__wait_interval, self.__insert_data) 
      self.__timer.start() 
      self.__timer.join() 

    def finish(self): 
     print("Setting finish to true") 
     self.__finish = True 

#test data 
if __name__ == "__main__": 
    #create the db and get metadata 
    conn = get_connection() 
    query = "DROP TABLE IF EXISTS tmp_test" 
    try: 
     conn.execute(query) 
    except: 
     pass 
    query = """CREATE TABLE tmp_test (
    value bigint DEFAULT NULL 
    ) ENGINE=InnoDB;""" 
    conn.execute(query) 

    metadata = MetaData() 
    metadata.reflect(bind=conn) 
    tbl = metadata.tables["tmp_test"] 

    c = ConnectionWorker(conn, tbl) 
    q = c.get_batch_insert_queue() 
    for item in random.sample(xrange(1, 1000000000), 100000): 
     q.append(item) 
    print("The queue is {}".format(len(q))) 
    print("The batch queue is {}".format(len(c.get_batch_insert_queue()))) 
    import time 
    time.sleep(10) 
    c.finish() 

    while not c.finished: 
     time.sleep(1) 

Run log:

__insert_data, the queue is 0 
The queue is empty, sleeping 
The queue is 100000 
The batch queue is 100000 
__insert_data, the queue is 0 
The queue is empty, sleeping 
__insert_data, the queue is 0 
The queue is empty, sleeping 
Setting finish to true 
__insert_data, the queue is 0 
The queue is empty, sleeping 

The first empty queue makes sense (the object is just initializing), but the next two look like they should have items in them. It's also unclear to me why, when finish is set to True, the worker gets past the self.__finish check (I'd have thought it should print "__finish flag has been set").
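For reference, here is a minimal sketch (Python 3 here, unlike the Python 2 above) of a Manager.list() that a child process does see. The key difference from the code above is that the proxy is created once in the parent and passed to the Process explicitly; `worker` and `demo` are illustrative names, not part of the code above:

```python
import multiprocessing as mp

def worker(shared):
    # `shared` is the proxy handed over by the parent; the append is
    # routed through the manager process, so the parent sees it too.
    shared.append("from child")

def demo():
    with mp.Manager() as manager:
        shared = manager.list(["from parent"])  # one manager, one proxy
        p = mp.Process(target=worker, args=(shared,))  # pass the proxy explicitly
        p.start()
        p.join()
        return list(shared)  # copy out before the manager shuts down

if __name__ == "__main__":
    print(demo())  # -> ['from parent', 'from child']
```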

Comments welcome (as well as pointers to a library that handles all of this by default).

Answer

Data is never implicitly shared across processes. Two consequences:

  1. The Manager.list() created in your main program has nothing to do with any Manager.list() created in the worker process; and,

  2. the self.__finish attribute in the main program has nothing to do with the self.__finish attribute in the worker process.

You should step back and experiment with simpler code until this stuff makes more sense to you. The "usual" way to do it is along these lines (I'm throwing away all the classes and methods so it's possible to see what's essential here). Note that there is no need for extra threads, or for sleep()ing, etc.:

# shared data must be passed, even mp data structures 
def worker(q): 
    values_to_insert = [] 
    while True: 
     item = q.get() # no need to sleep - blocks until data is ready 
     if item is None: 
      break 
     values_to_insert.append(item) 
     if len(values_to_insert) >= 39: # whatever - your `__batch_size` 
      print "processing", values_to_insert 
      values_to_insert = [] 
    # deal with any leftovers 
    if values_to_insert: 
     print "processing", values_to_insert 

if __name__ == "__main__": 
    import multiprocessing as mp 
    import random 

    q = mp.Queue(100) # bounded queue 
    proc = mp.Process(target=worker, args=(q,)) 
    proc.start() 
    for item in random.sample(xrange(1, 1000000000), 100000): 
     # will block if q has more than 100 items; blocking 
     # waits for worker to catch up 
     q.put(item) 
    q.put(None) # tell worker we're done 
    proc.join() 
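Consequence 2 above can be shown in a few lines. In this sketch (Python 3; `flag`, `worker` and `demo` are illustrative names), a plain object mutated by the parent stays unchanged in the child, while an mp.Event passed to the Process does cross the boundary:

```python
import multiprocessing as mp

flag = {"finish": False}  # a plain object: each process keeps its own copy

def worker(event, out):
    event.wait()             # the mp.Event IS shared, so this wakes up
    out.put(flag["finish"])  # but the plain dict is still this process's copy

def demo():
    event = mp.Event()
    out = mp.Queue()
    p = mp.Process(target=worker, args=(event, out))
    p.start()
    flag["finish"] = True    # mutates only the parent's copy
    event.set()              # crosses the process boundary
    seen = out.get()
    p.join()
    return seen

if __name__ == "__main__":
    print(demo())  # -> False: the child never saw the parent's write
```

This is exactly why setting self.__finish in the main program is invisible to the worker process in the question's code.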

Doesn't 'c = ConnectionWorker(conn, tbl)' start a new process, and doesn't 'q = c.get_batch_insert_queue()' get the list proxy that the main thread uses? – Christopher


No offense intended, but your code is more convoluted than I care to spend time explaining in detail. Instead I gave you much simpler code that actually works ;-) You don't have to accept it. If you'd rather untangle yours on your own, great! But do take the advice and cut your code down to the bare minimum. You're doing several dubious things here (like creating a 'Manager()' in the main program as a side effect of compiling the class definition), and your code can't even run on Windows. –
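The Windows point in the comment above comes from the "spawn" start method: every child re-imports the module, so a Manager() created in a class body would be launched again on each import. A minimal sketch (Python 3; `BatchWriter` is a hypothetical name, not the original class) of the safer layout, with the manager created explicitly under the main guard and its proxies passed in:

```python
import multiprocessing as mp

class BatchWriter:
    """Takes a manager-made proxy as an argument instead of creating a
    Manager() in the class body, where it would run on every import."""

    def __init__(self, queue):
        self.queue = queue  # a manager.list() proxy built by the caller

if __name__ == "__main__":
    # Only the launching process reaches this block, even under the
    # "spawn" start method that Windows uses.
    manager = mp.Manager()
    writer = BatchWriter(manager.list())
    writer.queue.append(42)
    print(len(writer.queue))  # -> 1
```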