Home | 简体中文 | 繁体中文 | 杂文 | 打赏(Donations) | ITEYE 博客 | OSChina 博客 | Facebook | Linkedin | 知乎专栏 | Search | Email

5.14. pthreads

编译PHP时需要加入 --enable-maintainer-zts 选项才能安装pthreads

# pecl install pthread
	

配置文件

	
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF
extension=pthreads.so
EOF
	
	
$ php -m |grep pthreads
pthreads
	

5.14.1. pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16

pthreads 3.0.0 之后不再支持5.6.x,PHP 版本大于等于 7.0.0

			
# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC5), installed version is 5.6.16
No valid packages found
install failed
			
			

解决方法是安装较低版本的pthreads-2.0.10

# pecl install pthreads-2.0.10
			

5.14.2. Thread

		
<?php

class test extends Thread {

    public $name   = '';
    public $runing = false;

    public function __construct($name) {

        $this->name   = $name;
        $this->runing = true;
    }

    public function run() {
	$n = 0;
        while ($this->runing) {
		printf("name: %s %s\n",$this->name, $n);
		$n++;
                sleep(1);
        }
    }

}

$pool[] = new test('a');
$pool[] = new test('b');
$pool[] = new test('c');

foreach ($pool as $w) {
    $w->start();
}
		
		

线程池实现方法

		
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while ( true ){
			if(count($pool) < 2000){ //定义线程池数量,小于线程池数量则开启新的线程直到小于2000为止
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					//如果线程已经运行结束,销毁线程,给新的任务使用
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}

		}

	}
		
		

5.14.3. Pool

		
<?php
class ExampleWork extends Stackable {
        public function __construct($data) {
                $this->local = $data;
        }
        public function run() {
	//	print_r($this->local);echo "\r\n";
	echo '------------------- '. $this->local . " -----------------\r\n";
	sleep(1);
        }
}
class ExampleWorker extends Worker {

        public function __construct($name) {
                $this->name = $name;
                $this->data = array();
        }
        public function run(){
                $this->name = sprintf("(%lu)", $this->getThreadId());
        }
}
/* Dead simple pthreads pool */
class Pool {
        /* to hold worker threads */
        public $workers;
        /* to hold exit statuses */
        public $status;
        /* prepare $size workers */
        public function __construct($size = 10) {
                $this->size = $size;
        }
        /* submit Stackable to Worker */
        public function submit(Stackable $stackable) {
            if (count($this->workers)<$this->size) {
                    $id = count($this->workers);
                    $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id));
                    $this->workers[$id]->start(PTHREADS_INHERIT_NONE);

                    if ($this->workers[$id]->stack($stackable)) {
                           return $stackable;
                    } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING);
            }else{
				for ($i=0;$i<count($this->workers);$i++){
                	if( ! $this->workers[$i]->isWorking()){
						$this->workers[$i]->stack($stackable);
						return $stackable;
				}
        	}
		}

                return false;
        }
	public function status(){
		for ($i=0;$i<count($this->workers);$i++){
		printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking());
		}
		printf("\r\n");

	}
        /* Shutdown the pool of threads cleanly, retaining exit status locally */
        public function shutdown() {
                foreach($this->workers as $worker) {
                        $this->status[$worker->getThreadId()]=$worker->shutdown();
                }
        }
}
/* Create a pool of ten threads */
$pool = new Pool(100);
/* Create and submit an array of Stackables */
$work = array();
for ($target = 0; $target < 1000; $target++){

    $work[$target]=$pool->submit(new ExampleWork($target));

	if($work[$target] == false){
		$target--;
		sleep(1);
		continue;
	}
	for ($i=0;$i<count($work);$i++){
		if($work[$i]->isRunning()){
			printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning());
		}
	}
	printf("\r\n");
}
$pool->shutdown();
exit();
		
		

pthreads 自带 Pool

		
<?php
class ExampleWorker extends Worker {
 
	public function __construct(Logging $logger) {
		$this->logger = $logger;
	}
 
	protected $logger;	
}
 
/* the collectable class implements machinery for Pool::collect */
class Work extends Stackable {
	public function __construct($number) {
		$this->number = $number;
	}
	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
				  __CLASS__, $this->worker->getThreadId());
		sleep(1);
		printf("runtime: %s, %d\n", date('Y-m-d H:i:s'), $this->number);
		$this->status = "OK";
	}
}
 
class Logging extends Stackable {
 
	protected function log($message, $args = []) {
		$args = func_get_args();	
 
		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}
}
 
$pool = new Pool(5, \ExampleWorker::class, [new Logging()]);
 
foreach (range(0, 100) as $number) {
	$pool->submit(new Work($number));
}
 
 
$pool->shutdown();
 
var_dump($pool);
?>
		
		

例 5.7. Threads - Pool

			
# cat pool.php 
<?php

class MyWork extends Stackable {

    public $name;

    public function __construct($name) {
        echo "Stackable executed  $name\n";
        $this->name = $name;
    }

    public function run() {
        echo "Stackable $this->name start running\n";
        for ($i = 1; $i <= 5; $i++) {
            echo "Run $this->name : $i\n";
            sleep(1);
        }
    }

}

class MyWorker extends Worker {
    public function __construct($name) {
	$this->name = $name;
    }
    public function run() {
        echo "Worker started $this->name\n";
    }
}

$pool = new Pool(3, \MyWorker::class, array("pthreads"));
$pool->submit(new MyWork("A"));
$pool->submit(new MyWork("B"));
$pool->submit(new MyWork("C"));
$pool->shutdown();
			
			

5.14.4. FAQ

5.14.4.1. You cannot serialize or unserialize PDO instances

			
PHP Fatal error:  Uncaught exception 'PDOException' with message 'You cannot serialize or unserialize PDO instances' in /home/www/threads.php:38
Stack trace:
#0 /home/www/threads.php(38): PDO->__sleep()
#1 [internal function]: SQLWorker->run()
#2 {main}
  thrown in /home/www/threads.php on line 38
 not ready
 			
			
			
<?php
class MyWorker extends Worker{
    public static $pdo;

    function __construct($conf){
        $this->conf = $conf;
    }

    function run(){
        self::$pdo = new PDO(
            'mysql:host=localhost;dbname=test');
    }

    function get_connection(){
        return self::$pdo;
    }
}
?>
			
			

5.14.5. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

		
<?php
$counter = 0;
//$handle=fopen("php://memory", "rw"); 
//$handle=fopen("php://temp", "rw"); 
$handle=fopen("/tmp/counter.txt", "w"); 
fwrite($handle, $counter ); 
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
        $this->handle = fopen("/tmp/counter.txt", "w+");
    }
	public function __destruct(){
		fclose($this->handle);
	}
    public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter ); 
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
		
		if($this->mutex)
			Mutex::unlock($this->mutex);
    }
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>
		
		

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

5.14.6. FAQ

5.14.6.1. configure: error: pthreads requires ZTS, please re-compile PHP with ZTS enabled

重新编译加入 --enable-maintainer-zts

5.14.6.2. pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13

[root@localhost src]# pecl search pthreads
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.0.7 (stable)        Threading API
			
			
[root@localhost src]# pecl install pthreads
pecl/pthreads requires PHP (version >= 7.0.0RC2), installed version is 5.6.13
No valid packages found
install failed
			

解决方法,手工编译旧的安装包

			
[root@localhost src]# wget https://pecl.php.net/get/pthreads-3.0.6.tgz
[root@localhost src]# tar zxvf pthreads-3.0.6.tgz 		
[root@localhost src]# cd pthreads-3.0.6
[root@localhost pthreads-3.0.6]# phpize
[root@localhost pthreads-3.0.6]# ./configure --enable-pthreads --with-php-config=/srv/php/bin/php-config
[root@localhost pthreads-3.0.6]# make && make install
			
			

5.14.6.3. Class 'Stackable' not found in /path/to/file.php

故障出现在PHP 7.x,pecl 已经升级至 3.1.6

Stackable 是 Threaded 的一个别名,这个类使用直到 pthreads v.2.0.0,之后便取消Stackable。

$ pecl search pthread
Retrieving data...0%
Matched packages, channel pecl.php.net:
=======================================
Package  Stable/(Latest) Local
pthreads 3.1.6 (stable)        Threading API			
			

测试

$stackable = new Stackable;
var_dump(get_class($stackable));

Outputs: string(8) "Threaded"			
			

源码

/**
 * Stackable is an alias of Threaded. This class name was used in pthreads until
 * version 2.0.0
 * @link http://www.php.net/manual/en/class.threaded.php
 */
class Stackable extends Threaded implements Traversable, Countable, ArrayAccess {

}
			

解决方案,将 Stackable 改为 Threaded