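Python multiprocessing Manager.list(): how to access the data correctly

I'm building a class that starts in its own process and pushes data to my database in batches. The class uses Manager.list() to receive the data. I figured that accessing the database from a separate process would be a common pattern, but I couldn't find a suitable library, so I decided to roll my own.

Internally I use threading.Timer to wake up my database worker and check the shared queue. However, when it wakes up, there is nothing in the queue (even though items were put there). Am I using Manager.list() incorrectly?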
Source code:

```python
import random
from threading import Timer
import threading
from sqlalchemy import *
from multiprocessing import Process, Manager
from util.config import get_connection


def __convert_to_key(connection, table):
    return "{}.{}".format(connection.name, table.name)


class ConnectionWorker():
    __batch_size = 1000
    __batch_insert_queue = None
    __manager = Manager()
    __wait_interval = 5.0
    __finish = False
    __connection = None
    __table = None
    __timer = None
    finished = False

    def __init__(self, connection, table):
        self.__lock = threading.RLock()
        self.__connection = connection
        self.__table = table
        p = Process(target=self.__insert_data)
        p.start()

    def get_batch_insert_queue(self):
        self.__lock.acquire()
        try:
            if self.__batch_insert_queue is None:
                self.__batch_insert_queue = self.__manager.list()
            return self.__batch_insert_queue
        finally:
            self.__lock.release()

    def __insert_data(self):
        print("__insert_data, the queue is {}".format(len(self.get_batch_insert_queue())))
        q = self.get_batch_insert_queue()
        #push everything now if we have been told to finish
        if self.__finish:
            print("__finish flag has been set")
            self.__connection.execute(self.__table.insert().values(q))
            self.finished = True
            return
        #if there is nothing to do then just sleep
        if len(q) == 0:
            print("The queue is empty, sleeping")
            self.__timer = Timer(self.__wait_interval, self.__insert_data)
            self.__timer.start()
            self.__timer.join()
        values_to_insert = []
        while len(q) > 0 and len(values_to_insert) < self.__batch_size:
            values_to_insert.append(q.pop)
        print("Inserting {} values".format(len(values_to_insert)))
        self.__connection.execute(self.__table.insert().values(values_to_insert))
        #don't sleep if the queue has more work to do
        if len(q) >= self.__batch_size:
            print("Not sleeping, there is more work to be done, {} items".format(len(q)))
            self.__insert_data()
        else:
            print("Sleeping")
            self.__timer = Timer(self.__wait_interval, self.__insert_data).start()
            self.__timer.start()
            self.__timer.join()

    def finish(self):
        print("Setting finish to true")
        self.__finish = True


#test data
if __name__ == "__main__":
    #create the db and get metadata
    conn = get_connection()
    query = "DROP TABLE IF EXISTS tmp_test"
    try:
        conn.execute(query)
    except:
        pass
    query = """CREATE TABLE tmp_test (
    value bigint DEFAULT NULL
    ) ENGINE=InnoDB;"""
    conn.execute(query)
    metadata = MetaData()
    metadata.reflect(bind=conn)
    tbl = metadata.tables["tmp_test"]
    c = ConnectionWorker(conn, tbl)
    q = c.get_batch_insert_queue()
    for item in random.sample(xrange(1, 1000000000), 100000):
        q.append(item)
    print("The queue is {}".format(len(q)))
    print("The batch queue is {}".format(len(c.get_batch_insert_queue())))
    import time
    time.sleep(10)
    c.finish()
    while not c.finished:
        time.sleep(1)
```
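A likely cause of the empty queue, offered as a reading of the code above rather than a confirmed diagnosis: `__init__` starts the `Process` before anyone has called `get_batch_insert_queue()`, so the child's copy of the object still has `__batch_insert_queue = None` and creates a new list of its own, while the parent later creates a different one. The sketch below instead creates the proxy in the parent before `start()` and passes it to the child as an argument. It also drains items in slices, which sidesteps `values_to_insert.append(q.pop)` in the question (that line appends the bound method itself instead of calling it). `consumer` and `demo` are hypothetical names, a growing list stands in for the database insert, and the slicing approach assumes a single consumer and producers that only append to the end.

```python
from multiprocessing import Manager, Process


def consumer(shared, batch_sizes, batch_size):
    # Runs in the child. `shared` is a ListProxy, so each operation below is
    # forwarded to the Manager's server process and sees the parent's data.
    while len(shared) > 0:
        # Copy up to batch_size items out in one round trip...
        batch = shared[:batch_size]
        # ...then delete them in a second one. Safe only with a single
        # consumer and producers that only append to the end.
        del shared[:batch_size]
        batch_sizes.append(len(batch))  # stand-in for the database insert


def demo(n_items=2500, batch_size=1000):
    with Manager() as manager:
        # Create the proxies in the parent *before* starting the child...
        shared = manager.list(range(n_items))
        batch_sizes = manager.list()
        # ...and hand them to the child explicitly as Process arguments.
        p = Process(target=consumer, args=(shared, batch_sizes, batch_size))
        p.start()
        p.join()
        # Copy results out of the proxies before the Manager shuts down.
        return len(shared), batch_sizes[:]


if __name__ == "__main__":
    print(demo())  # (0, [1000, 1000, 500])
```

Passing the proxy explicitly also avoids relying on a class-level `Manager()` being inherited by the child, which only happens to work with the `fork` start method.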
Output:

```
__insert_data, the queue is 0
The queue is empty, sleeping
The queue is 100000
The batch queue is 100000
__insert_data, the queue is 0
The queue is empty, sleeping
__insert_data, the queue is 0
The queue is empty, sleeping
Setting finish to true
__insert_data, the queue is 0
The queue is empty, sleeping
```
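The first empty queue makes sense (the object is still being initialized), but the next two look like they should contain items. I'm also not sure why, after the finish flag is set to True, the worker gets past the `self.__finish` check (I expected it to print "__finish flag has been set").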
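The `__finish` check likely never fires because `finish()` assigns to the parent's copy of the object, which the child process never sees; for the same reason the parent's `while not c.finished` loop cannot end. A minimal sketch of one common alternative, assuming a periodic-flush worker: share a `multiprocessing.Event` as the stop signal, use `Event.wait(timeout)` in a loop instead of a chain of `threading.Timer` objects, and `join()` the process instead of polling a flag. `worker` and `run` are hypothetical names, and the flush steps are left as comments.

```python
import time
from multiprocessing import Event, Process


def worker(stop_event, poll_interval):
    # Event.wait() returns True as soon as the event is set, or False when
    # the timeout expires, so this loop wakes up every poll_interval seconds.
    while not stop_event.wait(poll_interval):
        pass  # a periodic batch flush would go here
    # Reaching this point means the parent called stop_event.set();
    # a final flush of any remaining items would go here.


def run(poll_interval=0.1, run_for=0.3):
    stop_event = Event()
    p = Process(target=worker, args=(stop_event, poll_interval))
    p.start()
    time.sleep(run_for)
    stop_event.set()   # unlike a plain attribute, this is visible in the child
    p.join(timeout=5)  # replaces polling a `finished` attribute
    return p.exitcode  # 0 means the worker returned normally


if __name__ == "__main__":
    print(run())  # 0
```

A loop also avoids the unbounded recursion of `__insert_data` rescheduling itself through a new `Timer` on every wake-up.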
Comments are welcome (as are pointers to a library that handles all of this out of the box).
Doesn't `c = ConnectionWorker(conn, tbl)` start a new process, while `q = c.get_batch_insert_queue()` gets a list proxy that only the main process uses? – Christopher
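That reading matches the code: the constructor starts a child process, and the later call creates a proxy in the parent. The catch is that the child runs on its own copy of the `ConnectionWorker` instance, so ordinary attribute assignments made in one process never reach the other. A small demonstration, using a hypothetical `Worker` class that stands in for the plain attributes of `ConnectionWorker`:

```python
from multiprocessing import Process


class Worker:
    # Hypothetical stand-in for ConnectionWorker's plain attributes.
    def __init__(self):
        self.finished = False

    def run(self):
        # Executes in the child process, on the child's copy of the object.
        self.finished = True


def check():
    w = Worker()
    p = Process(target=w.run)
    p.start()
    p.join()
    # The child's assignment never reaches the parent's object.
    return w.finished


if __name__ == "__main__":
    print(check())  # False
```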
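No offense intended, but your code is too complicated for me to want to spend the time explaining it in detail. Instead, I gave you simpler code that actually works ;-) You don't have to accept it. If you want to untangle your version yourself, great! But take my advice and **reduce your code to the bare minimum**. You're doing a lot of scary things here (such as creating a `Manager()` in the main program as a side effect of compiling the class definition), and your code won't even run on Windows. –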
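The simpler code mentioned in that comment is not part of this excerpt. For illustration only, here is one common pattern, not necessarily that answer's code: a `multiprocessing.Queue` feeds a writer process that inserts in batches and stops when it receives a sentinel. It uses the standard-library `sqlite3` module as a stand-in for the MySQL/SQLAlchemy connection in the question and opens the connection inside the child rather than passing one across processes. `batch_writer`, `run`, and the `STOP` sentinel are hypothetical names.

```python
import os
import sqlite3
import tempfile
from multiprocessing import Process, Queue

STOP = None  # hypothetical sentinel: "flush what you have and exit"
INSERT = "INSERT INTO tmp_test (value) VALUES (?)"


def batch_writer(db_path, queue, batch_size):
    # The child opens its own connection; DB connections generally
    # should not be shared across processes.
    conn = sqlite3.connect(db_path)
    batch = []
    while True:
        item = queue.get()  # blocks until data arrives; no Timer polling
        if item is STOP:
            break
        batch.append((item,))
        if len(batch) >= batch_size:
            conn.executemany(INSERT, batch)
            conn.commit()
            batch = []
    if batch:  # flush the final, partial batch
        conn.executemany(INSERT, batch)
        conn.commit()
    conn.close()


def run(n_items=2500, batch_size=1000):
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        conn = sqlite3.connect(db_path)
        conn.execute("CREATE TABLE tmp_test (value INTEGER)")
        conn.commit()
        conn.close()

        queue = Queue()
        p = Process(target=batch_writer, args=(db_path, queue, batch_size))
        p.start()
        for i in range(n_items):
            queue.put(i)
        queue.put(STOP)
        p.join()

        conn = sqlite3.connect(db_path)
        count = conn.execute("SELECT COUNT(*) FROM tmp_test").fetchone()[0]
        conn.close()
        return count


if __name__ == "__main__":
    print(run())  # 2500
```

The `if __name__ == "__main__":` guard and module-level worker functions are what let this run under the `spawn` start method used on Windows.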