版权 © 2014 http://netkiller.github.io
版权声明
转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。
微信扫描二维码进入 Netkiller 微信订阅号;QQ群:128659835(加群请注明“读者”)
2014-09-05
订阅代码
<?php
/**
 * Subscription callback: print each received message as "<channel>==><message>".
 *
 * @param Redis  $instance    the subscribed Redis connection
 * @param string $channelName channel the message arrived on
 * @param string $message     message payload
 */
function callback($instance, $channelName, $message) {
    echo $channelName, "==>", $message, PHP_EOL;
}

// The channel suffix is mandatory; without it the script would silently
// subscribe to the literal channel name "channel".
if (!isset($argv[1]) || $argv[1] === '') {
    fwrite(STDERR, "usage: php {$argv[0]} <channel>" . PHP_EOL);
    exit(1);
}
$channel = $argv[1]; // channel

$redis = new Redis();
if (!$redis->connect('192.168.2.1', 6379)) {
    fwrite(STDERR, 'could not connect to redis at 192.168.2.1:6379' . PHP_EOL);
    exit(1);
}

// subscribe() blocks in a socket read; disable the read timeout so an idle
// channel does not end the subscription with a read error.
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

$redis->subscribe(array('channel'.$channel), 'callback');
发布代码
<?php
// Both the channel suffix and the message are required arguments.
if (!isset($argv[1], $argv[2]) || $argv[1] === '') {
    fwrite(STDERR, "usage: php {$argv[0]} <channel> <message>" . PHP_EOL);
    exit(1);
}
$channel = $argv[1]; // channel
$msg = $argv[2]; // msg

$redis = new Redis();
if (!$redis->connect('192.168.2.1', 6379)) {
    fwrite(STDERR, 'could not connect to redis at 192.168.2.1:6379' . PHP_EOL);
    exit(1);
}

// publish() reports how many subscribers received the message; false means
// the command itself failed.
if ($redis->publish('channel'.$channel, $msg) === false) {
    fwrite(STDERR, 'publish failed' . PHP_EOL);
    exit(1);
}
上面的订阅端程序运行后会一直停留在前台,终端仍会接收键盘输入(例如 Ctrl+C),可能导致程序被意外终止。下面改用守护进程方式,使其转入后台运行。
<?php
/*
Homepage: http://netkiller.github.io
Author: neo chan <netkiller@msn.com>
*/
/**
 * Redis pub/sub consumer that can be run as a background daemon.
 *
 * Subscribes to a fixed list of Redis channels and submits every received
 * message to a thread pool as a Fee task. Controlled from the command line
 * through main(): start | stop | onestart | help.
 */
class Example {
    /* config */
    const HOST = '192.168.2.1';
    const PORT = 6379;
    /* size handed to the Pool constructor */
    const MAXCONN = 2048;
    /* base name of the pid file (the class name) */
    const pidfile = __CLASS__;
    /* numeric user / group id the daemon switches to after forking */
    const uid = 80;
    const gid = 80;

    protected $pool = NULL;
    protected $redis = NULL;
    /* Full path of the pid file. Declared explicitly (it used to be created
       dynamically in the constructor) and kept public for compatibility. */
    public $pidfile = NULL;

    /**
     * Compute the pid file path and create the (not yet connected) Redis client.
     */
    public function __construct() {
        $this->pidfile = '/var/run/'.self::pidfile.'.pid';
        $this->redis = new Redis();
    }

    /**
     * Fork into the background.
     *
     * The parent process exits; the child records its pid in the pid file,
     * detaches from the controlling terminal, drops privileges and returns.
     *
     * @return int pid of the daemonized child (only the child returns)
     */
    private function daemon(){
        if (file_exists($this->pidfile)) {
            echo "The file $this->pidfile exists.\n";
            exit();
        }

        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            // we are the parent: the fork succeeded, so report success.
            // (Exiting with the child pid would turn it into an arbitrary exit status.)
            exit(0);
        }

        // we are the child
        file_put_contents($this->pidfile, getmypid());

        // Start a new session so keyboard-generated signals from the launching
        // terminal no longer reach the daemon.
        if (posix_setsid() === -1) {
            fwrite(STDERR, "warning: could not detach from the terminal\n");
        }

        // The group must be changed before the user: once the uid has been
        // dropped the process is no longer allowed to change its gid.
        if (!posix_setgid(self::gid)) {
            fwrite(STDERR, "warning: could not switch to gid ".self::gid."\n");
        }
        if (!posix_setuid(self::uid)) {
            fwrite(STDERR, "warning: could not switch to uid ".self::uid."\n");
        }

        return getmypid();
    }

    /**
     * Handle one published message: print it and queue it on the thread pool.
     *
     * @param Redis  $instance    the subscribed Redis connection
     * @param string $channelName channel the message arrived on
     * @param string $message     message payload
     */
    private function onMessage($instance, $channelName, $message) {
        echo $channelName, "==>", $message,PHP_EOL;
        //print_r($message);
        $this->pool->submit(new Fee($message));
    }

    /**
     * Connect to Redis, subscribe and dispatch messages until the
     * subscription ends; the pool is always shut down afterwards.
     *
     * @throws RuntimeException when the Redis connection cannot be established
     */
    private function run(){
        if (!$this->redis->connect(self::HOST, self::PORT)) {
            throw new \RuntimeException('could not connect to redis at '.self::HOST.':'.self::PORT);
        }
        // subscribe() blocks in a socket read; disable the read timeout so an
        // idle channel does not end the subscription with a read error.
        $this->redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

        $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);

        $channel = array('news', 'login', 'logout');
        $self = $this;
        try {
            // A closure is used so the handler exists before the blocking call
            // and has access to $this (a nested named function has neither).
            $this->redis->subscribe($channel, function ($instance, $channelName, $message) {
                $this->onMessage($instance, $channelName, $message);
            });
        } finally {
            $this->pool->shutdown();
        }
    }

    /**
     * Daemonize, then run the subscriber in the background child.
     */
    private function start(){
        $this->daemon();
        $this->run();
    }

    /**
     * Run the subscriber in the foreground, without forking.
     */
    private function onestart(){
        $this->run();
    }

    /**
     * Kill the daemon recorded in the pid file and remove the file.
     */
    private function stop(){
        if (!file_exists($this->pidfile)) {
            return;
        }

        $pid = (int) trim((string) file_get_contents($this->pidfile));
        // Only signal a real pid: an empty or unreadable pid file would yield 0,
        // and signalling pid 0 targets the caller's whole process group.
        if ($pid > 0) {
            posix_kill($pid, 9);
        }
        unlink($this->pidfile);
    }

    /**
     * Print the usage line.
     *
     * @param string $proc script name shown in the usage line
     */
    private function help($proc){
        printf("%s start | stop | onestart | help \n", $proc);
    }

    /**
     * Command-line entry point: dispatch on the first argument.
     *
     * @param array $argv script arguments; $argv[1] selects the command
     */
    public function main($argv){
        if(count($argv) < 2){
            printf("please input help parameter\n");
            exit();
        }

        $command = $argv[1];
        if ($command === 'stop') {
            $this->stop();
        } else if ($command === 'start') {
            $this->start();
        } else if ($command === 'onestart') {
            $this->onestart();
        } else {
            $this->help($argv[0]);
        }
    }
}
// Script entry point: build the controller and let it act on the
// command-line arguments (start | stop | onestart | help).
$example = new Example;
$example->main($argv);
订阅回调收到消息后,通过 $this->pool->submit() 将任务提交给线程池,由工作线程负责处理。