版权 © 2014 http://netkiller.github.io
版权声明
转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。
|
|
|
微信扫描二维码进入 Netkiller 微信订阅号 QQ群:128659835 请注明“读者” |
2014-09-05
订阅代码
<?php $redis = new Redis(); $redis->connect('192.168.2.1',6379); $channel = $argv[1]; // channel $redis->subscribe(array('channel'.$channel), 'callback'); function callback($instance, $channelName, $message) { echo $channelName, "==>", $message,PHP_EOL; }
发布代码
<?php $redis = new Redis(); $redis->connect('192.168.2.1',6379); $channel = $argv[1]; // channel $msg = $argv[2]; // msg $redis->publish('channel'.$channel, $msg);
上面程序发布端运行后始终处在前台,终端会接收来自键盘的消息,可能会终止程序运行。我们采用守护进程方式,使之进入后台运行。
<?php /* Homepage: http://netkiller.github.io Author: neo chan <netkiller@msn.com> */ class Example { /* config */ const HOST = '192.168.2.1'; const PORT = 6379; const MAXCONN = 2048; const pidfile = __CLASS__; const uid = 80; const gid = 80; protected $pool = NULL; protected $redis = NULL; public function __construct() { $this->pidfile = '/var/run/'.self::pidfile.'.pid'; $this->redis = new Redis(); } private function daemon(){ if (file_exists($this->pidfile)) { echo "The file $this->pidfile exists.\n"; exit(); } $pid = pcntl_fork(); if ($pid == -1) { die('could not fork'); } else if ($pid) { // we are the parent //pcntl_wait($status); //Protect against Zombie children exit($pid); } else { // we are the child file_put_contents($this->pidfile, getmypid()); posix_setuid(self::uid); posix_setgid(self::gid); return(getmypid()); } } private function run(){ $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []); $this->redis->connect(self::HOST, self::PORT); $channel = array('news', 'login', 'logout'); $this->redis->subscribe($channel, 'callback'); function callback($instance, $channelName, $message) { echo $channelName, "==>", $message,PHP_EOL; //print_r($message); $this->pool->submit(new Fee($message)); } $pool->shutdown(); } private function start(){ $pid = $this->daemon(); $this->run(); } private function onestart(){ $this->run(); } private function stop(){ if (file_exists($this->pidfile)) { $pid = file_get_contents($this->pidfile); posix_kill($pid, 9); unlink($this->pidfile); } } private function help($proc){ printf("%s start | stop | onestart | help \n", $proc); } public function main($argv){ if(count($argv) < 2){ printf("please input help parameter\n"); exit(); } if($argv[1] === 'stop'){ $this->stop(); } else if($argv[1] === 'start'){ $this->start(); } else if ($argv[1] === 'onestart') { $this->onestart(); } else{ $this->help($argv[0]); } } } $example = new Example(); $example->main($argv);
收到任务通过$this->pool->submit将任务分配给线程程序处理