<?php class test extends Thread { public $name = ''; public $runing = false; public function __construct($name) { $this->name = $name; $this->runing = true; } public function run() { $n = 0; while ($this->runing) { printf("name: %s %s\n",$this->name, $n); $n++; sleep(1); } } } $pool[] = new test('a'); $pool[] = new test('b'); $pool[] = new test('c'); foreach ($pool as $w) { $w->start(); }
线程池实现方法
$pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while ( true ){ if(count($pool) < 2000){ //定义线程池数量,小于线程池数量则开启新的线程直到小于2000为止 $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ //如果线程已经运行结束,销毁线程,给新的任务使用 if(! $worker->isRunning()){ unset($pool[$name]); } } } } }
<?php class ExampleWork extends Stackable { public function __construct($data) { $this->local = $data; } public function run() { // print_r($this->local);echo "\r\n"; echo '------------------- '. $this->local . " -----------------\r\n"; sleep(1); } } class ExampleWorker extends Worker { public function __construct($name) { $this->name = $name; $this->data = array(); } public function run(){ $this->name = sprintf("(%lu)", $this->getThreadId()); } } /* Dead simple pthreads pool */ class Pool { /* to hold worker threads */ public $workers; /* to hold exit statuses */ public $status; /* prepare $size workers */ public function __construct($size = 10) { $this->size = $size; } /* submit Stackable to Worker */ public function submit(Stackable $stackable) { if (count($this->workers)<$this->size) { $id = count($this->workers); $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id)); $this->workers[$id]->start(PTHREADS_INHERIT_NONE); if ($this->workers[$id]->stack($stackable)) { return $stackable; } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING); }else{ for ($i=0;$i<count($this->workers);$i++){ if( ! $this->workers[$i]->isWorking()){ $this->workers[$i]->stack($stackable); return $stackable; } } } return false; } public function status(){ for ($i=0;$i<count($this->workers);$i++){ printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking()); } printf("\r\n"); } /* Shutdown the pool of threads cleanly, retaining exit status locally */ public function shutdown() { foreach($this->workers as $worker) { $this->status[$worker->getThreadId()]=$worker->shutdown(); } } } /* Create a pool of ten threads */ $pool = new Pool(100); /* Create and submit an array of Stackables */ $work = array(); for ($target = 0; $target < 1000; $target++){ $work[$target]=$pool->submit(new ExampleWork($target)); if($work[$target] == false){ $target--; sleep(1); continue; } for ($i=0;$i<count($work);$i++){ if($work[$i]->isRunning()){ printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning()); } } printf("\r\n"); } $pool->shutdown(); exit();
PHP Fatal error: Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38 Stack trace: #0 /home/www/threads.php(38): PDO->__sleep() #1 [internal function]: SQLWorker->run() #2 {main} thrown in /home/www/threads.php on line 38 not ready
<?php class MyWorker extends Worker{ public static $pdo; function __construct($conf){ $this->conf = $conf; } function run(){ self::$pdo = new PDO( 'mysql:host=localhost;dbname=test'); } function get_connection(){ return self::$pdo; } } ?>