Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | 51CTO学院 | CSDN程序员研修院 | OSChina 博客 | 腾讯云社区 | 阿里云栖社区 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏多维度架构

2.16. 数据库与应用程序间通信

本章讲解数据库与应用程序间通信,这里会涉及到

2.16.1. 管道通信

你是否想过当数据库中的数据发生变化的时候出发某种操作?但因数据无法与其他进程通信(传递信号)让你放弃,而改用每隔一段时间查询一次数据变化的方法?下面的插件可以解决你的问题。

2.16.1.1. 背景

你是否有这样的需求:

你需要监控访问网站的IP,当同一个IP地址访问次数过多需要做出处理,例如拉黑,直接丢进iptables 防火墙规则连中。你的做法只能每个一段时间查询一次数据库,并且判断是否满足拉黑需求?

你是否需要监控某些数据发生变化,并通知其他程序作出处理。例如新闻内容修改后,需要立即做新页面静态化处理,生成新的静态页面

你使用数据库做队列,例如发送邮件,短信等等。你要通知发送程序对那些手机或者短线发送数据

2.16.1.2. 解决思路

需要让数据库与其他进程通信,传递信号

例如,发送短信这个需求,你只要告诉发短信的机器人发送的手机号码即可,机器人永远守候那哪里,只要命令一下立即工作。

监控数据库变化的需求原理类似,我们需要有一个守护进程等待命令,一旦接到下达命令便立即生成需要的静态页面

这里所提的方案是采用fifo(First In First Out)方案,通过管道相互传递信号,使两个进程协同工作,这样的效率远比定时任务高许多。fifo是用于操作系统内部进程间通信,如果跨越操作系统需要使用Socket,还有一个新名词MQ(Message queue).

这里只做fifo演示, 将本程序改为Socket方案,或者直接集成成熟的MQ也是分分钟可以实现。

2.16.1.3. Mysql plugin

我开发了几个 UDF, 共4个 function

UDF

fifo_create(pipename)

创建管道.成功返回true,失败返回flase.

fifo_remove(pipename)

删除管道.成功返回true,失败返回flase.

fifo_read(pipename)

读操作.

fifo_write(pipename,message)

写操作 pipename管道名,message消息正文.

有了上面的function后你就可以在begin,commit,rollback 直接穿插使用,实现在事物处理期间做你爱做的事。也可以用在触发器与EVENT定时任务中。

2.16.1.4. plugin 的开发与使用

编译UDF你需要安装下面的软件包

sudo apt-get install pkg-config
sudo apt-get install libmysqlclient-dev

sudo apt-get install gcc gcc-c++ make automake autoconf
		

https://github.com/netkiller/mysql-fifo-plugin

编译udf,最后将so文件复制到 /usr/lib/mysql/plugin/

git clone https://github.com/netkiller/mysql-image-plugin.git
cd mysql-image-plugin

gcc -O3  -g  -I/usr/include/mysql -I/usr/include  -fPIC -lm -lz -shared -o fifo.so fifo.c
sudo mv fifo.so /usr/lib/mysql/plugin/
		

装载

create function fifo_create returns string soname 'fifo.so';
create function fifo_remove returns string soname 'fifo.so';
create function fifo_read returns string soname 'fifo.so';
create function fifo_write returns string soname 'fifo.so';
		

卸载

drop function fifo_create;
drop function fifo_remove;
drop function fifo_read;
drop function fifo_write;
		

2.16.1.5. 插件如何使用

插件有很多种用法,这里仅仅一个例

CREATE TABLE `demo` (
	`id` INT(11) NULL DEFAULT NULL,
	`name` CHAR(10) NULL DEFAULT NULL,
	`mobile` VARCHAR(50) NULL DEFAULT NULL
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB;

INSERT INTO `demo` (`id`, `name`, `mobile`) VALUES
	(1, 'neo', '13113668891'),
	(2, 'jam', '13113668892'),
	(3, 'leo', '13113668893');
		

我们假设有一个demo这样的表,我使用shell写了一个守护进程用于处理数据库送过来的数据

		
#!/bin/bash
########################################
# Homepage: http://netkiller.github.io
# Author: neo <netkiller@msn.com>
########################################
NAME=demo
PIPE=/tmp/myfifo
########################################
LOGFILE=/tmp/$NAME.log
PIDFILE=/tmp/${NAME}.pid
########################################

function start(){
	if [ -f "$PIDFILE" ]; then
		exit 2
	fi

        if [ ! -f "$LOGFILE" ]; then
                > ${LOGFILE}
        fi

	for (( ; ; ))
	do
            while read line
            do
				NOW=$(date '+%Y-%m-%d %H:%M:%S')
				
                echo "[${NOW}] [OK] ${line}" >> ${LOGFILE}

            done < $PIPE
	done &
	echo $! > $PIDFILE
}
function stop(){
  	[ -f $PIDFILE ] && kill `cat $PIDFILE` && rm -rf $PIDFILE
}

case "$1" in
  start)
  	start
	;;
  stop)
  	stop
	;;
  status)
  	ps ax | grep ${0} | grep -v grep | grep -v status
	;;
  restart)
  	stop
	start
	;;
  *)
	echo $"Usage: $0 {start|stop|status|restart}"
	exit 2
esac

exit $?
		
			

启动守护进程

$ ./sms.sh start
$ ./sms.sh status
  596 pts/5    S      0:00 /bin/bash ./sms.sh start
		

监控日志,因为守护进程没有输出,完成人户后写入日志。

$ tail -f /tmp/demo.log
		

开始推送任务

		
mysql> select fifo_write('/tmp/myfifo',concat(mobile,'\r\n')) from demo;
+-------------------------------------------------+
| fifo_write('/tmp/myfifo',concat(mobile,'\r\n')) |
+-------------------------------------------------+
| true                                            |
| true                                            |
| true                                            |
+-------------------------------------------------+
3 rows in set (0.00 sec)
		
			

现在看看日志的变化

$ tail -f /tmp/demo.log
[2013-12-16 14:55:48] [OK] 13113668891
[2013-12-16 14:55:48] [OK] 13113668892
[2013-12-16 14:55:48] [OK] 13113668893
		

我们再将上面的例子使用触发器进一步优化

		
CREATE TABLE `demo_sent` (
	`id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
	`mobile` VARCHAR(50) NOT NULL,
	`status` ENUM('true','false') NOT NULL DEFAULT 'false',
	`ctime` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	PRIMARY KEY (`id`)
)
COLLATE='utf8_general_ci'
ENGINE=InnoDB

CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_insert` AFTER INSERT ON `demo` FOR EACH ROW BEGIN
	insert into demo_sent(mobile,status) select new.mobile,fifo_write('/tmp/myfifo',concat(new.mobile,'')) as status;
END
		
			

测试

		
mysql> insert into demo(name,mobile) values('jerry','13322993040');
Query OK, 1 row affected (0.05 sec)		
		
			

日志变化

$ tail -f /tmp/demo.log 
[2013-12-16 14:55:48] [OK] 13113668891
[2013-12-16 14:55:48] [OK] 13113668892
[2013-12-16 14:55:48] [OK] 13113668893
[2013-12-16 14:55:48] [OK] 13322993040
		

2.16.1.6. 部署相关问题

我们可以采用主从数据库,将任务放在专用的从库上执行

我们可以创建很多个管道,用于做不同的工作,例如插入一个任务,更新一个任务,发短信一个任务,处理模板与静态化一个任务等等。

2.16.2. 消息队列

这里选择使用ZeroMQ的原因主要考虑的是性能问题,其他MQ方案可能会阻塞数据库。

2.16.2.1. 背景

之前我发表过一篇文章 http://netkiller.github.io/journal/mysql.plugin.fifo.html

该文章中提出了通过fifo 管道,实现数据库与其他进程的通信。属于 IPC 机制(同一个OS/服务器内),后我有采用ZeroMQ重新实现了一个 RPC 机制的方案,同时兼容IPC(跨越OS/服务器)

各种缩写的全称 IPC(IPC :Inter-Process Communication 进程间通信),ITC(ITC : Inter Thread Communication 线程间通信)与RPC(RPC: Remote Procedure Calls远程过程调用)。

支持协议

inproc://my_publisher
tcp://server001:5555
ipc:///tmp/feeds/0
		

2.16.2.2. 应用场景

如果你想处理数据,由于各种原因你不能在程序中实现,你可以使用这个插件。当数据库中的数据发生变化的时候出发某种操作,你可以使用这个插件。

有时候你的项目可能是外包的,项目结束后外包方不会在管你,你有无法改动现有代码,或者根本不敢改。你可以使用这个插件

采用MQ技术对数据库无任何压力,与采用程序处理并无不同,省却了写代码

处理方法,可以采用同步或者异步方式

例 2.2. 发送短信

发送短信、邮件,只需要查询出相应手机号码,发送到MQ的服务端,服务端接收到手机号码后,放入队列中,多线程程序从队列中领取任务,发送短信。

select zmq_client('tcp://localhost:5555',mobile) from demo where subscribed='Y' ...;
			

传递多个参数,可以使用符号分隔

select zmq_client('tcp://localhost:5555',concat(name,',',mobile,', news')) from demo;
select zmq_client('tcp://localhost:5555',concat(name,'|',mobile,'|news')) from demo;
			

json格式

select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,', template:news}')) from demo;
			

建议采用异步方式,MQ端接收到任务立即反馈 “成功”信息,因为我们不太关心是否能发送成功,本身就是盲目性的发送,手机号码是否可用我们无从得知,短信或者邮件的发送到达率不是100%,所以当进入队列后,让程序自行处理,将成功或者失败信息记录到日志中即可。


例 2.3. 处理图片

首先查询出需要处理图片,然后将路径与分辨率传递给MQ另一端的处理程序

select zmq_client('tcp://localhost:5555',concat(image,',800x600}')) from demo;
			

建议采用异步方式,MQ端接收到任务立即反馈 “成功”信息


例 2.4. 身份证号码校验

select zmq_client('tcp://localhost:5555',id_number) from demo;
			

可以采用同步方案,因为MQ款处理几乎不会延迟,直接将处理结构反馈


例 2.5. 静态化案例

情景模拟,你的项目是你个电商项目,采用外包模式开发,项目已经开发完成。外包放不再负责维护,你现在要做静态化。增加该功能,你要检查多处与商品表相关的造作。

于其改代码,不如程序从外部处理,这样更保险。我们只要写一个程序将动态 URL 下载保存成静态即可,当数据发生变化的时候重新下载覆盖即可

CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_insert` AFTER INSERT ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_update` AFTER UPDATE ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_delete` AFTER DELETE ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
			

MQ 另一端的服务会下载http://www.example.com/goods.php?cid=111&id=100, 然后生成html页面,http://www.example.com/111/100.html

插入会新建页面,更新会覆盖页面,删除会删除页面

这样无论商品的价格,属性改变,静态化程序都会做出相应的处理。


例 2.6. 数据同步案例

我们有多个数据库,A 库里面的数据发生变化后,要同步书库到B库,或者处理结果,或者数据转换后写入其他数据库中

方法也是采用触发器或者EVENT处理


2.16.2.3. Mysql plugin

我开发了几个 UDF, 共4个 function

UDF

zmq_client(sockt,message)

sockt .成功返回true,失败返回flase.

有了上面的function后你就可以在begin,commit,rollback 直接穿插使用,实现在事物处理期间做你爱做的事。也可以用在触发器与EVENT定时任务中。

2.16.2.4. plugin 的开发与使用

编译UDF你需要安装下面的软件包

sudo apt-get install pkg-config
sudo apt-get install libmysqlclient-dev

sudo apt-get install gcc gcc-c++ make cmake
		

https://github.com/netkiller/mysql-zmq-plugin

编译udf,最后将so文件复制到 /usr/lib/mysql/plugin/

		
git clone https://github.com/netkiller/mysql-zmq-plugin.git
cd mysql-zmq-plugin

cmake .
make && make install
		
			

装载

create function zmq_client returns string soname 'libzeromq.so';
create function zmq_publish returns string soname 'libzeromq.so';
		

卸载

drop function zmq_client;
drop function zmq_publish;
		

确认安装成功

		
mysql> SELECT * FROM `mysql`.`func` where name like 'zmq%';
+-------------+-----+--------------+----------+
| name        | ret | dl           | type     |
+-------------+-----+--------------+----------+
| zmq_client  |   0 | libzeromq.so | function |
| zmq_publish |   0 | libzeromq.so | function |
+-------------+-----+--------------+----------+
2 rows in set (0.00 sec)
		
			

2.16.2.5. 插件如何使用

插件有很多种用法,这里仅仅一个例

编译zeromq server 测试程序

cd test
cmake .
make
		

启动服务进程

./server
		

发送Hello world!

		
mysql> select zmq_client('tcp://localhost:5555','Hello world!');
+---------------------------------------------------+
| zmq_client('tcp://localhost:5555','Hello world!') |
+---------------------------------------------------+
| Hello world! OK                                   |
+---------------------------------------------------+
1 row in set (0.01 sec)
		
			

查看服务器端是否接收到信息。

$ ./server
Received: Hello world!
		

我们再将上面的例子使用触发器进一步优化

		
mysql> select zmq_client('tcp://localhost:5555',mobile) from demo;
+-------------------------------------------+
| zmq_client('tcp://localhost:5555',mobile) |
+-------------------------------------------+
| 13113668891 OK                            |
| 13113668892 OK                            |
| 13113668893 OK                            |
| 13322993040 OK                            |
| 13588997745 OK                            |
+-------------------------------------------+
5 rows in set (0.03 sec)
		
			

服务器端已经接收到数据库发过来的信息

$ ./server
Received: Hello world!
Received: 13113668891
Received: 13113668892
Received: 13113668893
Received: 13322993040
Received: 13588997745
		

我们可以拼装json或者序列化数据,发送给远端

		
mysql> select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) from demo;
+------------------------------------------------------------------------------+
| zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) |
+------------------------------------------------------------------------------+
| {name:neo, tel:13113668891} OK                                               |
| {name:jam, tel:13113668892} OK                                               |
| {name:leo, tel:13113668893} OK                                               |
| {name:jerry, tel:13322993040} OK                                             |
| {name:tom, tel:13588997745} OK                                               |
+------------------------------------------------------------------------------+
5 rows in set (0.03 sec)
		
			

返回数据取决于你服务端怎么编写处理程序,你可以返回true/false等等。

触发器以及事务处理,这里就不演示了

2.16.3. 数据库与外界文件

你是是不是在开发中常常遇到,删除了数据库记录后,发现该记录对应的图片没有删除,或者删除了图片,数据库中仍有数据存在,你的网站脏数据(图片)成几何数增长,阅读下文这里为你提供了一个完美决方案。

2.16.3.1. 背景

我以电商网站为例,一般的网站产品数据存放在数据库中,商品图片是上传到文件服务器,然后通过http服务器浏览商品图片。这是最基本的也是最常见做法。

稍复杂的方案是,如果图片数量庞大,会使用分布式文件系统方案。但是这些方案都不能保证数据的完整性,极易产生脏数据(垃圾数据)。脏数据是指当你删除了数据库表中的记录后,图片仍然存在,或者手工删除了图片,而数据库中的记录仍然存在。

将图片放入数据库中存放在BLOB的方法可以解决脏数据问题,典型的案例是公安的身份证系统。但这种方案的前提是,图片不能太大,数量不多,访问量不大。 这显然不适合电商网站。

2009年我在走秀网工作,商品图片与缩图文件900GB到2012离职已经有10TB,每天有成百上千的商品上架下架,很多商品下架后永远不会再上架,这些批量下架的商品数据不会删除,仅仅标记为删除,总是期望以后能继续使用,实际上再也不会有人过问,另一方面随着品类经理频繁更换,员工离职,这些商品会石沉大海,再也无人问均。这些商品所对应的图片也就脏数据主要来源。新的品类经理上任后,会重新拍照,上传新图片。

总之,删除数据库中的数据不能将图片删除就会产生脏数据。很多采用删除数据的时候去检查图片如果存在先删除图片,再删除数据的方法。这种方案也非完美解决方案,存在这图片先被删除,程序出错SQL没有运行,或者反之。

2.16.3.2. 解决思路

如果删除图片能够成为事物处理中的一个环节,所有问题都能迎刃而解,可彻底解决脏数据的烦恼。

2.16.3.3. 解决方案

mysql plugin 开发 udf。我写几个function

UDF

image_check(filename)

检查图片是否存在.

image_remove(filename)

删除图片.

image_rename(oldfile,newfile)

更改图片文件名.

image_md5sum(filename)

md5sum 主要用户图片是否被更改过.

image_move(filename,filename)

移动图片的位置

有了上面的function后你就可以在begin,commit,rollback 直接穿插使用,实现在事物处理期间做你爱做的事。

2.16.3.4. plugin 的开发与使用

编译UDF你需要安装下面的软件包

sudo apt-get install pkg-config
sudo apt-get install libmysqlclient-dev

sudo apt-get install gcc gcc-c++ make automake autoconf
		

https://github.com/netkiller/mysql-image-plugin

编译udf,最后将so文件复制到 /usr/lib/mysql/plugin/

git clone https://github.com/netkiller/mysql-image-plugin.git
cd mysql-image-plugin/src

gcc -I/usr/include/mysql -I./ -fPIC -shared -o image.so image.c
sudo mv image.so /usr/lib/mysql/plugin/
		

装载

create function image_check returns boolean soname 'images.so';
create function image_remove returns boolean soname 'images.so';
create function image_rename returns boolean soname 'images.so';
create function image_md5sum returns string soname 'images.so';
create function image_move returns string soname 'images.so';
		

卸载

drop function image_check;
drop function image_remove;
drop function image_rename;
drop function image_md5sum;
drop function image_move;
		

2.16.3.5. 在事务中使用该插件

插入图片流程,上传图片后,通过插件检查图片是否正确上传,然后插入记录

begin;
IF image_check('/path/to/images.jpg') THEN
	insert into images(product_id,thumbnail,original) values(1000,'thumbnail/path/to/images.jpg','original/path/to/images.jpg');
	commit;
ELSE
	image_remove('/path/to/images.jpg');
END IF
rollback;
		

删除商品采用image_move 方案,当出现异常rollback后还可以还原被删除的图片

begin;
IF image_check('/path/to/images.jpg') THEN
	select thumbnail,original into @thumbnail,@original from images where id='1000' for delete;
	delete from images where id='1000';
	select image_move(@thumbnail,'recycle/path/to/');
	select image_move(@original,'recycle/path/to/');
	commit;
END IF

rollback;
select image_move('recycle/path/to/images.jpg','path/to/images.jpg');
		

我们可以使用EVENT定时删除回收站内的图片

image_remove('recycle/path/to/images.jpg');
		

2.16.3.6. 通过触发器调用图片处理函数

通过触发器更能保证数据完整性

1. insert 触发器的任务: 插入记录的时候通过image_check检查图片是否正常上传,如果非没有上传,数据插入失败。
2. delete 触发器的任务: 检查删除记录的时候,首先去删除图片,删除成功再删除该记录。
		

触发器进一步优化

1. insert 触发器的任务: 插入记录的时候通过image_check检查图片是否正常上传,如果非没有上传,数据插入失败。如果上传成功再做image_md5sum 进行校验100% 正确后插入记录
2. delete 触发器的任务: 检查删除记录的时候,首先去改图片文件名,然后删除该记录,最后删除图片,删除成功。如果中间环境失败 记录会rollback,图片会在次修改文件名改回来。100% 保险
		

2.16.4. Socket 方式

TCP 方式还不如使用现在有的消息队列,所以数据库通过 Socket与应用程序通信,我推荐 UDP 方式。

UDP 有个好处,丢出去就不管了,性能非常好。并且可以实现组播,广播。下面是 UDP的例子

2.16.4.1. UDP

https://github.com/netkiller/mysql-udp-plugin

下载 mysql-udp_sendto-plugin 然后编译安装代码

		
# cmake .
# make && make install
		
			

安装

		
create function udp_sendto returns string soname 'libudp_sendto.so';
		
			

卸载

		
drop function udp_sendto;
		
			

使用演示,首先使用nc命令监听一个UDP端口,用来接收数据库发送过来的数据。数据结构请自行定义。这里仅仅是演示,可以采用json, 逗号分隔等等方式。

		
# nc -luv 4000
		
			

在数据库中使用下面SQL发送数据给应用程序

		
select udp_sendto('192.168.2.1','4000','hello');