
Chapter 140. Message Queuing & RPC

Table of Contents

140.1. RabbitMQ
140.1.1. Installing RabbitMQ
140.1.1.1. Ubuntu
140.1.1.2. CentOS
140.1.1.3. OSCM one-step installation
140.1.1.4. Checking the ports
140.1.2. Configuring RabbitMQ
140.1.2.1. Listening on all interface addresses
140.1.3. rabbitmqctl - command line tool for managing a RabbitMQ broker
140.1.3.1. change_password
140.1.3.2. list_users
140.1.3.3. Virtual host management
140.1.3.4. list_queues
140.1.3.5. list_exchanges
140.1.4. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins
140.1.4.1. rabbitmq_management
140.1.5. Python - Pika
140.1.6. Ruby amqp
140.2. ZeroMQ
140.2.1. python-zeromq
140.2.1.1. pyzmq
140.2.1.2. example
140.2.2. ruby zmq
140.3. nanomsg
140.4. Gearman
140.4.1. Getting Started with Gearman
140.4.1.1. CentOS
140.4.1.2. Ubuntu
140.4.1.3. Firewall settings
140.4.2. gearman
140.4.3. Gearman PHP Extension
140.5. Apache Kafka is a distributed publish-subscribe messaging system
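140.5.1. Installing Kafka
140.5.1.1. Installing Kafka for development and test environments
140.5.1.2. Installing Kafka for an IDC (data center)
140.5.1.3. Kafka logs
140.5.1.4. Checking Kafka threads
140.5.2. Testing Kafka
140.5.3. Configuring Kafka
140.5.3.1. server.properties
140.5.3.1.1. External network access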
140.5.3.2. consumer.properties
140.5.3.2.1. group.id
140.5.3.3. producer.properties
140.5.4. Managing Kafka
140.5.5. FAQ
140.5.5.1. WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
140.5.5.2. Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
140.5.5.3. WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
140.6. Celery
140.7. ActiveMQ
140.8. http://kr.github.io/beanstalkd/
140.9. gRPC

140.1. RabbitMQ

140.1.1. Installing RabbitMQ

The examples in this section assume a broker running on 127.0.0.1 (localhost) on port 5672, the standard AMQP port.

140.1.1.1. Ubuntu

			
$ sudo apt-get install rabbitmq-server			
			

			

140.1.1.2. CentOS

			
# yum install -y rabbitmq-server
# chkconfig rabbitmq-server on
# service rabbitmq-server start			
			

			

Add a user, grant it permissions, and delete the guest user:

			
# rabbitmqctl add_user rabbit password
# rabbitmqctl set_permissions -p "/" rabbit ".*" ".*" ".*"
# rabbitmqctl delete_user guest			
			

			

140.1.1.3. OSCM one-step installation

			
curl -s https://raw.githubusercontent.com/oscm/shell/master/mq/rabbitmq/rabbitmq-server-3.6.10.sh | bash

rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"			
			
			

140.1.1.4. Checking the ports

			
[root@netkiller ~]# ss -lnt | grep 5672
LISTEN 0 128 *:25672 *:*
LISTEN 0 128 :::5672 :::*			
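
The same check can be scripted. The following is a minimal Python sketch, not part of the RabbitMQ tooling: the function names `port_is_open` and `check_rabbitmq_ports` are made up for illustration, and it assumes the broker runs on 127.0.0.1 with the two ports seen in the ss output above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection performs the TCP handshake and raises OSError on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_rabbitmq_ports(host: str = "127.0.0.1") -> dict:
    """Probe the two ports from the ss output (assumed: 5672 AMQP, 25672 clustering)."""
    return {port: port_is_open(host, port) for port in (5672, 25672)}


if __name__ == "__main__":
    for port, is_open in check_rabbitmq_ports().items():
        print(port, "open" if is_open else "closed")
```

Unlike `ss -lnt`, this probes from the client side, so it can also be run from a remote machine to confirm that the port is reachable through the network.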
						
			

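140.1.2. Configuring RabbitMQ

Create the configuration file. By default, /etc/rabbitmq/ is empty, so copy the example configuration file from the package documentation directory: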

		
cp /usr/share/doc/rabbitmq-server-3.6.10/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
		
		

140.1.2.1. Listening on all interface addresses

If external machines need to connect, make RabbitMQ listen on all interface addresses by setting tcp_listeners to 0.0.0.0:

			
 {tcp_listeners, [{"0.0.0.0",5672}]}		
			
			

140.1.3. rabbitmqctl - command line tool for managing a RabbitMQ broker

			rabbitmqctl status
		

140.1.3.1. change_password

			
rabbitmqctl change_password admin <new_password>
			
			

140.1.3.2. list_users

			
# rabbitmqctl list_users
Listing users ...
guest [administrator]
...done.			
			
			

140.1.3.3. Virtual host management

			
$ rabbitmqctl add_vhost test
$ rabbitmqctl add_user testuser password
$ rabbitmqctl set_permissions -p test testuser ".*" ".*" ".*"			
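
When several virtual hosts have to be provisioned, the three commands above can be generated rather than typed. This is a hedged sketch in Python: `vhost_setup_commands` is a hypothetical helper, and the three patterns are passed in the order rabbitmqctl expects (configure, write, read).

```python
import shlex


def vhost_setup_commands(vhost, user, password, configure=".*", write=".*", read=".*"):
    """Build the rabbitmqctl argument lists for one vhost and one user."""
    return [
        ["rabbitmqctl", "add_vhost", vhost],
        ["rabbitmqctl", "add_user", user, password],
        ["rabbitmqctl", "set_permissions", "-p", vhost, user, configure, write, read],
    ]


for argv in vhost_setup_commands("test", "testuser", "password"):
    # shlex.join quotes arguments such as .* so each line can be pasted into a shell.
    print(shlex.join(argv))
```

Each list can also be handed to `subprocess.run(argv, check=True)` on the broker host, which avoids shell quoting altogether.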
						
			

140.1.3.4. list_queues

			
# rabbitmqctl list_queues
Listing queues ...
amq.gen-RhBwbb9EdZ8Fgk_heGZQ2w 0
bb 0
customer 276930
demo 0
email 0
example 0
hello 1
members_id 282
new_members_id 0
q_linvo 0
real 0
...done.
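
Output in this format is easy to post-process, for example to find the queue with the largest backlog. The sketch below is illustrative only: `parse_list_queues` is a made-up name, the sample is a shortened copy of the listing above, and it assumes the "Listing queues ..." and "...done." banner lines look as shown (newer rabbitmqctl versions may print different banners).

```python
def parse_list_queues(output: str) -> dict:
    """Map queue name -> message count from `rabbitmqctl list_queues` text."""
    queues = {}
    for line in output.splitlines():
        line = line.strip()
        # Skip blank lines and the banner lines around the listing.
        if not line or line.startswith("Listing ") or line == "...done.":
            continue
        # Split from the right so the count is always the last field.
        name, count = line.rsplit(None, 1)
        queues[name] = int(count)
    return queues


sample = """Listing queues ...
hello 1
members_id 282
customer 276930
...done."""

queues = parse_list_queues(sample)
# Print the queues ordered by backlog, largest first.
for name, count in sorted(queues.items(), key=lambda item: item[1], reverse=True):
    print(f"{name}\t{count}")
```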
				
			

			
			

140.1.3.5. list_exchanges

			
# rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
email direct
...done.
			
			

140.1.4. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins

Enable a plugin:

		
rabbitmq-plugins enable rabbitmq_management		
		
		

140.1.4.1. rabbitmq_management

RabbitMQ Management HTTP API (https://cdn.rawgit.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_0/priv/www/api/index.html)

Enable the Management and Monitoring plugin:

						
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server
				
			
			
# curl -u guest:guest http://localhost:15672/api/overview
{"management_version":"3.3.5","statistics_level":"fine","exchange_types":[{"name":"topic","description":"AMQP topic exchange, as per the AMQP specification","enabled":true},{"name":"fanout","description":"AMQP fanout exchange, as per the AMQP specification","enabled":true},{"name":"direct","description":"AMQP direct exchange, as per the AMQP specification","enabled":true},{"name":"headers","description":"AMQP headers exchange, as per the AMQP specification","enabled":true}],"rabbitmq_version":"3.3.5","cluster_name":"rabbit@iZ623qr3xctZ","erlang_version":"R16B03-1","erlang_full_version":"Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [hipe] [kernel-poll:true]","message_stats":{},"queue_totals":{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0}},"object_totals":{"consumers":1,"queues":3,"exchanges":10,"connections":1,"channels":1},"node":"rabbit@iZ623qr3xctZ","statistics_db_node":"rabbit@iZ623qr3xctZ","listeners":[{"node":"rabbit@iZ623qr3xctZ","protocol":"amqp","ip_address":"::","port":5672},{"node":"rabbit@iZ623qr3xctZ","protocol":"clustering","ip_address":"::","port":25672}],"contexts":[{"node":"rabbit@iZ623qr3xctZ","description":"RabbitMQ Management","path":"/","port":15672}
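
The curl call can be reproduced from a script with the Python standard library alone. This is a rough sketch under stated assumptions: the helper names `api_url`, `api_request` and `api_get` are invented here, the guest:guest credentials and port 15672 are taken from the curl command above, and `api_get` only works against a broker with rabbitmq_management enabled.

```python
import base64
import json
import urllib.parse
import urllib.request


def api_url(path_segments, host="localhost", port=15672):
    """Build a management API URL, percent-encoding every path segment."""
    # safe="" makes quote() encode "/" as well, so the default vhost "/" becomes %2F.
    encoded = "/".join(urllib.parse.quote(seg, safe="") for seg in path_segments)
    return f"http://{host}:{port}/api/{encoded}"


def api_request(url, user="guest", password="guest"):
    """Return a Request carrying HTTP basic auth, like `curl -u user:password`."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


def api_get(path_segments, user="guest", password="guest"):
    """Fetch and decode one JSON document from a running broker."""
    request = api_request(api_url(path_segments), user, password)
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.load(response)


print(api_url(["overview"]))
print(api_url(["queues", "/", "example"]))
```

With a broker running, `api_get(["overview"])["rabbitmq_version"]` would return the version string that appears in the JSON above.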
				
			

vhosts

			
# curl -u guest:guest http://localhost:15672/api/vhosts
[{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"recv_oct":617,"recv_oct_details":{"rate":0.0},"send_oct":625,"send_oct_details":{"rate":0.0},"name":"/","tracing":false}]
			
			

queues

			
# curl -s -u guest:guest http://localhost:15672/api/queues/%2f/example | sed 's/,/,\n/g'
{"message_stats":{"ack":817,
"ack_details":{"rate":0.8},
"deliver":829,
"deliver_details":{"rate":0.8},
"deliver_get":829,
"deliver_get_details":{"rate":0.8},
"publish":33700,
"publish_details":{"rate":22.4},
"redeliver":9,
"redeliver_details":{"rate":0.0}},
"messages":32884,
"messages_details":{"rate":39.2},
"messages_ready":32881,
"messages_ready_details":{"rate":39.2},
"messages_unacknowledged":3,
"messages_unacknowledged_details":{"rate":0.0},
"policy":"",
"exclusive_consumer_tag":"",
"consumers":1,
"consumer_utilisation":0.00005551817727208515,
"memory":34387224,
"backing_queue_status":{"q1":0,
"q2":0,
"delta":["delta",
0,
0,
0],
"q3":0,
"q4":32881,
"len":32881,
"pending_acks":3,
"target_ram_count":"infinity",
"ram_msg_count":32881,
"ram_ack_count":3,
"next_seq_id":33700,
"persistent_count":0,
"avg_ingress_rate":31.071205055112543,
"avg_egress_rate":0.7083061832348867,
"avg_ack_ingress_rate":0.7083061832348867,
"avg_ack_egress_rate":0.7083061832348867},
"state":"running",
"incoming":[{"stats":{"publish":33700,
"publish_details":{"rate":22.4}},
"exchange":{"name":"email",
"vhost":"/"}}],
"deliveries":[{"stats":{"redeliver":3,
"redeliver_details":{"rate":0.0},
"deliver_get":348,
"deliver_get_details":{"rate":0.8},
"deliver":348,
"deliver_details":{"rate":0.8},
"ack":345,
"ack_details":{"rate":0.8}},
"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"}}],
"consumer_details":[{"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"},
"queue":{"name":"example",
"vhost":"/"},
"consumer_tag":"amq.ctag-6BSkZzt3eWgBG5Jn2nl4QA",
"exclusive":false,
"ack_required":true,
"prefetch_count":3,
"arguments":{}}],
"name":"example",
"vhost":"/",
"durable":true,
"auto_delete":false,
"arguments":{},
"node":"rabbit@iZ623qr3xctZ"}		
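
Most of this document is detail; a handful of fields is usually enough to judge whether a queue is keeping up. The sketch below condenses the response into those fields. `summarize_queue` and the `net_rate` figure (publish rate minus ack rate) are illustrative choices, not something the management API provides, and the sample carries only a few values copied from the response above.

```python
import json


def summarize_queue(queue: dict) -> dict:
    """Condense one /api/queues/<vhost>/<name> document into a few health figures."""
    stats = queue.get("message_stats", {})
    publish_rate = stats.get("publish_details", {}).get("rate", 0.0)
    ack_rate = stats.get("ack_details", {}).get("rate", 0.0)
    return {
        "name": queue["name"],
        "ready": queue.get("messages_ready", 0),
        "unacked": queue.get("messages_unacknowledged", 0),
        "consumers": queue.get("consumers", 0),
        # A positive value means messages arrive faster than they are acknowledged.
        "net_rate": round(publish_rate - ack_rate, 1),
    }


# A few fields copied from the sample response; everything else is omitted.
sample = json.loads("""
{"name": "example", "messages_ready": 32881, "messages_unacknowledged": 3,
 "consumers": 1,
 "message_stats": {"publish_details": {"rate": 22.4}, "ack_details": {"rate": 0.8}}}
""")

print(summarize_queue(sample))
```

For the sample, the net rate is 21.6 messages per second, which matches the growing backlog of 32881 ready messages served by a single consumer.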
			
			

140.1.5. Python - Pika

http://pika.github.com/

		
sudo apt-get install python-setuptools python-pip git-core
sudo pip install pika

# Alternatively, install with easy_install:
sudo easy_install pika
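
Once Pika is installed, publishing a few messages takes only a handful of calls. The following is a minimal sketch, not a complete client: the function names are hypothetical, the broker is assumed to be on localhost with default credentials, and the queue name "logins" and the sample logins are borrowed from the Ruby example in 140.1.6.

```python
def open_channel(host="localhost"):
    """Connect to the broker and return (connection, channel)."""
    import pika  # imported lazily so publish_logins can be exercised without Pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    return connection, connection.channel()


def publish_logins(channel, logins, queue="logins"):
    """Declare the queue and publish each login through the default exchange."""
    channel.queue_declare(queue=queue)
    for login in logins:
        # With exchange="" the routing key is the name of the destination queue.
        channel.basic_publish(exchange="", routing_key=queue, body=login)
    return len(logins)


def main():
    """Publish three sample logins; requires a running broker on localhost."""
    connection, channel = open_channel()
    try:
        publish_logins(channel, ["scott", "nic", "robi"])
    finally:
        connection.close()
```

Calling `main()` with a broker running should leave three messages in the logins queue, which `rabbitmqctl list_queues` can confirm. Keeping the publishing logic separate from the connection setup makes it possible to try `publish_logins` with a stand-in channel object.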
		
			
		

140.1.6. Ruby amqp

		
$ sudo gem install amqp		
		
		

Example 140.1. Ruby on RabbitMQ

subscriber.rb

			
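$ cat subscriber.rb
require 'rubygems'
require 'amqp'

# Run the EventMachine reactor; the block keeps running until interrupted.
EM.run {
  amq = MQ.new
  # Print every message that arrives on the "logins" queue.
  amq.queue("logins").subscribe do |login|
    puts login
  end
}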
			
			

producer.rb

			
$ cat producer.rb
require 'rubygems'
require 'amqp'

EM.run {
  amq = MQ.new
  queue = amq.queue("logins")
  # Publish three sample logins to the "logins" queue.
  %w[scott nic robi].each { |login|
    queue.publish(login)
  }
}
			
			

Test. Start the subscriber first; it keeps running, so run the producer from a second terminal.

			
$ ruby subscriber.rb
$ ruby producer.rb