2011-12-01 22 views
2

道歉,如果这已被覆盖之前 - 我做了我的搜索,但可能可能不知道使用正确的术语。处理单个阵列的多个“代理”

此过程由PHP处理。

这里的情况:

我有一个大型的文件名数组。我已经打开这些文件并将其内容输入到数据库中。一次处理这些文件需要24小时以上,并且这些文件每天都会更新。

将单个大阵列分解为四个较小的阵列并运行并发进程会在24小时窗口过去之前完成作业,但有时一个或两个进程将在其他时间之前完成几个小时,因为文件大小每天都有所不同。

就像谁炒股零售货架(还有谁工作过那个噩梦?)间距,以帮助有什么完成自己的任务后留下的人,我想有一个脚本在地方,这些“代理人”照着做。

这里是什么,我想通了一些基础知识 - 这可能是错的,我不是太自豪地抗议,如果我:-)

$files = array('file1','file2','file3','file4','file5'); 
//etc... on to over 4k elements 

while($file = array_pop($files)){ 

    //Something in here... I have no idea what. 

} 

想法?类似于四个函数调用或四个循环内的整个'while'已经跨越了我的想法,但我很确定它将等待执行后续调用,直到前一个(s)完成。

任何帮助表示赞赏。我严重卡在这一个!

谢谢!

+0

您确定要使用PHP来做到这一点吗?使用支持简单多线程的选择语言(如C#中的ThreadPool) – CodeZombie

+0

Python .....也许这里的答案 –

+0

这是我最熟悉的语言,抓取源文件的代码是用PHP编写的,但它是从一个bash控制台而不是一个网站运行的 - 所以一些shell脚本完全处于可能的范围之内。 – user1075581

回答

2

数据库支持的消息队列似乎是一个明显的解决方案,但我认为这在这种情况下是过度的。我只是将要处理的文件放入一个专用队列目录中,然后使用DirectoryIterator类对其进行扫描。事情是这样的:

while (true) { 
    look in the queue directory for a file 
    if you don't fine one, exit the script, all processing is done 
    if you find one, rename it or move it to a work directory 
    if the rename/move command succeeded, process the file 
    if the rename/move command failed, one of the other threads got it first 
} 

编辑:

关于开展工作人员,你可以使用一个简单的shell脚本产卵的PHP程序在后台:

NUM_WORKERS=5 
for WORKER in $(seq 1 ${NUM_WORKERS}) 
do 
    echo "starting worker ${WORKER}" 
    php -f /path/to/my/process.php & 
done 

然后,创建一个cron运行这个启动器,例如,在午夜:

0 0 * * * /path/to/launcher.sh 
+0

我喜欢这个,因为它建立在我已有的东西上(所有目标文件在处理到数据库中之前都被下载到一个目录中),但我能想到的唯一方法就是使其具有一个shell或执行后台进程的perl脚本(在命令的末尾添加一个&),并能够确定进程何时结束,从而将新文件分配给可用的“代理程序”。不过,Shell脚本和Perl现在已经超出了我的视野。它在我的“待办事项”列表上,但:-) – user1075581

+0

您不会将文件分配给代理。代理程序会查找更多的文件进行处理,如果没有文件则退出。主脚本只会启动四个(或十个)代理。 –

+0

嗯...所以这更像是一个“配电设施”? (因为没有更好的anaogy)。这运行在一个目标目录上,然后它扫描另外四个目录以查找文件。如果一个目录是空的,它将一个文件移入它。每个文件夹都由一个单独的进程监视,该进程将该目录中的任何文件的内容输入到数据库中?我甚至关闭?我觉得今天早上我还没有喝足够的咖啡...... – user1075581

2

你想要什么叫“消息队列”。类似于beanstalkd

您将基本上创建一个包含您的个人文件名的消息列表。然后您将创建一组处理器来处理它们。每个处理器将处理一个文件,然后返回队列以查看是否有更多消息/文件正在等待处理。

编辑: 下面是一个类比来帮助解释消息队列。你的第一个想法就像是一个人工经理拿着一堆文件,把他们分成四堆,然后把他的四个员工的每一个都交给一堆人处理。消息队列更像这样:管理器将所有文件放在一个表上,并告诉每个员工从表中取出单个文件并处理它。他告诉他们何时完成第一个文件,以便在文件没有更多文件的情况下继续进行文件处理。当所有文件完成后,员工可以回家。

一名员工可能以非常大的文件结束,只处理一些,而另一名员工可能会得到较小的文件并处理很多问题。每个员工处理多少并不重要,他们都会继续工作,直到表格为空。

+0

在确定它是否需要我之前,我需要对此进行更多的了解。 @ alex-howansky提到了一个处理目录中文件的解决方案,这与我已有的东西非常一致,但对于并行执行进程而不是串行进程的代码,我仍然有点卡住(因此希望进一步研究豆杆)。 – user1075581

+0

“消息队列”是一个很好的搜索术语。这是一个普遍的想法,不限于beanstalkd,这是一个特定的实现。对于Alex的队列实现,您可以使用一个php或bash脚本将所有文件移动到队列目录中。然后它会启动一系列独立的PHP脚本,它们遵循他提供的伪代码。这些脚本会同时处理队列中的文件。这仍然是一个“消息队列”,只是一个更简单的代码自己的版本。 –

1

我会有一个插座服务器主脚本,将文件路径提交给x个从脚本,直到没有文件需要处理。这样,所有的从属脚本将继续运行,并且可以根据请求动态地分配文件路径。

事情是这样的:

master.php

<?php 

    // load the array of files to process (however you do this) 
    $fileList = file('filelist.txt'); 

    // Create a listening socket on localhost 
    $serverSocket = stream_socket_server('tcp://127.0.0.1:7878'); 
    $sockets = array($serverSocket); 
    $clients = array(); 

    // Loop while there are still files to process 
    while (count($fileList)) { 

    // Run a select() call on the existing sockets' read buffers 
    // Skip to next iteration if no sockets are waiting for handling 
    if (stream_select($read = $sockets, $write = NULL, $except = NULL, 1) < 1) { 
     continue; 
    } 

    // Loop sockets with data to read 
    foreach ($read as $socket) { 

     if ($socket == $serverSocket) { 
     // Accept new clients 
     $sockets[] = $clients[] = stream_socket_accept($serverSocket); 
     } else if (trim(fgets($socket)) == 'next') { 
     // Hand out a new file path to the client 
     fwrite($socket, array_shift($fileList)."\n"); 
     if (!count($fileList)) { 
      break 2; 
     } 
     } 

    } 

    } 

    // When we're done, disconnect the clients 
    foreach ($clients as $socket) { 
    @fclose($socket); 
    } 

    // ...and close the listen socket 
    @fclose($serverSocket); 

slave.php

<?php 

    $socket = fsockopen('127.0.0.1', 7878); 

    while (!feof($socket)) { 

    // Get a new file path from the master 
    fwrite($socket,"next\n"); 
    $path = trim(fgets($socket)); 

    if (is_file($path)) { 
     // Process the file at $path here 
    } 

    } 

,那么你只需要启动master.php,那么当它正在运行,但可以启动多个实例slave.php如你所愿,他们将一直运行,直到没有更多的文件要处理。

显然,这没有错误处理,但它应该提供一个基本的框架,让你开始。这依赖于阻止函数调用(stream_select()fgets())以避免竞争条件 - 这可能或可能不足以满足您的需要。

+0

这比我自己想象的更聪明!我喜欢这个概念!然而,在我能够尝试之前还有一段时间 - 准备好迎接一段时间。不错,但! :-D – user1075581