2013-01-14 19 views
0

我有一个约200,000个实体的列表,我需要为这些实体中的每一个查询特定的RESTful API,最后以所有以JSON格式保存的200,000个实体txt文件。 这样做的天真的方式是通过200,000个实体的列表并逐个查询,将返回的JSON添加到列表中,并在完成时将所有内容都添加到文本文件中。喜欢的东西:向文本文件写入大量查询

from apiWrapper import api 
from entities import listEntities #list of the 200,000 entities 
a=api() 
fullEntityList=[] 
for entity in listEntities: 
fullEntityList.append(a.getFullEntity(entity)) 

with open("fullEntities.txt","w") as f: 
    simplejson.dump(fullEntityList,f) 

显然,这是不可靠的,200000个查询到API将需要大约10个小时左右,所以我想它得到其写入文件之前,事情会导致错误。 我想正确的方法是把它写成块,但不知道如何实现它。有任何想法吗? 另外,我不能用数据库做到这一点。

回答

2

我建议将它们写入SQLite数据库。这是他们为我自己的小型网络蜘蛛应用程序做的。因为你可以很容易地查询关键字,并检查你已经检索的关键字。这样,您的应用程序可以轻松地继续停止。特别是如果您下周添加1000个新条目。

从一开始就将“恢复”设计到您的应用程序中。如果有一些意外的异常(比如由于网络拥塞而导致超时),则不必从头开始重新启动,而只需要那些尚未成功检索的查询。在200,000个查询中,99.9%的正常运行时间意味着您必须预期200次失败!

对于空间效率和性能,它可能会使用压缩格式,比如在将zson转储到数据库blob之前压缩json。

SQLite是一个不错的选择,除非你的蜘蛛同时在多个主机上运行。对于单个应用程序,sqlite是完美的。

1

最简单的方法是打开文件中'a'(附加)模式,并把它们写一个接一个,因为他们进来。

更好的方法是使用作业队列。这将允许您将a.getFullEntity调用派生成工作者线程并处理结果,但是当它们返回时/如果它们返回时要处理结果,或计划重试失败等。 请参阅Queue

+0

你可以在工作队列上稍微扩展一点吗?使用哪些模块?链接到文档? – leonsas

+1

该链接已经在那里......并且该模块被称为“队列” – wim

0

我也会使用一个单独的文件写入线程,并使用Queue来记录所有实体。当我开始时,我认为这将在5分钟内完成,但后来证明有点困难。 simplejson和我知道的所有其他这样的库不支持部分编写,所以你不能先写一个列表中的元素,然后再添加另一个等。所以,我试图通过编写[,,]与文件分开,然后分别转储每个实体。

没有能够检查(因为我没有你的API),你可以尝试:

import threading 
import Queue 
import simplejson 
from apiWrapper import api 
from entities import listEntities #list of the 200,000 entities 

CHUNK_SIZE = 1000 

class EntityWriter(threading.Thread): 
    lines_written = False 
    _filename = "fullEntities.txt" 

    def __init__(self, queue): 
     super(EntityWriter, self).__init() 
     self._q = queue 
     self.running = False 

    def run(self): 
     self.running = True 
     with open(self._filename,"a") as f: 
      while True: 
       try: 
        entity = self._q.get(block=False) 
        if not EntityWriter.lines_written: 
         EntityWriter.lines_written = True 
         f.write("[") 
         simplejson.dump(entity,f) 
        else: 
         f.write(",\n") 
         simplejson.dump(entity,f) 
       except Queue.Empty: 
        break 
     self.running = False 

    def finish_file(self): 
     with open(self._filename,"a") as f: 
      f.write("]") 


a=api() 
fullEntityQueue=Queue.Queue(2*CHUNK_SIZE) 
n_entities = len(listEntities) 
writer = None 
for i, entity in listEntities: 
    fullEntityQueue.append(a.getFullEntity(entity)) 
    if (i+1) % CHUNK_SIZE == 0 or i == n_entities-1: 
     if writer is None or not writer.running: 
      writer = EntityWriter(fullEntityQueue) 
      writer.start() 
writer.join() 
writer.finish_file() 

这是什么脚本

主循环仍然迭代列表获取实体的全部信息。之后每个实体现在被放入队列中。每个1000个实体(并在列表的末尾)都会启动一个EntityWriter线程,该线程与主线程并行运行。来自Queue的EntityWriter get并将其转储到所需的输出文件。

需要一些额外的逻辑来使JSON成为一个列表,如上所述,我手动编写[,,]。原则上,生成的文件在重新加载时应该被simplejson所理解。