2015-10-13 79 views
3

我是新来的Python中的多线程,我目前正在编写附加到csv文件的脚本。如果我有多个线程提交到一个concurrent.futures.ThreadPoolExecutor,将行添加到csv文件。如果追加是这些线程唯一的文件相关操作,我该怎么做才能保证线程安全?多个线程在Python中写入相同的CSV

简化我的代码版本:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    for count,ad_id in enumerate(advertisers): 

     downloadFutures.append(executor.submit(downloadThread, arguments.....)) 
     time.sleep(random.randint(1,3)) 

而我的线程类的存在:

def downloadThread(arguments......): 

       #Some code..... 

       writer.writerow(re.split(',', line.decode())) 

我应该设立一个单独的单线程执行来处理文字还是woth担心如果我只是追加?

编辑:我要阐述的是,当写操作发生时可以当文件被下一个追加到分钟之间有很大不同,我只是担心,没有发生这种情况下测试我的剧本的时候,我宁愿被覆盖为了那个原因。

+0

你也许可以做一个线程'csvwriter'使用的一个技巧在[这个答案](http://stackoverflow.com/a/13618333/355230)中提到的相关问题。 – martineau

回答

8

我不确定csvwriter是否线程安全。该documentation没有明确规定,因此是安全的,如果多个线程使用相同的对象,您应该保护与使用量的threading.Lock

# create the lock 
import threading 
csv_writer_lock = threading.Lock() 

def downloadThread(arguments......): 
    # pass csv_writer_lock somehow 
    # Note: use csv_writer_lock on *any* access 
    # Some code..... 
    with csv_writer_lock: 
     writer.writerow(re.split(',', line.decode())) 

话虽这么说,它可能确实是为downloadThread更优雅将写入任务提交给执行程序,而不是显式使用像这样的锁。

+0

我会使用一个锁来访问共享'writer'(或者为它自动创建一个包装类/对象)。 – martineau

+0

@martineau:好点!我已经更新了我的答案以反映这一点。 – Claudiu

+0

可能是我对我的问题最直接的回答,非常感谢。 – GreenGodot

1

这里是一些代码,它也处理头痛引起的unicode的问题:

def ensure_bytes(s): 
    return s.encode('utf-8') if isinstance(s, unicode) else s 

class ThreadSafeWriter(object): 
''' 
>>> from StringIO import StringIO 
>>> f = StringIO() 
>>> wtr = ThreadSafeWriter(f) 
>>> wtr.writerow(['a', 'b']) 
>>> f.getvalue() == "a,b\\r\\n" 
True 
''' 

def __init__(self, *args, **kwargs): 
    self._writer = csv.writer(*args, **kwargs) 
    self._lock = threading.Lock() 

def _encode(self, row): 
    return [ensure_bytes(cell) for cell in row] 

def writerow(self, row): 
    row = self._encode(row) 
    with self._lock: 
     return self._writer.writerow(row) 

def writerows(self, rows): 
    rows = (self._encode(row) for row in rows) 
    with self._lock: 
     return self._writer.writerows(rows) 

# example: 
with open('some.csv', 'w') as f: 
    writer = ThreadSafeWriter(f) 
    writer.write([u'中文', 'bar']) 

更详细的解决方案是here

相关问题