2017-05-30 200 views
0

在ETL过程中,我想定期查询数据库“A”(例如,所有行的时间戳大于程序的最后一次运行时间),并将该数据移动到数据库“B”中以供进一步处理。两者都是PostgreSQL数据库。我想在Python脚本中进行数据传输,使用SQLAlchemy连接到两个数据库。什么是最不混乱,最脆弱的方式呢?如何使用SQLAlchemy将数据直接从一个postgresql数据库传输到另一个数据库?

我知道Postgres的COPY TOCOPY FROM命令允许通过中间文件(see here)将表行和查询结果从一个数据库服务器传输到另一个数据库服务器。在Unix命令行中,您甚至可以将数据库A的输出作为输入传送到数据库B,而不需要潜在的大型中间文件(see excellent instructions here)。我想知道的是如何在使用两个SQLAlchemy连接的Python脚本中执行最后一个技巧,而不是使用subprocess来运行shell命令。

import sqlalchemy 
dbA = sqlalchemy.create_engine(connection_string_A) 
dbB = sqlalchemy.create_engine(connection_string_B) 

# how do I do this part? 
dbA.execute('SELECT (column) FROM widgets...') # somehow pipe output into... 
dbB.execute('INSERT INTO widgets (column) ...') # without holding lots of data in memory or on disk 

为了记录我没有使用SQLAlchemy的任何ORM功能,只是裸体SQL查询。

+0

有很多记录需要迁移? –

+0

最终在我的ETL例程中会有几个这样的任务,其中一些可能有很多记录,而其他的可能只有几个。所以我正在寻找一个即使在大规模下也能正常工作的强大解决方案。 – Joe

回答

1

你已经问过你的问题中两个不同的东西。一个是如何将CSV从COPY FROM转换为COPY TO;另一个是如何将SELECT查询中的行管道输入INSERT。从SELECT查询到INSERT

管道排是一种骗人的,因为当你可以从SELECT查询流行,你不能流行到INSERT,所以你必须要执行多个INSERT小号分批。由于INSERT s的原因,此方法的开销很高,但由于往返于CSV而导致数据丢失的问题较少。我将重点讨论为什么将CSV从COPY FROM转换为COPY TO很棘手,以及如何实现它。

psycopg2让你通过(同步)copy_expert函数做COPY命令。它要求您传递可读文件对象COPY FROM和可写文件对象COPY TO。为了完成您所描述的内容,需要两个独立的线程来运行这两个命令中的每一个,一个带有write()方法的文件对象,该方法在COPY FROM命令无法跟上时会阻止,以及一个带有read()方法的文件对象, COPY TO命令跟不上。这是一个典型的生产者 - 消费者问题,如果要正确的话可能会很棘手。

这是我写得很快的一个(Python 3)。它可能充满了错误。让我知道如果你发现一个僵局(编辑欢迎)。

from threading import Lock, Condition, Thread 


class Output(object): 
    def __init__(self, pipe): 
     self.pipe = pipe 

    def read(self, count): 
     with self.pipe.lock: 
      # wait until pipe is still closed or buffer is not empty 
      while not self.pipe.closed and len(self.pipe.buffer) == 0: 
       self.pipe.empty_cond.wait() 

      if len(self.pipe.buffer) == 0: 
       return "" 

      count = max(count, len(self.pipe.buffer)) 
      res, self.pipe.buffer = \ 
       self.pipe.buffer[:count], self.pipe.buffer[count:] 
      self.pipe.full_cond.notify() 
     return res 

    def close(self): 
     with self.pipe.lock: 
      self.pipe.closed = True 
      self.pipe.full_cond.notify() 


class Input(object): 
    def __init__(self, pipe): 
     self.pipe = pipe 

    def write(self, s): 
     with self.pipe.lock: 
      # wait until pipe is closed or buffer is not full 
      while not self.pipe.closed \ 
        and len(self.pipe.buffer) > self.pipe.bufsize: 
       self.pipe.full_cond.wait() 

      if self.pipe.closed: 
       raise Exception("pipe closed") 

      self.pipe.buffer += s 
      self.pipe.empty_cond.notify() 

    def close(self): 
     with self.pipe.lock: 
      self.pipe.closed = True 
      self.pipe.empty_cond.notify() 


class FilePipe(object): 
    def __init__(self, bufsize=4096): 
     self.buffer = b"" 
     self.bufsize = 4096 
     self.input = Input(self) 
     self.output = Output(self) 
     self.lock = Lock() 
     self.full_cond = Condition(self.lock) 
     self.empty_cond = Condition(self.lock) 
     self.closed = False 

用例:

def read_thread(conn, f): 
    conn.cursor().copy_expert("COPY foo TO STDIN;", f) 
    f.close() 
    conn.close() 

engine.execute(
    "CREATE TABLE foo(id int);" 
    "CREATE TABLE bar(id int);" 
    "INSERT INTO foo (SELECT generate_series(1, 100000) AS id);" 
    "COMMIT;") 
input_conn = engine.raw_connection() 
output_conn = engine.raw_connection() 
pipe = FilePipe() 

t = Thread(target=read_thread, args=(input_conn, pipe.input)) 
t.start() 
output_cur = output_conn.cursor() 
output_cur.copy_expert("COPY bar FROM STDIN;", pipe.output) 
output_conn.commit() 
output_conn.close() 
t.join() 

print(list(engine.execute("SELECT count(*) FROM bar;"))) # 100000 
+0

说我提出一个问题会更准确,但有两条建议或线索可以看出答案。你提供了一个非常吸引人的解决方案,但是我认为如果这很复杂的话,最好使用Unix shell命令和管道。 – Joe

0

如果数据不是非常大(可以在单台主机的主存储器中持有),你可以尝试基于熊猫/ python3我的开源ETL工具/ sqlalchemy,bailaohe/parade,我在http://README.md中提供了一个教程。您可以利用熊猫对数据进行转换并直接返回结果数据框。通过一些配置,熊猫数据帧可以转储到不同的目标连接。

对于您的问题,您可以用游行如下生成一个简单的SQL类型的任务:

# -*- coding:utf-8 -*- 
from parade.core.task import SqlETLTask 
from parade.type import stdtypes 


class CopyPostgres(SqlETLTask): 

    @property 
    def target_conn(self): 
     """ 
     the target connection to write the result 
     :return: 
     """ 
     return 'target_postgres' 

    @property 
    def source_conn(self): 
     """ 
     the source connection to write the result 
     :return: 
     """ 
     return 'source_postgres' 

    @property 
    def etl_sql(self): 
     """ 
     the single sql statement to process etl 
     :return: 
     """ 
     return """SELECT (column) FROM widgets""" 

你甚至可以直接游行撰写DAG的工作流程与多个任务,并安排了工作流程。希望这会有所帮助。

相关问题