PHP高级编程之发布订阅

Mr. Neo Chen (陈景峯), netkiller, BG7NYT


中国广东省深圳市龙华新区民治街道溪山美地
518131
+86 13113668890


版权声明

转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。

文档出处:
http://netkiller.github.io
http://netkiller.sourceforge.net

微信扫描二维码进入 Netkiller 微信订阅号

QQ群:128659835 请注明“读者”

2014-09-05

摘要

目录

1. Redis 订阅发布代码

订阅代码

			
<?php
$redis = new Redis();
$redis->connect('192.168.2.1',6379);
$channel = $argv[1];  // channel
$redis->subscribe(array('channel'.$channel), 'callback');
function callback($instance, $channelName, $message) {
  echo $channelName, "==>", $message,PHP_EOL;
}
			
			

发布代码

			
<?php

$redis = new Redis();
$redis->connect('192.168.2.1',6379);
$channel = $argv[1];  // channel
$msg = $argv[2]; // msg
$redis->publish('channel'.$channel, $msg);
			
			

2. 守护进程

上面程序发布端运行后始终处在前台,终端会接收来自键盘的消息,可能会终止程序运行。我们采用守护进程方式,使之进入后台运行。

			
<?php
/*
Homepage: http://netkiller.github.io
Author: neo chan <netkiller@msn.com>
*/

class Example {
	/* config */
	const HOST = '192.168.2.1';
	const PORT = 6379;
	const MAXCONN = 2048;

	const pidfile = __CLASS__;
	const uid	= 80;
	const gid	= 80;
	
	protected $pool = NULL;
	protected $redis = NULL;

	public function __construct() {
		$this->pidfile = '/var/run/'.self::pidfile.'.pid';
		$this->redis = new Redis();
	}
	private function daemon(){
		if (file_exists($this->pidfile)) {
			echo "The file $this->pidfile exists.\n";
			exit();
		}
		
		$pid = pcntl_fork();
		if ($pid == -1) {
			 die('could not fork');
		} else if ($pid) {
			 // we are the parent
			 //pcntl_wait($status); //Protect against Zombie children
			exit($pid);
		} else {
			// we are the child
			file_put_contents($this->pidfile, getmypid());
			posix_setuid(self::uid);
			posix_setgid(self::gid);
			return(getmypid());
		}
	}
	private function run(){
		$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);

		$this->redis->connect(self::HOST, self::PORT);
		$channel = array('news', 'login', 'logout');
		$this->redis->subscribe($channel, 'callback');
		function callback($instance, $channelName, $message) {
  			echo $channelName, "==>", $message,PHP_EOL;
			//print_r($message);
			$this->pool->submit(new Fee($message));
		}
	
		$pool->shutdown();	
	}
	private function start(){
		$pid = $this->daemon();
		$this->run();
	}
	private function onestart(){
		$this->run();
	}	

	private function stop(){

		if (file_exists($this->pidfile)) {
			$pid = file_get_contents($this->pidfile);
			posix_kill($pid, 9); 
			unlink($this->pidfile);
		}
	}
	private function help($proc){
		printf("%s start | stop | onestart | help \n", $proc);
	}
	public function main($argv){
		if(count($argv) < 2){
			printf("please input help parameter\n");
			exit();
		}
		if($argv[1] === 'stop'){
			$this->stop();
		} else if($argv[1] === 'start'){
			$this->start();
		} else if ($argv[1] === 'onestart') {
			$this->onestart();
		} else{
			$this->help($argv[0]);
		}
	}
}

$example = new Example();
$example->main($argv);			
			
			

收到任务通过$this->pool->submit将任务分配给线程程序处理