2017-08-26 31 views
2

我在php中使用PThreads进行多线程。我已经在Windows上的XAMPP服务器上成功安装并运行了它。 我有100K记录在我的数据库,我想运行parallel.Every螺纹20个线程将调用来自数据库和流程5K记录them.Here是我这个PHP的Pthread问题

require('mailscript.php'); 
class My extends Thread{ 

    function __construct() { 
     $this->mailscript = new mailscript(); 
    } 

    function run(){ 
     $this->mailscript->runMailScript(5000); 
    } 
} 

for($i=0;$i<20;$i++){ 
    $pool[] = new My(); 
} 

foreach($pool as $worker){ 
    $worker->start(); 
} 
foreach($pool as $worker){ 
    $worker->join(); 
} 

代码时,我只运行此代码它每个线程最多运行约600条记录。对于PThread中的线程数有任何限制问题。有什么问题请帮助我

回答

1

hie,这里是我将如何处理pthread与你的例子与一个集合方法,如果必要的话会被使用。希望这会帮助你。

/*pthreads batches */ 
$batches = array(); 

$nbpool = 20; // cpu 10 cores 

/* job 1 */ 
$list = [/* data1 */]; 
$batches[] = array_chunk($list, 5000); 

/* job 2 */ 
$list2 = [/* data2 */]; 
$batches[] = array_chunk($list2, 10000); 

/*final collected results*/ 
$resultFinal = []; 

/* loop across batches */ 
foreach ($batches as $key => $chunks) { 

    /* for intermediate collection */ 
    $data[$key] = []; 

    /* how many workers */ 
    $workCount = count($chunks); 

    /* set pool job up to max cpu capabilities */ 
    $pool = new Pool($nbpool, Worker::class); 

    /* pool cycling submit */ 
    foreach (range(1, $workCount) as $i) { 
     $chunck = $chunks[$i - 1]; 
     $pool->submit(new processWork($chunck)); 
    } 

    /* on collection cycling */ 
    $collector = function (\Collectable $work) use (&$data) { 

     /* is worker complete ? */ 
     $isGarbage = $work->isGarbage(); 

     /* worker complete */ 
     if ($isGarbage) { 
      $data[$key] = $work->result; 
     } 
     return $isGarbage; 
    }; 
    do { 
     /* collection on pool stack */ 
     $count = $pool->collect($collector); 
     $isComplete = count($data) === $workCount; 
    } while (!$isComplete); 

    /* push stack results */ 
    array_push($resultFinal, $data); 

    /* close pool */ 
    $pool->shutdown(); 
} 

class processWork extends \Threaded implements \Collectable { 

    private $isGarbage; 
    private $process; 
    public $result; 

    public function __construct($process) { 
     $this->process = $process; 
    } 

    public function run() { 
     $workerDone = array(); 
     foreach ($this->process as $k => $el) { 
      /* whatever stuff with $this->process */ 
     } 
     $this->result = $workerDone; 
     $this->isGarbage = true; // yeah, it s done 
    } 

    public function isGarbage(): bool { 
     return $this->isGarbage; 
    } 
}