0

我正在寻找一种功能强大且快速的方式来处理Google App Engine中大文件的处理。如何处理GAE上的大文件处理?

它的工作原理如下(简化的工作流程,在年底):

  1. 客户发送一个CSV文件,我们的服务器将通过线治疗,行。
  2. 文件上传完成后,会在NDB数据存储Uploads中添加一个条目,其中包含CSV名称,文件路径(至Google存储)以及一些基本信息。然后,创建一个任务,称为“预处理”。
  3. 预处理任务将在CSV文件的所有行上循环(可能是数百万),并且会为每个行添加一个NDB条目到UploadEntries模型,其中包含CSV ID,行,要提取/如果该行已经开始处理,并且结束处理(“is_treating”,“is_done”)
  4. 一旦预处理任务结束,它就更新客户端的信息“XXX行将被处理“
  5. 致电Uploads.next()。该next方法:
    • 搜索已经以虚假is_treatingis_doneUploadEntries
    • 将在Redis的数据存储添加一个任务找到下一行。 (使用Redis数据存储是因为这里的工作是在不由Google管理的服务器上进行的)
    • 也会在任务中创建新条目Process-healthcheck(此任务在5分钟后运行并检查7)是否已正确执行。如果不是,则认为Redis/Outside服务器发生故障并且执行与7)相同的操作,但没有结果(而是使用“error”))。
    • 然后,它将UploadEntries.is_treating更新为该条目的True。
  6. 外部服务器将处理数据,并通过向服务器上的端点发出POST请求返回结果。
  7. 该端点更新数据存储中的UploadEntries条目(包括“is_treating”和“is_done”),并调用Uploads.next()以启动下一行。
  8. 在Uploads.next中,搜索下一个条目时不会返回任何内容,我认为该文件将被最终处理,并调用任务post-process,该任务将使用处理后的数据重建CSV,并将其返回给客户。

这里有几件事情要记住:

  1. ,做真正的工作是谷歌的AppEngine之外的服务器上,这就是为什么我不得不拿出Redis的。
  2. 目前的做事方式给了我要处理的并行条目数量的灵活性:在5)中,Uploads.next()方法包含一个limit参数,它让我可以并行搜索并行进程。可以是1,5,20,50。
  3. 在这种情况下,我不能直接将pre-processing任务中的所有行直接添加到Redis becase中,下一位客户将不得不等待第一个文件完成处理,这将会花费太长的时间

但该系统具有的各种问题,这就是为什么我转向你的帮助:

  1. 有时候,这个系统是如此之快,数据存储是没有正确时更新调用Uploads.next(),条目返回a (只是entry.is_treating = True还没有被推送到数据库)
  2. Redis或我的服务器(我真的不知道)有时会丢失任务,或者没有进行处理后的POST请求,所以任务永远不会去is_done = True。这就是为什么我必须实施Healcheck系统,以确保无论如何都能正确处理生产线。这具有双重优势:该任务的名称包含csv ID和行。每个文件都是独一无二的。如果数据存储不是最新的并且同一任务运行两次,则健康检查的创建将失败,因为已存在相同的名称,让我知道存在并发问题,所以我忽略该任务,因为它意味着数据存储尚未更新。

我initiall至想过通过线一个独立的进程文件,行,但这并没有能够并行运行多个线的大的缺点。此外,谷歌将任务的执行时间限制在24小时内,而不是默认值,当文件非常大时,它可以运行超过24小时。

的信息,如果有帮助,我使用Python的


,并简化工作流程,这里就是我试图以尽可能最好的方式来实现:

  • 处理一个大文件,运行多个paralllel进程,每行一个。
  • 使用Redis将工作发送到外部服务器。一旦这样做,是对外部服务器通过POST请求到主服务器的
  • 主服务器,然后更新有关该行的信息返回结果,并进入到下一行

我真的很感激,如果有人有一个更好的方式来做到这一点。我真的相信我不是第一个做这种工作的人,我很确定我没有做正确的工作。我相信Stackoverflow是Stack Exchange最好的部分,因为它是一个算法问题,但它也有可能我没有看到更好的网络。如果是这样,我很抱歉那)。

+1

我想你可以使用app engine mapreduce来做这件事,它可以从GCS逐行读取CSV格式的缓冲区,并在多个实例上运行它。它会根据您的设置处理每个请求的N行。但是,GAE实例很昂贵。 –

回答

1

,它真正的工作是谷歌的AppEngine之外

你有没有考虑过使用Google Cloud Dataflow的不是处理大型文件的服务器? 这是一个托管服务,将为您处理文件分割和处理。

基于这里最初的想法是一个大纲过程:

  • 用户上传文件直接到谷歌云存储,使用signed urls或Blob存储API
  • 从AppEngine上的请求启动该启动一个小的计算引擎实例阻止请求(BlockingDataflowPipelineRunner)启动数据流任务。 (恐怕它需要成为一个计算实例,因为沙箱和阻塞I/O问题)。
  • 数据流任务完成后,计算引擎实例将被解除阻塞并将消息发布到pubsub中。
  • pubsub消息调用AppEngine服务上的webhook,该服务将任务状态从“进行中”更改为“完成”,以便用户可以获取其结果。
+0

真棒,Dataflow看起来适合我。你知道它是否适用于输入是XLS/XLSX,它与我开始(所以我不能只读行)? –

+0

您可能需要制作自定义来源。用于开发自定义文件读取器的[便利类](https://cloud.google.com/dataflow/model/custom-io-python#convenience-source-base-classes)。也许这可以与[用于阅读XLS的许多python库之一](http://www.python-excel.org/)结合使用。 –

+0

好的,这是我怀疑。我开始阅读Dataflow背后的想法,但仍有一个问题困扰着我:我如何应用需要在外部服务器上等待回复的'Transform'方法(我相信自己做的)?我是否会阻止当前进程并每隔n秒查询一次响应的状态,还是有更好的方法? –