Home | 简体中文 | 繁体中文 | 杂文 | 打赏(Donations) | ITEYE 博客 | OSChina 博客 | Facebook | Linkedin | 知乎专栏 | Search | About

第 33 章 Message Queuing & RPC

目录

33.1. RabbitMQ
33.1.1. 安装 RabbitMQ
33.1.1.1. Ubuntu
33.1.1.2. CentOS
33.1.1.3. OSCM 一键安装
33.1.1.4. 检查端口
33.1.2. rabbitmqctl - command line tool for managing a RabbitMQ broker
33.1.2.1. change_password
33.1.2.2. list_users
33.1.2.3. 虚拟机管理
33.1.2.4. list_queues
33.1.2.5. list_exchanges
33.1.3. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins
33.1.3.1. rabbitmq_management
33.1.4. Python - Pika
33.1.5. Ruby amqp
33.2. ZeroMQ
33.2.1. python-zeromq
33.2.1.1. pyzmq
33.2.1.2. example
33.2.2. ruby zmq
33.3. nanomsg
33.4. Gearman
33.4.1. Getting Started with Gearman
33.4.1.1. CentOS
33.4.1.2. Ubuntu
33.4.1.3. 防火墙设置
33.4.2. gearman
33.4.3. Gearman PHP Extension
33.5. Apache Kafka is a distributed publish-subscribe messaging system
33.5.1. 安装 Kafka用于开发与测试环境
33.5.2. 安装 Kafka 适用于 IDC
33.5.3. Kafka 日志
33.5.4. 测试 Kafka
33.5.5. 配置 Kafka
33.5.5.1. 外网访问
33.5.5.2. group.id
33.5.6. 管理 Kafka
33.5.7. FAQ
33.5.7.1. WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
33.6. Celery
33.7. ActiveMQ
33.8. http://kr.github.io/beanstalkd/
33.9. gRPC

33.1. RabbitMQ

RabbitMQ

33.1.1. 安装 RabbitMQ

running on 127.0.0.1 (localhost) on port 5672 (standard AMQP port).

33.1.1.1. Ubuntu

$ sudo apt-get install rabbitmq-server
			

33.1.1.2. CentOS

# yum install -y rabbitmq-server
# chkconfig rabbitmq-server on
# service rabbitmq-server start
			

添加用户, 添加权限, 删除guest用户

# rabbitmqctl add_user rabbit password
# rabbitmqctl set_permissions -p "/" rabbit ".*" ".*" ".*"
# rabbitmqctl delete_user guest
			

33.1.1.3. OSCM 一键安装

curl -s https://raw.githubusercontent.com/oscm/shell/master/mq/rabbitmq/rabbitmq-server-3.6.10.sh | bash

rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
			

33.1.1.4. 检查端口

[root@netkiller ~]# ss -lnt | grep 5672
LISTEN     0      128          *:25672                    *:*                  
LISTEN     0      128         :::5672                    :::*
			

33.1.2. rabbitmqctl - command line tool for managing a RabbitMQ broker

rabbitmqctl status
		

33.1.2.1. change_password

			
rabbitmqctl change_password admin <new_password>
			
			

33.1.2.2. list_users

# rabbitmqctl list_users
Listing users ...
guest	[administrator]
...done.
			

33.1.2.3. 虚拟机管理

$ rabbitmqctl add_vhost test
$ rabbitmqctl add_user testuser password
$ rabbitmqctl set_permissions -p test testuser ".*" ".*" ".*"			
			

33.1.2.4. list_queues

# rabbitmqctl list_queues
Listing queues ...
amq.gen-RhBwbb9EdZ8Fgk_heGZQ2w	0
bb	0
customer	276930
demo	0
email	0
example	0
hello	1
members_id	282
new_members_id	0
q_linvo	0
real	0
...done.
			

			
			

33.1.2.5. list_exchanges

# rabbitmqctl list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
email	direct
...done.
			

33.1.3. rabbitmq-plugins - command line tool for managing RabbitMQ broker plugins

启用插件

rabbitmq-plugins enable rabbitmq_management		
		

33.1.3.1. rabbitmq_management

RabbitMQ Management HTTP API (https://cdn.rawgit.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_0/priv/www/api/index.html)

启用插件 Management and Monitoring 插件

rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server			
			
# curl -u guest:guest http://localhost:15672/api/overview
{"management_version":"3.3.5","statistics_level":"fine","exchange_types":[{"name":"topic","description":"AMQP topic exchange, as per the AMQP specification","enabled":true},{"name":"fanout","description":"AMQP fanout exchange, as per the AMQP specification","enabled":true},{"name":"direct","description":"AMQP direct exchange, as per the AMQP specification","enabled":true},{"name":"headers","description":"AMQP headers exchange, as per the AMQP specification","enabled":true}],"rabbitmq_version":"3.3.5","cluster_name":"rabbit@iZ623qr3xctZ","erlang_version":"R16B03-1","erlang_full_version":"Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:8:8] [async-threads:30] [hipe] [kernel-poll:true]","message_stats":{},"queue_totals":{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0}},"object_totals":{"consumers":1,"queues":3,"exchanges":10,"connections":1,"channels":1},"node":"rabbit@iZ623qr3xctZ","statistics_db_node":"rabbit@iZ623qr3xctZ","listeners":[{"node":"rabbit@iZ623qr3xctZ","protocol":"amqp","ip_address":"::","port":5672},{"node":"rabbit@iZ623qr3xctZ","protocol":"clustering","ip_address":"::","port":25672}],"contexts":[{"node":"rabbit@iZ623qr3xctZ","description":"RabbitMQ Management","path":"/","port":15672}]}			
			

vhosts

# curl -u guest:guest http://localhost:15672/api/vhosts
[{"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"recv_oct":617,"recv_oct_details":{"rate":0.0},"send_oct":625,"send_oct_details":{"rate":0.0},"name":"/","tracing":false}]			
			

queues

			
# curl -s -u guest:guest http://localhost:15672/api/queues/%2f/example | sed 's/,/,\n/g'
{"message_stats":{"ack":817,
"ack_details":{"rate":0.8},
"deliver":829,
"deliver_details":{"rate":0.8},
"deliver_get":829,
"deliver_get_details":{"rate":0.8},
"publish":33700,
"publish_details":{"rate":22.4},
"redeliver":9,
"redeliver_details":{"rate":0.0}},
"messages":32884,
"messages_details":{"rate":39.2},
"messages_ready":32881,
"messages_ready_details":{"rate":39.2},
"messages_unacknowledged":3,
"messages_unacknowledged_details":{"rate":0.0},
"policy":"",
"exclusive_consumer_tag":"",
"consumers":1,
"consumer_utilisation":0.00005551817727208515,
"memory":34387224,
"backing_queue_status":{"q1":0,
"q2":0,
"delta":["delta",
0,
0,
0],
"q3":0,
"q4":32881,
"len":32881,
"pending_acks":3,
"target_ram_count":"infinity",
"ram_msg_count":32881,
"ram_ack_count":3,
"next_seq_id":33700,
"persistent_count":0,
"avg_ingress_rate":31.071205055112543,
"avg_egress_rate":0.7083061832348867,
"avg_ack_ingress_rate":0.7083061832348867,
"avg_ack_egress_rate":0.7083061832348867},
"state":"running",
"incoming":[{"stats":{"publish":33700,
"publish_details":{"rate":22.4}},
"exchange":{"name":"email",
"vhost":"/"}}],
"deliveries":[{"stats":{"redeliver":3,
"redeliver_details":{"rate":0.0},
"deliver_get":348,
"deliver_get_details":{"rate":0.8},
"deliver":348,
"deliver_details":{"rate":0.8},
"ack":345,
"ack_details":{"rate":0.8}},
"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"}}],
"consumer_details":[{"channel_details":{"name":"127.0.0.1:41033 -> 127.0.0.1:5672 (1)",
"number":1,
"connection_name":"127.0.0.1:41033 -> 127.0.0.1:5672",
"peer_port":41033,
"peer_host":"127.0.0.1"},
"queue":{"name":"example",
"vhost":"/"},
"consumer_tag":"amq.ctag-6BSkZzt3eWgBG5Jn2nl4QA",
"exclusive":false,
"ack_required":true,
"prefetch_count":3,
"arguments":{}}],
"name":"example",
"vhost":"/",
"durable":true,
"auto_delete":false,
"arguments":{},
"node":"rabbit@iZ623qr3xctZ"}		
				
			

33.1.4. Python - Pika

http://pika.github.com/

sudo apt-get install python-setuptools python-pip git-core
sudo pip install pika

sudo easy_install pika
		

33.1.5. Ruby amqp

$ sudo gem install amqp
		

例 33.1. Ruby on RabbitMQ

subscriber.rb

$ cat subscriber.rb
require 'rubygems'
require 'amqp'

EM.run {
  amq = MQ.new
  amq.queue("logins").subscribe do |login|
    puts login
  end
}
			

producer.rb

$ cat producer.rb
require 'rubygems'
require 'amqp'

EM.run {
  amq = MQ.new
  queue = amq.queue("logins")
  %w[scott nic robi].each { |login|
      queue.publish(login)
  }
}
			

test

$ ruby subscriber.rb

$ ruby producer.rb