你已经问过你的问题中两个不同的东西。一个是如何将CSV从COPY FROM
转换为COPY TO
;另一个是如何将SELECT
查询中的行管道输入INSERT
。从SELECT
查询到INSERT
管道排是一种骗人的,因为当你可以从SELECT
查询流行,你不能流行到INSERT
,所以你必须要执行多个INSERT
小号分批。由于INSERT
s的原因,此方法的开销很高,但由于往返于CSV而导致数据丢失的问题较少。我将重点讨论为什么将CSV从COPY FROM
转换为COPY TO
很棘手,以及如何实现它。
psycopg2
让你通过(同步)copy_expert
函数做COPY
命令。它要求您传递可读文件对象COPY FROM
和可写文件对象COPY TO
。为了完成您所描述的内容,需要两个独立的线程来运行这两个命令中的每一个,一个带有write()
方法的文件对象,该方法在COPY FROM
命令无法跟上时会阻止,以及一个带有read()
方法的文件对象, COPY TO
命令跟不上。这是一个典型的生产者 - 消费者问题,如果要正确的话可能会很棘手。
这是我写得很快的一个(Python 3)。它可能充满了错误。让我知道如果你发现一个僵局(编辑欢迎)。
from threading import Lock, Condition, Thread
class Output(object):
def __init__(self, pipe):
self.pipe = pipe
def read(self, count):
with self.pipe.lock:
# wait until pipe is still closed or buffer is not empty
while not self.pipe.closed and len(self.pipe.buffer) == 0:
self.pipe.empty_cond.wait()
if len(self.pipe.buffer) == 0:
return ""
count = max(count, len(self.pipe.buffer))
res, self.pipe.buffer = \
self.pipe.buffer[:count], self.pipe.buffer[count:]
self.pipe.full_cond.notify()
return res
def close(self):
with self.pipe.lock:
self.pipe.closed = True
self.pipe.full_cond.notify()
class Input(object):
def __init__(self, pipe):
self.pipe = pipe
def write(self, s):
with self.pipe.lock:
# wait until pipe is closed or buffer is not full
while not self.pipe.closed \
and len(self.pipe.buffer) > self.pipe.bufsize:
self.pipe.full_cond.wait()
if self.pipe.closed:
raise Exception("pipe closed")
self.pipe.buffer += s
self.pipe.empty_cond.notify()
def close(self):
with self.pipe.lock:
self.pipe.closed = True
self.pipe.empty_cond.notify()
class FilePipe(object):
def __init__(self, bufsize=4096):
self.buffer = b""
self.bufsize = 4096
self.input = Input(self)
self.output = Output(self)
self.lock = Lock()
self.full_cond = Condition(self.lock)
self.empty_cond = Condition(self.lock)
self.closed = False
用例:
def read_thread(conn, f):
conn.cursor().copy_expert("COPY foo TO STDIN;", f)
f.close()
conn.close()
engine.execute(
"CREATE TABLE foo(id int);"
"CREATE TABLE bar(id int);"
"INSERT INTO foo (SELECT generate_series(1, 100000) AS id);"
"COMMIT;")
input_conn = engine.raw_connection()
output_conn = engine.raw_connection()
pipe = FilePipe()
t = Thread(target=read_thread, args=(input_conn, pipe.input))
t.start()
output_cur = output_conn.cursor()
output_cur.copy_expert("COPY bar FROM STDIN;", pipe.output)
output_conn.commit()
output_conn.close()
t.join()
print(list(engine.execute("SELECT count(*) FROM bar;"))) # 100000
有很多记录需要迁移? –
最终在我的ETL例程中会有几个这样的任务,其中一些可能有很多记录,而其他的可能只有几个。所以我正在寻找一个即使在大规模下也能正常工作的强大解决方案。 – Joe