2013-07-02 84 views

Python code freezes randomly

I am iterating over a dataset and trying to compute features from a database (Postgres).

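The problem is that the program sometimes stalls at some point (confirmed through the database log, which I enabled for this purpose: no new queries arrive for a long time), and when I press Ctrl+C it appears to resume normally. (I have not yet confirmed whether the computed results are correct, because there are many rows.) It does not get stuck at the same place each time; the stalls seem to occur at random. Any idea what I might be doing wrong?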

I have two files, main.py and NAC.py.

main.py:

import time

import NAC
from dateutil.parser import parse
from datetime import timedelta

rows = fc.Read_CSV_to_Dict(input_file)  # just a wrapper around csv.DictReader
i = 0
start_time = time.time()
for row in rows:  # rows has about 600,000 rows
    ret1, ret2 = NAC.function(row['key1'], ...)  # and other parameters
    # new keys
    row['newKey1'], row['newKey2'] = ret1
    row['newKey3'], row['newKey4'] = ret2  # unpacking
    i = i + 1
    if i % 10000 == 0:  # progress monitor
        print(i)
print((time.time() - start_time) / 60)  # elapsed time in minutes
NAC.db_close()

NAC.py:

from dateutil.parser import parse
from datetime import timedelta
import psycopg2
import psycopg2.extras

def function(param1, ...):
    """
    Returns:
        2 element list, each a list by itself
    """
    nsclist = [0] * param2_count
    naclist = [0] * param2_count
    for i in range(param2_count):
        # each iteration covers one window of intervalPeriod minutes
        stime = begintime + timedelta(seconds=60 * intervalPeriod * i)
        etime = begintime + timedelta(seconds=60 * intervalPeriod * (i + 1))
        table1_query = "select sum(count) from table1 where column1 = '{0}' and column2 > '{1}'::TIMESTAMP WITH TIME ZONE and column2 <= '{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table1_query.format(param1, stime, etime))
        nsclist[i] = cur.fetchone()[0]
        if nsclist[i] is None:  # sum() over zero rows returns NULL, i.e. None
            nsclist[i] = 0
        table2_query = "select sum(count) from table2 where column1 = '{0}' and column2 > '{1}'::TIMESTAMP WITH TIME ZONE and column2 <= '{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table2_query.format(param1, stime, etime))
        naclist[i] = cur.fetchone()[0]
        if naclist[i] is None:
            naclist[i] = 0
    return nsclist, naclist

def db_close():
    cur.close()
    conn.close()

intervalPeriod = 5  # minutes
# one connection and one cursor shared by every call to function()
conn = psycopg2.connect(cs.local_connstr)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
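The window arithmetic inside the loop can be checked on its own, without a database. The following is only a sketch: `interval_bounds` is a hypothetical helper name, and `begintime` is assumed to be a `datetime` (it comes from the elided parameters in the code above).

```python
from datetime import datetime, timedelta

def interval_bounds(begintime, interval_minutes, count):
    """Return `count` consecutive (stime, etime) pairs starting at begintime."""
    step = timedelta(minutes=interval_minutes)
    # Window i runs from begintime + i*step to begintime + (i+1)*step.
    return [(begintime + step * i, begintime + step * (i + 1))
            for i in range(count)]

if __name__ == "__main__":
    for stime, etime in interval_bounds(datetime(2013, 7, 1, 18, 0), 5, 3):
        print(stime, etime)
```

Because the queries filter with `column2 > stime and column2 <= etime`, adjacent windows share a boundary value but a row whose timestamp equals that value is counted only in the earlier window.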

DB log timestamps:

2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:26:01 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:29:30 Ctrl+C pressed (manually added; not in the log)
2013-07-01 18:29:30 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:29:30 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:29:30 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:29:30 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:29:30 PDT LOG: statement: select sum(count)from ... 
2013-07-01 18:29:30 PDT LOG: statement: select sum(count)from ... 
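The log shows no statements between 18:26:01 and 18:29:30, but it cannot show which Python line was executing during that gap. One way to find out is a watchdog that dumps the traceback when a row takes too long. This is a minimal sketch that assumes Python 3.3 or later (for the standard `faulthandler` module), whereas the code above uses Python 2 era syntax; `arm_watchdog` and `disarm_watchdog` are hypothetical helper names, and the 60-second default is an arbitrary choice.

```python
import faulthandler
import sys

def arm_watchdog(timeout_seconds=60, file=None):
    """(Re)start a timer that dumps all thread tracebacks if it expires."""
    # Calling dump_traceback_later again replaces the previous timer, so
    # re-arming once per processed row means a dump is only written when
    # a single row takes longer than timeout_seconds.
    faulthandler.dump_traceback_later(
        timeout_seconds,
        repeat=True,
        file=file if file is not None else sys.stderr,
    )

def disarm_watchdog():
    """Cancel the pending dump, for example once the loop has finished."""
    faulthandler.cancel_dump_traceback_later()
```

In main.py this would mean calling `arm_watchdog()` at the top of each loop iteration and `disarm_watchdog()` after the loop. Re-arming has some cost, so with about 600,000 rows it may be preferable to re-arm only every few hundred rows and raise the timeout accordingly.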

Answer


It turned out there was a problem with the cursor. I simply had to open and close the cursor on every function call. I don't know why.

from dateutil.parser import parse
from datetime import timedelta
import psycopg2
import psycopg2.extras

def function(param1, ...):
    """
    Returns:
        2 element list, each a list by itself
    """
    # open a fresh cursor for this call instead of sharing one module-wide
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    nsclist = [0] * param2_count
    naclist = [0] * param2_count
    for i in range(param2_count):
        # window bounds, as in the original NAC.py
        stime = begintime + timedelta(seconds=60 * intervalPeriod * i)
        etime = begintime + timedelta(seconds=60 * intervalPeriod * (i + 1))
        table1_query = "select sum(count) from table1 where column1 = '{0}' and column2 > '{1}'::TIMESTAMP WITH TIME ZONE and column2 <= '{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table1_query.format(param1, stime, etime))
        nsclist[i] = cur.fetchone()[0]
        if nsclist[i] is None:  # sum() over zero rows returns NULL, i.e. None
            nsclist[i] = 0
        table2_query = "select sum(count) from table2 where column1 = '{0}' and column2 > '{1}'::TIMESTAMP WITH TIME ZONE and column2 <= '{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table2_query.format(param1, stime, etime))
        naclist[i] = cur.fetchone()[0]
        if naclist[i] is None:
            naclist[i] = 0
    cur.close()  # close the cursor before returning
    return nsclist, naclist

def db_close():
    conn.close()

intervalPeriod = 5  # minutes
conn = psycopg2.connect(cs.local_connstr)
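The open-and-close-per-call pattern can be written so that the cursor is closed even when a query raises. The sketch below is one possible shape, not the code that was actually run: `sum_count` and `TABLE1_QUERY` are hypothetical names, the connection is passed in as an argument, and the values are sent as query parameters (psycopg2's `%s` placeholders) instead of being formatted into the SQL string.

```python
from contextlib import closing

# Same filter as table1_query above, with %s placeholders so the driver
# handles quoting of the values.
TABLE1_QUERY = (
    "select sum(count) from table1 "
    "where column1 = %s "
    "and column2 > %s::TIMESTAMP WITH TIME ZONE "
    "and column2 <= %s::TIMESTAMP WITH TIME ZONE"
)

def sum_count(conn, query, key, stime, etime):
    """Run one aggregate query on a short-lived cursor and return a number."""
    # closing() calls cur.close() on exit, including when execute() raises.
    with closing(conn.cursor()) as cur:
        cur.execute(query, (key, stime, etime))
        value = cur.fetchone()[0]
    # sum() over zero rows yields NULL, which the driver returns as None.
    return 0 if value is None else value
```

With this helper, the loop body would reduce to something like `nsclist[i] = sum_count(conn, TABLE1_QUERY, param1, stime, etime)`. It opens one cursor per query, not one per function call, which is a slightly finer granularity than in the answer above.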