2017-08-06 57 views
1

我试图用多函数运行我的代码,但蒙戈回头率管理Python的多处理的MongoDB

“MongoClient叉之前打开。与 连接=假创建MongoClient,或分叉后创建客户。 “

我真的不明白我可以如何适应我的代码。 基本上结构是:

db = MongoClient().database 
db.authenticate('user', 'password', mechanism='SCRAM-SHA-1') 
collectionW = db['words'] 
collectionT = db['sinMemo'] 
collectionL = db['sinLogic'] 


def findW(word): 
    rows = collectionw.find({"word": word}) 
    ind = 0 
    for row in rows: 
     ind += 1 
     id = row["_id"] 

    if ind == 0: 
     a = ind 
    else: 
     a = id 
    return a 



def trainAI(stri): 
... 
     if findW(word) == 0: 

       _id = db['words'].insert(
        {"_id": getNextSequence(db.counters, "nodeid"), "word": word}) 
       story = _id 
      else: 
       story = findW(word) 
... 


def train(index): 
    # searching progress 
    progFile = "./train/progress{0}.txt".format(index) 
    trainFile = "./train/small_file_{0}".format(index) 
    if os.path.exists(progFile): 
     f = open(progFile, "r") 
     ind = f.read().strip() 
     if ind != "": 

      pprint(ind) 
      i = int(ind) 
     else: 
      pprint("No progress saved or progress lost!") 
      i = 0 
     f.close() 

    else: 
     i = 0 
    #get the number of line of the file  
    rangeC = rawbigcount(trainFile) 

    #fix unicode 
    non_bmp_map = dict.fromkeys(range(0x10000, sys.maxunicode + 1), 0xfffd) 
    files = io.open(trainFile, "r", encoding="utf8") 
    str1 = "" 
    str2 = "" 

    filex = open(progFile, "w") 

    with progressbar.ProgressBar(max_value=rangeC) as bar: 
     for line in files: 
      line = line.replace("\n", "") 
      if i % 2 == 0: 
       str1 = line.translate(non_bmp_map) 
      else: 
       str2 = line.translate(non_bmp_map) 

      bar.update(i) 
      trainAI(str1 + " " + str2) 
      filex.seek(0) 
      filex.truncate() 
      filex.write(str(i)) 
      i += 1 

#multiprocessing function 

maxProcess = 3 

def f(l, i): 
    l.acquire() 
    train(i + 1) 
    l.release() 

if __name__ == '__main__': 
    lock = Lock() 

    for num in range(maxProcess): 
     pprint("start " + str(num)) 
     Process(target=f, args=(lock, num)).start() 

此代码,用于读取在4个不同过程4不同的文件,并在同一时间在数据库中插入数据制成。 我只复制了一部分代码,以使您了解它的结构。

我试图添加连接=假这段代码,但没有...

db = MongoClient(connect=False).database 
    db.authenticate('user', 'password', mechanism='SCRAM-SHA-1') 
    collectionW = db['words'] 
    collectionT = db['sinMemo'] 
    collectionL = db['sinLogic'] 

然后我试着将它移动的F功能(右车前(),但我得到什么在该程序没有找到collectionW,collectionT和collectionL。

我不是蟒蛇很内行或MongoDB的,所以我希望这不是一个愚蠢的问题。

的代码的Ubuntu 16.04下运行.2用python 2.7.12

+0

这不完全是一个新话题,因为数据库连接的“线程安全性”的一般概念已经存在很长时间了。可能为什么错误信息如此描述和准确。你被告知只在你fork后建立一个连接**,所以连接只能在工作进程中**。如果你想要某种类型的** IPC **,那么你可以使用别的方法来做到这一点。但是在进程/线程之间复制的数据库句柄是“正确的”,并且已经持续了很长时间。 –

回答

1

db.authenticate将不得不连接到mongo服务器,它会尝试建立连接。所以,即使正在使用connect = False,db.authenticate也需要打开一个连接。 为什么不在fork之后创建mongo客户端实例?这看起来像最简单的解决方案。

+0

但我到底该怎么做?这是我不明白的地方 – Nuked

+0

您可以在目标函数f中创建mongo客户端。或者将用于创建mongo客户端的代码放在一个f将调用的函数中。 –

+0

我已经做到了,但我在写的问题,我不知道如何处理那些需要可变collectionW,collectionT和collectionL功能。 – Nuked

0

由于db.authenticate必须打开MongoClient并连接到服务器,它会创建在分叉子流程中不起作用的连接。因此,错误消息。试试这个:

db = MongoClient('mongodb://user:[email protected]', connect=False).database 

另外,删除锁l。在一个子进程中获取锁定对其他子进程没有影响。

+0

他已经创建了主进程的锁。所以,它会同步子进程。但同步子进程并不像正确的做法在这里做,因为它的工作作为一种大锁,并击败使用子进程的目的。 –

+0

在一个子获取锁对其它子过程没有影响。 =) –

+0

实际上多处理锁(未通过螺纹露出的锁)由信号量备份(https://github.com/python/cpython/tree/master/Modules/_multiprocessing),因此它们可被用于不同的过程同步。因此,获取多处理锁定会影响其他进程,如果他们也尝试获取相同的锁定。这是一个演示同样的主题https://gist.github.com/nipuntalukdar/5562d5efd6970495e7632399057d5e99 –

0

这里是我做到了我的问题:

import pathos.pools as pp 
import time 
import db_access 

class MultiprocessingTest(object): 

    def __init__(self): 
     pass 

    def test_mp(self): 
     data = [[form,'form_number','client_id'] for form in range(5000)] 

     pool = pp.ProcessPool(4) 
     pool.map(db_access.insertData, data) 

if __name__ == '__main__': 

    time_i = time.time() 

    mp = MultiprocessingTest() 
    mp.test_mp() 

    time_f = time.time() 

    print 'Time Taken: ', time_f - time_i 

这里是db_access.py:

from pymongo import MongoClient 

def insertData(form): 
    client = MongoClient() 
    db = client['TEST_001'] 
    db.initialization.insert({ 
     "form": form[0], 
     "form_number": form[1], 
     "client_id": form[2] 
    }) 

这是发生在你的代码,因为你正在发起MongoCLient()一劳永逸子流程。 MongoClient不是安全的。因此,在每个功能内启动工作,并让我知道是否有其他解决方案。