2011-09-20 27 views
2

我需要更新空间数据库中的每个记录,其中有一组覆盖多边形数据集的点的数据集。对于每个点要素,我想分配一个键以将其与它所在的面要素关联起来。因此,如果我的观点'纽约市'位于美国多边形和美国多边形'GID = 1'内,我将为我的观点纽约市分配'gid_fkey = 1'。使用Python进行并行数据库更新(PostGIS/PostgreSQL)

为此,我创建了以下查询。

procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID) 

目前我正在从另一个查询中获取cityID信息,该查询仅选择gid_fkey为NULL的所有cityID。本质上,我只需要遍历这些并运行前面显示的查询。由于查询只依赖另一个表中的静态信息,因此所有这些过程都可以一次运行。我已经实现以下穿入过程,但我似乎无法使移植到多处理器

import psycopg2, pprint, threading, time, Queue 

queue = Queue.Queue() 
pyConn = psycopg2.connect("dbname='geobase_1' host='localhost'") 
pyConn.set_isolation_level(0) 
pyCursor1 = pyConn.cursor() 

getGID = 'SELECT cityID FROM city' 
pyCursor1.execute(getGID) 
gidList = pyCursor1.fetchall() 

class threadClass(threading.Thread): 

def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

def run(self): 

     while True: 
      gid = self.queue.get() 

      procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID) 

      pyCursor2 = pyConn.cursor()       
      pyCursor2.execute(procQuery) 

      print gid[0]      
      print 'Done' 

def main(): 

    for i in range(4): 
     t = threadClass(queue) 
     t.setDaemon(True) 
     t.start() 

     for gid in gidList: 
      queue.put(gid) 

    queue.join() 

main() 

我甚至不知道,如果多线程是最优的,但它绝对不是一个通过一个会更快。

我将使用的机器有四个内核(四核)和一个最小的Linux操作系统,没有GUI,PostgreSQL,PostGIS和Python,如果这有所作为。

我需要更改以启用这个痛苦简单的多处理任务吗?

回答

4

好吧,这是我自己的帖子的答案。干得好我= D

在我的系统从单核心线程转向四核多处理时,速度提高了约150%。

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

def __init__(self, task_queue, result_queue): 
    multiprocessing.Process.__init__(self) 
    self.task_queue = task_queue 
    self.result_queue = result_queue 

def run(self): 
    proc_name = self.name 
    while True: 
     next_task = self.task_queue.get() 
     if next_task is None: 
      print 'Tasks Complete' 
      self.task_queue.task_done() 
      break    
     answer = next_task() 
     self.task_queue.task_done() 
     self.result_queue.put(answer) 
    return 


class Task(object): 
def __init__(self, a): 
    self.a = a 

def __call__(self):   
    pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConn.set_isolation_level(0) 
    pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

    pyCursor1.execute(procQuery) 
    print 'What is self?' 
    print self.a 

    return self.a 

def __str__(self): 
    return 'ARC' 
def run(self): 
    print 'IN' 

if __name__ == '__main__': 
tasks = multiprocessing.JoinableQueue() 
results = multiprocessing.Queue() 

num_consumers = multiprocessing.cpu_count() * 2 
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
for w in consumers: 
    w.start() 

pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
pyConnX.set_isolation_level(0) 
pyCursorX = pyConnX.cursor() 

pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
temp = pyCursorX.fetchall()  
num_job = temp[0] 
num_jobs = num_job[0] 

pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
cityIdListTuple = pyCursorX.fetchall()  

cityIdList = [] 

for x in cityIdListTuple: 
    cityIdList.append(x[0]) 


for i in xrange(num_jobs): 
    tasks.put(Task(cityIdList[i - 1])) 

for i in xrange(num_consumers): 
    tasks.put(None) 

while num_jobs: 
    result = results.get() 
    print result 
    num_jobs -= 1 

现在我有我已经张贴在这里的另一个问题:

Create DB connection and maintain on multiple processes (multiprocessing)

希望我们能摆脱一些开销和速度这个婴儿起来,甚至更多。

+0

嗨@ene,如果这解决了你的问题,这是一个很好的做法,将其标记为回答:) – eberbis

+0

是的,这很奇怪,因为我只是一个客人用户或一些这样的时候,我发布这个问题,我没有把我自己的问题标记为正确的能力。你可以看到缩略图图像没有与我一起更新。建议如何解决,欢迎 –

+0

哦,是的..问题是,你发布你的问题在一个不同的(未注册)用户(http://stackoverflow.com/users/954992/ene),你现在正在使用注册的(http://stackoverflow.com/users/965035/ene)回复。正如你可以看到它们上的ID不同。这可能有所帮助:http://meta.stackexchange.com/questions/74024/registration-with-my-unregistered-account – eberbis

0

在普通的SQL一个可以这样做:

UPDATE city ci 
SET gid_fkey = co.gid 
FROM country co 
WHERE ST_within(ci.the_geom , co.the_geom) 
AND ci.city_id = _some_parameter_ 
     ; 

有可能是一个问题,如果一个城市将融入一个以上的国家(导致多个更新到同一目标行),但是这可能是而不是数据中的情况。

相关问题