2014-10-01 21 views
1

忍受我 - 这是我的第一个多线程/处理python项目。让多个进程/线程始终运行,访问sqlite3数据库,直到满足条件

我正在其上应该运行some.exe其中每个需要的ID作为参数的Ñ实例一个Python脚本。这些ID是从本地sqlite数据库中提取的,并且如果成功处理则删除。任何ID都不能由一次处理多个some.exe(因此WORK 布尔型)。

我知道,下面的pool.map需要ITER某种但是这是我的,包括任何形式的多功能使用线程/处理的第一个项目,我不知道该怎么处理它。

该脚本应该运行,直到没有ID剩下 - 持续运行n实例some.exesome.exe将可能需要1-6分钟为每个ID。

如果相关,这将运行在Windows机器上。

的代码部分只是伪代码和所有非必要的部分已经被省略:

#!/usr/bin/python 

import time, sqlite3 
from datetime import datetime 
from multiprocessing.pool import ThreadPool as Pool 

def run_worker(lite_cur): 

    lite_cur.execute("SELECET ID FROM IDS WHERE WORK != 1") 
    found_id = lite_cur.fetchone() 

    lite_cur.execute("UPDATE IDS SET WORK = 1 WHERE ID = \'"+found_id+"\'") 

    #starting a subprocess in a pool is probably not what one should do.. help? 
    process = subprocess.Popen(["some.exe", found_id]) 
    process.wait() 

    #how would one check if some.exe chrashed or completed successfully? 
    if process = "some.exe completed without errors!": 
     lite_cur.execute("DELETE FROM IDS WHERE ID = \'"+found_id+"\'") 
    else: 
     #do this if some.exe crashed or reported errors. 
     lite_cur.execute("UPDATE IDS SET WORK = 0 WHERE ID = \'"+found_id+"\'") 


def run_checker(lite_cur, ids_left): 
    time.sleep(600) 
    lite_cur.execute("SELECT * FROM IDS") 
    #may exceed 1 million, is there a better/faster way? 
    if len(lite_cur.fetchall()) == 0: 
     ids_left = False 

def main(): 

    #lite_db_name will be implemented as an argument. 
    lite_db_name = "some.db" 
    lite_con = sqlite3.connect(lite_db_name) 
    lite_cur = lite_con.cursor() 

    #IDs should be self-explanatory and WORK is used as a boolean to define if a worker is already working on this ID 
    lite_cur.execute("CREATE TABLE IF NOT EXISTS IDS(ID TEXT, WORK INTEGER DEFAULT 0)") 

    #max_worker will be implemented as an argument 
    max_worker = 4 
    worker_pool = Pool(max_worker) 
    #a pool with the limit of 1 is probably dumb as duck.. 
    checker_pool = Pool(1) 

    lite_cur.execute("SELECT * FROM IDS") 
    if len(lite_cur.fetchall()) > 0: 
     ids_left = True 
    else: 
     ids_left = False 

    while ids_left: 
     worker_pool.map(run_worker(lite_cur)) 
     checker_pool.map(run_checker(lite_cur, ids_left)) 

    end_time = datetime.now() 
    print ("Congratulation - All IDs processed.") 
    print ("It took: {}".format(end_time - start_time)) 

if __name__ == "__main__": 
    main() 

我会非常感谢您的任何建议和意见。

编辑:抱歉没有发布明确的问题。这个问题的意图是为任何进一步的发展获得一些领先的建议。

+1

您能否澄清一下具体的问题是什么? – 2014-10-01 08:57:12

+1

对具体问题没有明确的问题。我想你会期待某种“如何”。如果您的介绍几乎不匹配来源,那就更困难了。甚至没有一个句子以问号结尾。我想这是因为你自己不确定你是否有问题。没有绝对的“正确”方式。以一种您仍然可以在一年内了解您的源/解决方案的方式解决您的问题。 – 2014-10-01 08:57:33

+0

@DonQuestion你是对的。我没有要求具体的答案,也没有要求。我要求你引导我去*正确的方向,或者给我一些提示,告诉我如何完成其​​余的伪代码部分。我担心*问题不会很受欢迎,但无论如何我都会问,因为我经常从评论或回答中获得帮助,这给了我一个方向提示。 – 2014-10-01 09:13:33

回答

1

示范代码(功能,测试):

import sqlite3 as sql 
from this import s as nonsense 
import subprocess 
import shlex 
import time 

max_parallel_processes = 10 

def getdb(tableid = "test"): 
    dbid = ":memory:"  

    stmt_create = "CREATE TABLE %s (id int, comment text)" % tableid 
    stmt_insert = "INSERT INTO %s VALUES (?, ?)" % tableid 

    values = enumerate(nonsense.split()) 

    db = sql.connect(dbid) 
    db.execute(stmt_create) 
    db.executemany(stmt_insert, values) 
    return db 


def get_ids(db, tableid = "test"): 
    stmt_select_id = "SELECT id from %s " % tableid 
    crs = db.execute(stmt_select_id) 
    result = crs.fetchall() 
    for i in result: 
     yield i 


def main(): 
    from random import randint 

    db = getdb() 
    process_lst = {}  
    sleep_between_polls_in_seconds = 0.1 

    for rowid in get_ids(db): 
     if len(process_lst) < max_parallel_processes: 
      cmd_str = "sleep %s" % randint(1, 3) 
      cmd = shlex.split(cmd_str) 

      print "adding : %s (%s)" % (rowid, cmd_str) 

      proc = subprocess.Popen(cmd)  
      process_lst[proc] = rowid 
      proc.poll() 
     else: 
      print "max processes (%s) reached" % max_parallel_processes 

      for proc in process_lst.keys(): 
       finished = proc.poll() is not None   
       if finished: 
        print "%s finished" % process_lst[proc] 
        del process_lst[proc] 

       time.sleep(sleep_between_polls_in_seconds) 

    print "All processes processed: %s " %(len (process_lst) == 0) 



if __name__ == "__main__": 
    main() 

在我的例子,我没有被调用的子过程,你的代码似乎必须做的没有输出的测试(错误,标准输出),但这是通过Popen构造函数很容易实现。 此外,重定向stdout/stderr可能允许用select -construct(至少在* nix-environments之下)替代暂停轮询循环的time.sleep

这样你就可以通过避免可怕的线程来实现并行化。请记住,subprocess.Popenthreading.Thread都会产生他们自己的过程。这是额外将一个子进程进程封装到线程进程中的开销,没有获得的功能。你仍然必须建立一个队列式的结构。

希望有所帮助。需要有两个原因 proc.poll()

编辑

  1. 以启动外部进程
  2. 让它在后台运行(非阻塞)

添加stdout=subprocess.PIPE到Popen允许在我的例子中读取标准输出文件Popen().stdout

with proc.stdout as f: 
    program_output = f.read() 
+0

非常感谢你 - 解释和帮助了很多,并且让我非常喜欢*微调的*解决方案! – 2014-10-02 07:58:01

+1

我的解决方案中隐藏着一点警告,我希望你能意识到。我只是测试“process-list/queue”是否完全满了,但最后还要检查它是否完全空,否则最后的10个进程可能永远不会被检查! – 2014-10-02 08:14:20

+0

为什么我需要'proc.poll()'in'if len(process_lst) 2014-10-02 09:10:57

相关问题