我正在寻找一种功能强大且快速的方式来处理Google App Engine中大文件的处理。如何处理GAE上的大文件处理?
它的工作原理如下(简化的工作流程,在年底):
- 客户发送一个CSV文件,我们的服务器将通过线治疗,行。
- 文件上传完成后,会在NDB数据存储
Uploads
中添加一个条目,其中包含CSV名称,文件路径(至Google存储)以及一些基本信息。然后,创建一个任务,称为“预处理”。 - 预处理任务将在CSV文件的所有行上循环(可能是数百万),并且会为每个行添加一个NDB条目到
UploadEntries
模型,其中包含CSV ID,行,要提取/如果该行已经开始处理,并且结束处理(“is_treating”,“is_done”) - 一旦预处理任务结束,它就更新客户端的信息“XXX行将被处理“
- 致电
Uploads.next()
。该next
方法:- 搜索已经以虚假
is_treating
和is_done
的UploadEntries
, - 将在Redis的数据存储添加一个任务找到下一行。 (使用Redis数据存储是因为这里的工作是在不由Google管理的服务器上进行的)
- 也会在任务中创建新条目
Process-healthcheck
(此任务在5分钟后运行并检查7)是否已正确执行。如果不是,则认为Redis/Outside服务器发生故障并且执行与7)相同的操作,但没有结果(而是使用“error”))。 - 然后,它将
UploadEntries.is_treating
更新为该条目的True。
- 搜索已经以虚假
- 外部服务器将处理数据,并通过向服务器上的端点发出POST请求返回结果。
- 该端点更新数据存储中的
UploadEntries
条目(包括“is_treating
”和“is_done
”),并调用Uploads.next()
以启动下一行。 - 在Uploads.next中,搜索下一个条目时不会返回任何内容,我认为该文件将被最终处理,并调用任务
post-process
,该任务将使用处理后的数据重建CSV,并将其返回给客户。
这里有几件事情要记住:
- ,做真正的工作是谷歌的AppEngine之外的服务器上,这就是为什么我不得不拿出Redis的。
- 目前的做事方式给了我要处理的并行条目数量的灵活性:在5)中,
Uploads.next()
方法包含一个limit
参数,它让我可以并行搜索并行进程。可以是1,5,20,50。 - 在这种情况下,我不能直接将
pre-processing
任务中的所有行直接添加到Redis becase中,下一位客户将不得不等待第一个文件完成处理,这将会花费太长的时间
但该系统具有的各种问题,这就是为什么我转向你的帮助:
- 有时候,这个系统是如此之快,数据存储是没有正确时更新调用
Uploads.next()
,条目返回a (只是entry.is_treating = True
还没有被推送到数据库) - Redis或我的服务器(我真的不知道)有时会丢失任务,或者没有进行处理后的POST请求,所以任务永远不会去
is_done = True
。这就是为什么我必须实施Healcheck系统,以确保无论如何都能正确处理生产线。这具有双重优势:该任务的名称包含csv ID和行。每个文件都是独一无二的。如果数据存储不是最新的并且同一任务运行两次,则健康检查的创建将失败,因为已存在相同的名称,让我知道存在并发问题,所以我忽略该任务,因为它意味着数据存储尚未更新。
我initiall至想过通过线一个独立的进程文件,行,但这并没有能够并行运行多个线的大的缺点。此外,谷歌将任务的执行时间限制在24小时内,而不是默认值,当文件非常大时,它可以运行超过24小时。
的信息,如果有帮助,我使用Python的
,并简化工作流程,这里就是我试图以尽可能最好的方式来实现:
- 处理一个大文件,运行多个paralllel进程,每行一个。
- 使用Redis将工作发送到外部服务器。一旦这样做,是对外部服务器通过POST请求到主服务器的
- 主服务器,然后更新有关该行的信息返回结果,并进入到下一行
我真的很感激,如果有人有一个更好的方式来做到这一点。我真的相信我不是第一个做这种工作的人,我很确定我没有做正确的工作。我相信Stackoverflow是Stack Exchange最好的部分,因为它是一个算法问题,但它也有可能我没有看到更好的网络。如果是这样,我很抱歉那)。
我想你可以使用app engine mapreduce来做这件事,它可以从GCS逐行读取CSV格式的缓冲区,并在多个实例上运行它。它会根据您的设置处理每个请求的N行。但是,GAE实例很昂贵。 –