菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
313
0

AMQP 0-9-1 Model Explained Why does the queue memory grow and shrink when publishing/consuming? AMQP和AMQP Protocol的是整体和部分的关系 RabbitMQ speaks multiple protocols.

原创
05/13 14:22
阅读数 2574

 AMQP 0-9-1 Model Explained — RabbitMQ http://next.rabbitmq.com/tutorials/amqp-concepts.html

AMQP 0-9-1 Model Explained

About This Guide

This guide provides an overview of the the AMQP 0-9-1 protocol, one of the protocols supported by RabbitMQ.

High-level Overview of AMQP 0-9-1 and the AMQP Model

What is AMQP 0-9-1?

AMQP 0-9-1 (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Brokers and Their Role

Messaging brokers receive messages from publishers (applications that publish them, also known as producers) and route them to consumers (applications that process them).

Since it is a network protocol, the publishers, consumers and the broker can all reside on different machines.

AMQP 0-9-1 Model in Brief

The AMQP 0-9-1 Model has the following view of the world: messages are published toexchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings. Then AMQP brokers either deliver messages to consumers subscribed to queues, or consumers fetch/pull messages from queues on demand.

 

 

 

publisher发布者--->messages消息--->exchages邮局,信箱---->queues队列 ...consumers消费者

 

exchange邮局按照一定的规则rules/bindings将消息message复制copy到队列queue

 

AMQPbrokers将消息发送deliver给订阅队列queue的消费者consumer

或者

消费者consumer按照需要on demand去拉取fetch/pull消息messages

 

【对代理不透明部分】

消息的元数据中对代理brocker:透明opaque、不透明部分

 

When publishing a message, publishers may specify various message attributes (message meta-data). Some of this meta-data may be used by the broker, however, the rest of it is completely opaque to the broker and is only used by applications that receive the message.

message acknowledgements

消费者在收到消息后,可以发送消息确认,也可以不发送;如发送,则代理broker将消息从队列中删除remove

Networks are unreliable and applications may fail to process messages therefore the AMQP model has a notion of message acknowledgements: when a message is delivered to a consumer the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).

【没有接收路由的情况dead letter queue】

在消息message不能被路由routed的情况下,消息被broker投入死信队列dead letter queue;这种情况的处理由发布者publisher决定

In certain situations, for example, when a message cannot be routed, messages may bereturned to publishers, dropped, or, if the broker implements an extension, placed into a so-called "dead letter queue". Publishers choose how to handle situations like this by publishing messages using certain parameters.

Queues, exchanges and bindings are collectively referred to as AMQP entities.

 

【Queues, exchanges and bindings are collectively referred to as AMQP entities

 

 

AMQP is a Programmable Protocol

AMQP 0-9-1 is a programmable protocol in the sense that AMQP 0-9-1 entities and routing schemes are primarily defined by applications themselves, not a broker administrator. Accordingly, provision is made for protocol operations that declare queues and exchanges, define bindings between them, subscribe to queues and so on.

This gives application developers a lot of freedom but also requires them to be aware of potential definition conflicts. In practice, definition conflicts are rare and often indicate a misconfiguration.

【删除邮局、队列的理论依据】

Applications declare the AMQP 0-9-1 entities that they need, define necessary routing schemes and may choose to delete AMQP 0-9-1 entities when they are no longer used.

 

【协议是国际标准】

 Home | AMQP http://www.amqp.org/

Advanced Message Queuing Protocol 1.0 approved as an International Standard

Click above for the press release.  The International Standard (ISO/IEC 19464) can be down loadedhere.

See this presentation to learn more about AMQP and its value.

 

Advanced Message Queuing Protocol 1.0 becomes OASIS Standard

Click above for the press release.  The Standard can be down loaded here.

See the executive briefing paper on the value proposition of OASIS AMQP to learn more.

AMQP_百度百科 https://baike.baidu.com/item/AMQP/8354716?fr=aladdin

 

【动手所得】


1、流程启动server;
2、配置用户权限;
3、外网/内网发布、接收消息;

systemctl start rabbitmq-server

/是rabbitmq默认的虚拟机,之前默认连接的都是它
创建一个用户
rabbitmqctl add_user username password
为用户分配角色
rabbitmqctl set_user_tags username administrator
#Tags 可以是:administrator, monitoring, management
设置访问权限
rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"

 

【文档】

http://www.rabbitmq.com/admin-guide.html

http://www.rabbitmq.com/clients.html

http://www.rabbitmq.com/man/rabbitmqctl.8.html http://www.rabbitmq.com/man/rabbitmqctl.8.html

NAME

rabbitmqctl — command line for managing a RabbitMQ broker

SYNOPSIS

rabbitmqctl [-q] [-l] [-n node] [-t timeoutcommand [command_options]

DESCRIPTION

RabbitMQ is a multi-protocol open source messaging broker.

 

rabbitmqctl is a command line tool for managing a RabbitMQ broker. It performs all actions by connecting to one of the broker's nodes.

 

Diagnostic information is displayed if the broker was not running, could not be reached, or rejected the connection due to mismatching Erlang cookies.

OPTIONS

-n node
Default node is “rabbit@server”, where server is the local host. On a host named “myserver.example.com”, the node name of the RabbitMQ Erlang node will usually be “rabbit@myserver” (unless RABBITMQ_NODENAME has been set to some non-default value at broker startup time). The output of “hostname -s” is usually the correct suffix to use after the “@” sign. See rabbitmq-server(8) for details of configuring the RabbitMQ broker.
-q--quiet
Quiet output mode is selected. Informational messages are suppressed when quiet mode is in effect.
--dry-run
Do not run the command. Only print information message.
-t timeout--timeout timeout
Operation timeout in seconds. Only applicable to “list” commands. Default is infinity.
-l--longnames
Use longnames for erlang distribution. If RabbitMQ broker uses long node names for erlang distribution, the option must be specified.
--erlang-cookie cookie
Erlang distribution cookie. If RabbitMQ node is using a custom erlang cookie value, the cookie value must be set vith this parameter.

COMMANDS

help [-l] [command_name]
Prints usage for all available commands.
-l--list-commands
List command usages only, without parameter explanation.
command_name
Prints usage for the specified command.

Application Management

force_reset
Forcefully returns a RabbitMQ node to its virgin state.
 
The force_reset command differs from reset in that it resets the node unconditionally, regardless of the current management database state and cluster configuration. It should only be used as a last resort if the database or cluster configuration has been corrupted.
 
For reset and force_reset to succeed the RabbitMQ application must have been stopped, e.g. with stop_app.
 
For example, to reset the RabbitMQ node:
 
rabbitmqctl force_reset
hipe_compile directory
Performs HiPE-compilation and caches resulting .beam-files in the given directory.
 
Parent directories are created if necessary. Any existing .beam files from the directory are automatically deleted prior to compilation.
 
To use this precompiled files, you should set RABBITMQ_SERVER_CODE_PATH environment variable to directory specified in hipe_compile invokation.
 
For example, to HiPE-compile modules and store them to /tmp/rabbit-hipe/ebin directory:
 
rabbitmqctl hipe_compile /tmp/rabbit-hipe/ebin
reset
Returns a RabbitMQ node to its virgin state.
 
Removes the node from any cluster it belongs to, removes all data from the management database, such as configured users and vhosts, and deletes all persistent messages.
 
For reset and force_reset to succeed the RabbitMQ application must have been stopped, e.g. with stop_app.
 
For example, to resets the RabbitMQ node:
 
rabbitmqctl reset
rotate_logs
Instructs the RabbitMQ node to perform internal log rotation.
 
Log rotation is performed according to lager settings specified in configuration file.
 
Note that there is no need to call this command in case of external log rotation (e.g. from logrotate(8)), because lager detects renames and automatically reopens log files.
 
For example, this command starts internal log rotation process:
 
rabbitmqctl rotate_logs
 
Rotation is performed asynchronously, so there is no guarantee that it will be completed when this command returns.
shutdown
Shuts down the Erlang process on which RabbitMQ is running. The command is blocking and will return after the Erlang process exits. If RabbitMQ fails to stop, it will return a non-zero exit code.
 
Unlike the stop command, the shutdown command:
  • does not require a pid_file to wait for the Erlang process to exit
  • returns a non-zero exit code if RabbitMQ node is not running
 
For example, to shut down the Erlang process on which RabbitMQ is running:
 
rabbitmqctl shutdown
start_app
Starts the RabbitMQ application.
 
This command is typically run after performing other management actions that required the RabbitMQ application to be stopped, e.g. reset.
 
For example, to instruct the RabbitMQ node to start the RabbitMQ application:
 
rabbitmqctl start_app
stop [pid_file]
Stops the Erlang node on which RabbitMQ is running. To restart the node follow the instructions for “Running the Server” in the installation guide.
 
If a pid_file is specified, also waits for the process specified there to terminate. See the description of the wait command for details on this file.
 
For example, to instruct the RabbitMQ node to terminate:
 
rabbitmqctl stop
stop_app
Stops the RabbitMQ application, leaving the Erlang node running.
 
This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. reset.
 
For example, to instruct the RabbitMQ node to stop the RabbitMQ application:
 
rabbitmqctl stop_app
wait pid_filewait --pid pid
Waits for the RabbitMQ application to start.
 
This command will wait for the RabbitMQ application to start at the node. It will wait for the pid file to be created if pidfile is specified, then for a process with a pid specified in the pid file or the --pid argument, and then for the RabbitMQ application to start in that process. It will fail if the process terminates without starting the RabbitMQ application.
 
If the specified pidfile is not created or erlang node is not started within --timeout the command will fail. Default timeout is 10 seconds.
 
A suitable pid file is created by the rabbitmq-server(8) script. By default this is located in the Mnesia directory. Modify the RABBITMQ_PID_FILE environment variable to change the location.
 
For example, this command will return when the RabbitMQ node has started up:
 
rabbitmqctl wait /var/run/rabbitmq/pid

Cluster Management

join_cluster clusternode [--ram]
clusternode
Node to cluster with.
--ram
If provided, the node will join the cluster as a RAM node.
 
Instructs the node to become a member of the cluster that the specified node is in. Before clustering, the node is reset, so be careful when using this command. For this command to succeed the RabbitMQ application must have been stopped, e.g. with stop_app.
 
Cluster nodes can be of two types: disc or RAM. Disc nodes replicate data in RAM and on disc, thus providing redundancy in the event of node failure and recovery from global events such as power failure across all nodes. RAM nodes replicate data in RAM only (with the exception of queue contents, which can reside on disc if the queue is persistent or too big to fit in memory) and are mainly used for scalability. RAM nodes are more performant only when managing resources (e.g. adding/removing queues, exchanges, or bindings). A cluster must always have at least one disc node, and usually should have more than one.
 
The node will be a disc node by default. If you wish to create a RAM node, provide the --ram flag.
 
After executing the join_cluster command, whenever the RabbitMQ application is started on the current node it will attempt to connect to the nodes that were in the cluster when the node went down.
 
To leave a cluster, reset the node. You can also remove nodes remotely with the forget_cluster_node command.
 
For more details see the Clustering guide.
 
For example, this command instructs the RabbitMQ node to join the cluster that “hare@elena” is part of, as a ram node:
 
rabbitmqctl join_cluster hare@elena --ram
cluster_status
Displays all the nodes in the cluster grouped by node type, together with the currently running nodes.
 
For example, this command displays the nodes in the cluster:
 
rabbitmqctl cluster_status
change_cluster_node_type type
Changes the type of the cluster node.
 
The type must be one of the following:
 
The node must be stopped for this operation to succeed, and when turning a node into a RAM node the node must not be the only disc node in the cluster.
 
For example, this command will turn a RAM node into a disc node:
 
rabbitmqctl change_cluster_node_type disc
forget_cluster_node [--offline]
--offline
Enables node removal from an offline node. This is only useful in the situation where all the nodes are offline and the last node to go down cannot be brought online, thus preventing the whole cluster from starting. It should not be used in any other circumstances since it can lead to inconsistencies.
 
Removes a cluster node remotely. The node that is being removed must be offline, while the node we are removing from must be online, except when using the --offline flag.
 
When using the --offline flag , rabbitmqctl will not attempt to connect to a node as normal; instead it will temporarily become the node in order to make the change. This is useful if the node cannot be started normally. In this case the node will become the canonical source for cluster metadata (e.g. which queues exist), even if it was not before. Therefore you should use this command on the latest node to shut down if at all possible.
 
For example, this command will remove the node “rabbit@stringer” from the node “hare@mcnulty”:
 
rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer
rename_cluster_node oldnode1 newnode1 [oldnode2 newnode2 ...]
Supports renaming of cluster nodes in the local database.
 
This subcommand causes rabbitmqctl to temporarily become the node in order to make the change. The local cluster node must therefore be completely stopped; other nodes can be online or offline.
 
This subcommand takes an even number of arguments, in pairs representing the old and new names for nodes. You must specify the old and new names for this node and for any other nodes that are stopped and being renamed at the same time.
 
It is possible to stop all nodes and rename them all simultaneously (in which case old and new names for all nodes must be given to every node) or stop and rename nodes one at a time (in which case each node only needs to be told how its own name is changing).
 
For example, this command will rename the node “rabbit@misshelpful” to the node “rabbit@cordelia”
 
rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia
update_cluster_nodes clusternode
clusternode
The node to consult for up-to-date information.
 
Instructs an already clustered node to contact clusternode to cluster when waking up. This is different from join_cluster since it does not join any cluster - it checks that the node is already in a cluster with clusternode.
 
The need for this command is motivated by the fact that clusters can change while a node is offline. Consider the situation in which node A and B are clustered. A goes down, C clusters with B, and then B leaves the cluster. When Awakes up, it'll try to contact B, but this will fail since B is not in the cluster anymore. The following command will solve this situation:
 
update_cluster_nodes -n A C
force_boot
Ensures that the node will start next time, even if it was not the last to shut down.
 
Normally when you shut down a RabbitMQ cluster altogether, the first node you restart should be the last one to go down, since it may have seen things happen that other nodes did not. But sometimes that's not possible: for instance if the entire cluster loses power then all nodes may think they were not the last to shut down.
 
In such a case you can invoke force_boot while the node is down. This will tell the node to unconditionally start next time you ask it to. If any changes happened to the cluster after this node shut down, they will be lost.
 
If the last node to go down is permanently lost then you should use forget_cluster_node --offline in preference to this command, as it will ensure that mirrored queues which were mastered on the lost node get promoted.
 
For example, this will force the node not to wait for other nodes next time it is started:
 
rabbitmqctl force_boot
sync_queue [-p vhostqueue
queue
The name of the queue to synchronise.
 
Instructs a mirrored queue with unsynchronised slaves to synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be mirrored for this command to succeed.
 
Note that unsynchronised queues from which messages are being drained will become synchronised eventually. This command is primarily useful for queues which are not being drained.
cancel_sync_queue [-p vhostqueue
queue
The name of the queue to cancel synchronisation for.
 
Instructs a synchronising mirrored queue to stop synchronising itself.
purge_queue [-p vhostqueue
queue
The name of the queue to purge.
 
Purges a queue (removes all messages in it).
set_cluster_name name
Sets the cluster name to name. The cluster name is announced to clients on connection, and used by the federation and shovel plugins to record where a message has been. The cluster name is by default derived from the hostname of the first node in the cluster, but can be changed.
 
For example, this sets the cluster name to “london”:
 
rabbitmqctl set_cluster_name london

User Management

Note that rabbitmqctl manages the RabbitMQ internal user database. Users from any alternative authentication backend will not be visible to rabbitmqctl.

add_user username password
username
The name of the user to create.
password
The password the created user will use to log in to the broker.
 
For example, this command instructs the RabbitMQ broker to create a (non-administrative) user named “tonyg” with (initial) password “changeit”:
 
rabbitmqctl add_user tonyg changeit
delete_user username
username
The name of the user to delete.
 
For example, this command instructs the RabbitMQ broker to delete the user named “tonyg”:
 
rabbitmqctl delete_user tonyg
change_password username newpassword
username
The name of the user whose password is to be changed.
newpassword
The new password for the user.
 
For example, this command instructs the RabbitMQ broker to change the password for the user named “tonyg” to “newpass”:
 
rabbitmqctl change_password tonyg newpass
clear_password username
username
The name of the user whose password is to be cleared.
 
For example, this command instructs the RabbitMQ broker to clear the password for the user named “tonyg”:
 
rabbitmqctl clear_password tonyg
 
This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured).
authenticate_user username password
username
The name of the user.
password
The password of the user.
 
For example, this command instructs the RabbitMQ broker to authenticate the user named “tonyg” with password “verifyit”:
 
rabbitmqctl authenticate_user tonyg verifyit
set_user_tags username [tag ...]
username
The name of the user whose tags are to be set.
tag
Zero, one or more tags to set. Any existing tags will be removed.
 
For example, this command instructs the RabbitMQ broker to ensure the user named “tonyg” is an administrator:
 
rabbitmqctl set_user_tags tonyg administrator
 
This has no effect when the user logs in via AMQP, but can be used to permit the user to manage users, virtual hosts and permissions when the user logs in via some other means (for example with the management plugin).
 
This command instructs the RabbitMQ broker to remove any tags from the user named “tonyg”:
 
rabbitmqctl set_user_tags tonyg
list_users
Lists users. Each result row will contain the user name followed by a list of the tags set for that user.
 
For example, this command instructs the RabbitMQ broker to list all users:
 
rabbitmqctl list_users

Access Control

Note that rabbitmqctl manages the RabbitMQ internal user database. Permissions for users from any alternative authorisation backend will not be visible to rabbitmqctl.

add_vhost vhost
vhost
The name of the virtual host entry to create.
 
Creates a virtual host.
 
For example, this command instructs the RabbitMQ broker to create a new virtual host called “test”:
 
rabbitmqctl add_vhost test
delete_vhost vhost
vhost
The name of the virtual host entry to delete.
 
Deletes a virtual host.
 
Deleting a virtual host deletes all its exchanges, queues, bindings, user permissions, parameters and policies.
 
For example, this command instructs the RabbitMQ broker to delete the virtual host called “test”:
 
rabbitmqctl delete_vhost test
list_vhosts [vhostinfoitem ...]
Lists virtual hosts.
 
The vhostinfoitem parameter is used to indicate which virtual host information items to include in the results. The column order in the results will match the order of the parameters. vhostinfoitem can take any value from the list that follows:
name
The name of the virtual host with non-ASCII characters escaped as in C.
tracing
Whether tracing is enabled for this virtual host.
 
If no vhostinfoitem are specified then the vhost name is displayed.
 
For example, this command instructs the RabbitMQ broker to list all virtual hosts:
 
rabbitmqctl list_vhosts name tracing
set_permissions [-p vhostuser conf write read
vhost
The name of the virtual host to which to grant the user access, defaulting to “/”.
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names for which the user is granted configure permissions.
write
A regular expression matching resource names for which the user is granted write permissions.
read
A regular expression matching resource names for which the user is granted read permissions.
 
Sets user permissions.
 
For example, this command instructs the RabbitMQ broker to grant the user named “tonyg” access to the virtual host called “/myvhost”, with configure permissions on all resources whose names starts with “tonyg-”, and write and read permissions on all resources:
 
rabbitmqctl set_permissions -p /myvhost tonyg “^tonyg-.*” “.*” “.*”
clear_permissions [-p vhostusername
vhost
The name of the virtual host to which to deny the user access, defaulting to “/”.
username
The name of the user to deny access to the specified virtual host.
 
Sets user permissions.
 
For example, this command instructs the RabbitMQ broker to deny the user named “tonyg” access to the virtual host called “/myvhost”:
 
rabbitmqctl clear_permissions -p /myvhost tonyg
list_permissions [-p vhost]
vhost
The name of the virtual host for which to list the users that have been granted access to it, and their permissions. Defaults to “/”.
 
Lists permissions in a virtual host.
 
For example, this command instructs the RabbitMQ broker to list all the users which have been granted access to the virtual host called “/myvhost”, and the permissions they have for operations on resources in that virtual host. Note that an empty string means no permissions granted:
 
rabbitmqctl list_permissions -p /myvhost
list_user_permissions username
username
The name of the user for which to list the permissions.
 
Lists user permissions.
 
For example, this command instructs the RabbitMQ broker to list all the virtual hosts to which the user named “tonyg” has been granted access, and the permissions the user has for operations on resources in these virtual hosts:
 
rabbitmqctl list_user_permissions tonyg
set_topic_permissions [-p vhostuser exchange write read
vhost
The name of the virtual host to which to grant the user access, defaulting to “/”.
user
The name of the user the permissions apply to in the target virtual host.
exchange
The name of the topic exchange the authorisation check will be applied to.
write
A regular expression matching the routing key of the published message.
read
A regular expression matching the routing key of the consumed message.
 
Sets user topic permissions.
 
For example, this command instructs the RabbitMQ broker to let the user named “tonyg” publish and consume messages going through the “amp.topic” exchange of the “/myvhost” virtual host with a routing key starting with “tonyg-”:
 
rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic “^tonyg-.*” “^tonyg-.*”
 
Topic permissions support variable expansion for the following variables: username, vhost, and client_id. Note that client_id is expanded only when using MQTT. The previous example could be made more generic by using “^{username}-.*”:
 
rabbitmqctl set_topic_permissions -p /myvhost tonyg amq.topic “^{username}-.*” “^{username}-.*”
clear_topic_permissions [-p vhostusername [exchange]
vhost
The name of the virtual host to which to clear the topic permissions, defaulting to “/”.
username
The name of the user to clear topic permissions to the specified virtual host.
exchange
The name of the topic exchange to clear topic permissions, defaulting to all the topic exchanges the given user has topic permissions for.
 
Clear user topic permissions.
 
For example, this command instructs the RabbitMQ broker to remove topic permissions for user named “tonyg” for the topic exchange “amq.topic” in the virtual host called “/myvhost”:
 
rabbitmqctl clear_topic_permissions -p /myvhost tonyg amq.topic
list_topic_permissions [-p vhost]
vhost
The name of the virtual host for which to list the users topic permissions. Defaults to “/”.
 
Lists topic permissions in a virtual host.
 
For example, this command instructs the RabbitMQ broker to list all the users which have been granted topic permissions in the virtual host called “/myvhost:”
 
rabbitmqctl list_topic_permissions -p /myvhost
list_user_topic_permissions username
username
The name of the user for which to list the topic permissions.
 
Lists user topic permissions.
 
For example, this command instructs the RabbitMQ broker to list all the virtual hosts to which the user named “tonyg” has been granted access, and the topic permissions the user has in these virtual hosts:
 
rabbitmqctl list_topic_user_permissions tonyg

Parameter Management

Certain features of RabbitMQ (such as the federation plugin) are controlled by dynamic, cluster-wide parameters. There are 2 kinds of parameters: parameters scoped to a virtual host and global parameters. Each vhost-scoped parameter consists of a component name, a name and a value. The component name and name are strings, and the value is an Erlang term. A global parameter consists of a name and value. The name is a string and the value is an Erlang term. Parameters can be set, cleared and listed. In general you should refer to the documentation for the feature in question to see how to set parameters.

set_parameter [-p vhostcomponent_name name value
Sets a parameter.
component_name
The name of the component for which the parameter is being set.
name
The name of the parameter being set.
value
The value for the parameter, as a JSON term. In most shells you are very likely to need to quote this.
 
For example, this command sets the parameter “node01” for the “federation-upstream” component in the default virtual host to following JSON:
 
rabbitmqctl set_parameter federation-upstream node01 '{"uri":"amqp://user:password@server/%2F","ack-mode":"on-publish"}'
clear_parameter [-p vhostcomponent_name key
Clears a parameter.
component_name
The name of the component for which the parameter is being cleared.
name
The name of the parameter being cleared.
 
For example, this command clears the parameter “node01” for the “federation-upstream” component in the default virtual host:
 
rabbitmqctl clear_parameter federation-upstream node01
list_parameters [-p vhost]
Lists all parameters for a virtual host.
 
For example, this command lists all parameters in the default virtual host:
 
rabbitmqctl list_parameters
set_global_parameter name value
Sets a global runtime parameter. This is similar to set_parameter but the key-value pair isn't tied to a virtual host.
name
The name of the global runtime parameter being set.
value
The value for the global runtime parameter, as a JSON term. In most shells you are very likely to need to quote this.
 
For example, this command sets the global runtime parameter “mqtt_default_vhosts” to the JSON term {"O=client,CN=guest":"/"}:
 
rabbitmqctl set_global_parameter mqtt_default_vhosts '{"O=client,CN=guest":"/"}'
clear_global_parameter name
Clears a global runtime parameter. This is similar to clear_parameter but the key-value pair isn't tied to a virtual host.
name
The name of the global runtime parameter being cleared.
 
For example, this command clears the global runtime parameter “mqtt_default_vhosts”:
 
rabbitmqctl clear_global_parameter mqtt_default_vhosts
list_global_parameters
Lists all global runtime parameters. This is similar to list_parameters but the global runtime parameters are not tied to any virtual host.
 
For example, this command lists all global parameters:
 
rabbitmqctl list_global_parameters

Policy Management

Policies are used to control and modify the behaviour of queues and exchanges on a cluster-wide basis. Policies apply within a given vhost, and consist of a name, pattern, definition and an optional priority. Policies can be set, cleared and listed.

set_policy [-p vhost] [--priority priority] [--apply-to apply-toname pattern definition
Sets a policy.
name
The name of the policy.
pattern
The regular expression, which when matches on a given resources causes the policy to apply.
definition
The definition of the policy, as a JSON term. In most shells you are very likely to need to quote this.
priority
The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.
apply-to
Which types of object this policy should apply to. Possible values are: The default is all ..
 
For example, this command sets the policy “federate-me” in the default virtual host so that built-in exchanges are federated:
 
rabbitmqctl set_policy federate-me ^amq. '{"federation-upstream-set":"all"}'
clear_policy [-p vhostname
Clears a policy.
name
The name of the policy being cleared.
 
For example, this command clears the “federate-me” policy in the default virtual host:
 
rabbitmqctl clear_policy federate-me
list_policies [-p vhost]
Lists all policies for a virtual host.
 
For example, this command lists all policies in the default virtual host:
 
rabbitmqctl list_policies
set_operator_policy [-p vhost] [--priority priority] [--apply-to apply-toname pattern definition
Sets an operator policy that overrides a subset of arguments in user policies. Arguments are identical to those of set_policy.
 
Supported arguments are:
  • expires
  • message-ttl
  • max-length
  • max-length-bytes
clear_operator_policy [-p vhostname
Clears an operator policy. Arguments are identical to those of clear_policy.
list_operator_policies [-p vhost]
Lists operator policy overrides for a virtual host. Arguments are identical to those of list_policies.

Virtual Host Limits

It is possible to enforce certain limits on virtual hosts.

set_vhost_limits [-p vhostdefinition
Sets virtual host limits.
definition
The definition of the limits, as a JSON term. In most shells you are very likely to need to quote this.
 
Recognised limits are:
  • max-connections
  • max-queues
 
Use a negative value to specify "no limit".
 
For example, this command limits the max number of concurrent connections in vhost “qa_env” to 64:
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 64}'
 
This command limits the max number of queues in vhost “qa_env” to 256:
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-queues": 256}'
 
This command clears the max number of connections limit in vhost “qa_env”:
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": -1}'
 
This command disables client connections in vhost “qa_env”:
 
rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 0}'
clear_vhost_limits [-p vhost]
Clears virtual host limits.
 
For example, this command clears vhost limits in vhost “qa_env”:
 
rabbitmqctl clear_vhost_limits -p qa_env
list_vhost_limits [-p vhost] [--global]
Displays configured virtual host limits.
--global
Show limits for all vhosts. Suppresses the -p parameter.

Server Status

The server status queries interrogate the server and return a list of results with tab-delimited columns. Some queries ( list_queueslist_exchangeslist_bindings and list_consumers) accept an optional vhost parameter. This parameter, if present, must be specified immediately after the query.

 

The list_queueslist_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is “/”.

list_queues [-p vhost] [--offline | --online | --local] [queueinfoitem ...]
Returns queue details. Queue details of the “/” virtual host are returned if the -p flag is absent. The -p flag can be used to override this default.
 
Displayed queues can be filtered by their status or location using one of the following mutually exclusive options:
--offline
List only those durable queues that are not currently available (more specifically, their master node isn't).
--online
List queues that are currently available (their master node is).
--local
List only those queues whose master process is located on the current node.
 
The queueinfoitem parameter is used to indicate which queue information items to include in the results. The column order in the results will match the order of the parameters. queueinfoitem can take any value from the list that follows:
name
The name of the queue with non-ASCII characters escaped as in C.
durable
Whether or not the queue survives server restarts.
auto_delete
Whether the queue will be deleted automatically when no longer used.
arguments
Queue arguments.
policy
Policy name applying to the queue.
pid
Id of the Erlang process associated with the queue.
owner_pid
Id of the Erlang process representing the connection which is the exclusive owner of the queue. Empty if the queue is non-exclusive.
exclusive
True if queue is exclusive (i.e. has owner_pid), false otherwise.
exclusive_consumer_pid
Id of the Erlang process representing the channel of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.
exclusive_consumer_tag
Consumer tag of the exclusive consumer subscribed to this queue. Empty if there is no exclusive consumer.
messages_ready
Number of messages ready to be delivered to clients.
messages_unacknowledged
Number of messages delivered to clients but not yet acknowledged.
messages
Sum of ready and unacknowledged messages (queue depth).
messages_ready_ram
Number of messages from messages_ready which are resident in ram.
messages_unacknowledged_ram
Number of messages from messages_unacknowledged which are resident in ram.
messages_ram
Total number of messages which are resident in ram.
messages_persistent
Total number of persistent messages in the queue (will always be 0 for transient queues).
message_bytes
Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.
message_bytes_ready
Like message_bytes but counting only those messages ready to be delivered to clients.
message_bytes_unacknowledged
Like message_bytes but counting only those messages delivered to clients but not yet acknowledged.
message_bytes_ram
Like message_bytes but counting only those messages which are in RAM.
message_bytes_persistent
Like message_bytes but counting only those messages which are persistent.
head_message_timestamp
The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.
disk_reads
Total number of times messages have been read from disk by this queue since it started.
disk_writes
Total number of times messages have been written to disk by this queue since it started.
consumers
Number of consumers.
consumer_utilisation
Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.
memory
Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.
slave_pids
If the queue is mirrored, this gives the IDs of the current slaves.
synchronised_slave_pids
If the queue is mirrored, this gives the IDs of the current slaves which are synchronised with the master - i.e. those which could take over from the master without message loss.
state
The state of the queue. Normally “running”, but may be “{syncing, message_count}” if the queue is synchronising.
 
Queues which are located on cluster nodes that are currently down will be shown with a status of “down” (and most other queueinfoitem will be unavailable).
 
If no queueinfoitem are specified then queue name and depth are displayed.
 
For example, this command displays the depth and number of consumers for each queue of the virtual host named “/myvhost”
 
rabbitmqctl list_queues -p /myvhost messages consumers
list_exchanges [-p vhost] [exchangeinfoitem ...]
Returns exchange details. Exchange details of the “/” virtual host are returned if the -p flag is absent. The -p flag can be used to override this default.
 
The exchangeinfoitem parameter is used to indicate which exchange information items to include in the results. The column order in the results will match the order of the parameters. exchangeinfoitem can take any value from the list that follows:
name
The name of the exchange with non-ASCII characters escaped as in C.
type
The exchange type, such as:
  • direct
  • topic
  • headers
  • fanout
durable
Whether or not the exchange survives server restarts.
auto_delete
Whether the exchange will be deleted automatically when no longer used.
internal
Whether the exchange is internal, i.e. cannot be directly published to by a client.
arguments
Exchange arguments.
policy
Policy name for applying to the exchange.
 
If no exchangeinfoitem are specified then exchange name and type are displayed.
 
For example, this command displays the name and type for each exchange of the virtual host named “/myvhost”:
 
rabbitmqctl list_exchanges -p /myvhost name type
list_bindings [-p vhost] [bindinginfoitem ...]
Returns binding details. By default the bindings for the “/” virtual host are returned. The -p flag can be used to override this default.
 
The bindinginfoitem parameter is used to indicate which binding information items to include in the results. The column order in the results will match the order of the parameters. bindinginfoitem can take any value from the list that follows:
source_name
The name of the source of messages to which the binding is attached. With non-ASCII characters escaped as in C.
source_kind
The kind of the source of messages to which the binding is attached. Currently always exchange. With non-ASCII characters escaped as in C.
destination_name
The name of the destination of messages to which the binding is attached. With non-ASCII characters escaped as in C.
destination_kind
The kind of the destination of messages to which the binding is attached. With non-ASCII characters escaped as in C.
routing_key
The binding's routing key, with non-ASCII characters escaped as in C.
arguments
The binding's arguments.
 
If no bindinginfoitem are specified then all above items are displayed.
 
For example, this command displays the exchange name and queue name of the bindings in the virtual host named “/myvhost”
 
rabbitmqctl list_bindings -p /myvhost exchange_name queue_name
list_connections [connectioninfoitem ...]
Returns TCP/IP connection statistics.
 
The connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters. connectioninfoitem can take any value from the list that follows:
pid
Id of the Erlang process associated with the connection.
name
Readable name for the connection.
port
Server port.
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
peer_port
Peer port.
peer_host
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. “tlsv1”).
ssl_key_exchange
SSL key exchange algorithm (e.g. “rsa”).
ssl_cipher
SSL cipher algorithm (e.g. “aes_256_cbc”).
ssl_hash
SSL hash function (e.g. “sha”).
peer_cert_subject
The subject of the peer's SSL certificate, in RFC4514 form.
peer_cert_issuer
The issuer of the peer's SSL certificate, in RFC4514 form.
peer_cert_validity
The period for which the peer's SSL certificate is valid.
state
Connection state; one of:
  • starting
  • tuning
  • opening
  • running
  • flow
  • blocking
  • blocked
  • closing
  • closed
channels
Number of channels using the connection.
protocol
Version of the AMQP protocol in use; currently one of:
  • {0,9,1}
  • {0,8,0}
 
Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.
auth_mechanism
SASL authentication mechanism used, such as “PLAIN”.
user
Username associated with the connection.
vhost
Virtual host name with non-ASCII characters escaped as in C.
timeout
Connection timeout / negotiated heartbeat interval, in seconds.
frame_max
Maximum frame size (bytes).
channel_max
Maximum number of channels on this connection.
client_properties
Informational properties transmitted by the client during connection establishment.
recv_oct
Octets received.
recv_cnt
Packets received.
send_oct
Octets send.
send_cnt
Packets sent.
send_pend
Send queue size.
connected_at
Date and time this connection was established, as timestamp.
 
If no connectioninfoitem are specified then user, peer host, peer port, time since flow control and memory block state are displayed.
 
For example, this command displays the send queue size and server port for each connection:
 
rabbitmqctl list_connections send_pend port
list_channels [channelinfoitem ...]
Returns information on all current channels, the logical containers executing most AMQP commands. This includes channels that are part of ordinary AMQP connections, and channels created by various plug-ins and other extensions.
 
The channelinfoitem parameter is used to indicate which channel information items to include in the results. The column order in the results will match the order of the parameters. channelinfoitem can take any value from the list that follows:
pid
Id of the Erlang process associated with the connection.
connection
Id of the Erlang process associated with the connection to which the channel belongs.
name
Readable name for the channel.
number
The number of the channel, which uniquely identifies it within a connection.
user
Username associated with the channel.
vhost
Virtual host in which the channel operates.
transactional
True if the channel is in transactional mode, false otherwise.
confirm
True if the channel is in confirm mode, false otherwise.
consumer_count
Number of logical AMQP consumers retrieving messages via the channel.
messages_unacknowledged
Number of messages delivered via this channel but not yet acknowledged.
messages_uncommitted
Number of messages received in an as yet uncommitted transaction.
acks_uncommitted
Number of acknowledgements received in an as yet uncommitted transaction.
messages_unconfirmed
Number of published messages not yet confirmed. On channels not in confirm mode, this remains 0.
prefetch_count
QoS prefetch limit for new consumers, 0 if unlimited.
global_prefetch_count
QoS prefetch limit for the entire channel, 0 if unlimited.
 
If no channelinfoitem are specified then pid, user, consumer_count, and messages_unacknowledged are assumed.
 
For example, this command displays the connection process and count of unacknowledged messages for each channel:
 
rabbitmqctl list_channels connection messages_unacknowledged
list_consumers [-p vhost]
Lists consumers, i.e. subscriptions to a queue´s message stream. Each line printed shows, separated by tab characters, the name of the queue subscribed to, the id of the channel process via which the subscription was created and is managed, the consumer tag which uniquely identifies the subscription within a channel, a boolean indicating whether acknowledgements are expected for messages delivered to this consumer, an integer indicating the prefetch limit (with 0 meaning “none”), and any arguments for this consumer.
status
Displays broker status information such as the running applications on the current Erlang node, RabbitMQ and Erlang versions, OS name, memory and file descriptor statistics. (See the cluster_status command to find out which nodes are clustered and running.)
 
For example, this command displays information about the RabbitMQ broker:
 
rabbitmqctl status
node_health_check
Health check of the RabbitMQ node. Verifies the rabbit application is running, list_queues and list_channels return, and alarms are not set.
 
For example, this command performs a health check on the RabbitMQ node:
 
rabbitmqctl node_health_check -n rabbit@stringer
environment
Displays the name and value of each variable in the application environment for each running application.
report
Generate a server status report containing a concatenation of all server status information for support purposes. The output should be redirected to a file when accompanying a support request.
 
For example, this command creates a server report which may be attached to a support request email:
 
rabbitmqctl report > server_report.txt
eval expr
Evaluate an arbitrary Erlang expression.
 
For example, this command returns the name of the node to which rabbitmqctl has connected:
 
rabbitmqctl eval “node().”

Miscellaneous

close_connection connectionpid explanation
connectionpid
Id of the Erlang process associated with the connection to close.
explanation
Explanation string.
 
Instructs the broker to close the connection associated with the Erlang process id connectionpid (see also the list_connections command), passing the explanation string to the connected client as part of the AMQP connection shutdown protocol.
 
For example, this command instructs the RabbitMQ broker to close the connection associated with the Erlang process id “<rabbit@tanto.4262.0>”, passing the explanation “go away” to the connected client:
 
rabbitmqctl close_connection “<rabbit@tanto.4262.0>” “go away”
close_all_connections [-p vhost] [--global] [--per-connection-delay delay] [--limit limitexplanation
-p vhost
The name of the virtual host for which connections should be closed. Ignored when --global is specified.
--global
If connections should be close for all vhosts. Overrides -p
--per-connection-delay delay
Time in milliseconds to wait after each connection closing.
--limit limit
Number of connection to close. Only works per vhost. Ignored when --global is specified.
explanation
Explanation string.
 
Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node.
 
For example, this command instructs the RabbitMQ broker to close 10 connections on “qa_env” vhost, passing the explanation “Please close”:
 
rabbitmqctl close_all_connections -p qa_env --limit 10 'Please close'
 
This command instructs broker to close all connections to the node:
 
rabbitmqctl close_all_connections --global
 
trace_on [-p vhost]
vhost
The name of the virtual host for which to start tracing.
 
Starts tracing. Note that the trace state is not persistent; it will revert to being off if the server is restarted.
trace_off [-p vhost]
vhost
The name of the virtual host for which to stop tracing.
 
Stops tracing.
set_vm_memory_high_watermark fraction
fraction
The new memory threshold fraction at which flow control is triggered, as a floating point number greater than or equal to 0.
set_vm_memory_high_watermark absolute memory_limit
memory_limit
The new memory limit at which flow control is triggered, expressed in bytes as an integer number greater than or equal to 0 or as a string with memory units (e.g. 512M or 1G). Available units are:
kkiB
kibibytes (2^10 bytes)
MMiB
mebibytes (2^20 bytes)
GGiB
gibibytes (2^30 bytes)
kB
kilobytes (10^3 bytes)
MB
megabytes (10^6 bytes)
GB
gigabytes (10^9 bytes)
set_disk_free_limit disk_limit
disk_limit
Lower bound limit as an integer in bytes or a string with memory units (see vm_memory_high_watermark), e.g. 512M or 1G. Once free disk space reaches the limit, a disk alarm will be set.
set_disk_free_limit mem_relative fraction
fraction
Limit relative to the total amount available RAM as a non-negative floating point number. Values lower than 1.0 can be dangerous and should be used carefully.
encode value passphrase [--cipher cipher] [--hash hash] [--iterations iterations]
value passphrase
Value to encrypt and passphrase.
 
For example:
 
rabbitmqctl encode '<<"guest">>' mypassphrase
--cipher cipher --hash hash --iterations iterations
Options to specify the encryption settings. They can be used independently.
 
For example:
 
rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
decode value passphrase [--cipher cipher] [--hash hash] [--iterations iterations]
value passphrase
Value to decrypt (as produced by the encode command) and passphrase.
 
For example:
 
rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase
--cipher cipher --hash hash --iterations iterations
Options to specify the decryption settings. They can be used independently.
 
For example:
 
rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>} mypassphrase
list_hashes
Lists hash functions supported by encoding commands.
 
For example, this command instructs the RabbitMQ broker to list all hash functions supported by encoding commands:
 
rabbitmqctl list_hashes
list_ciphers
Lists cipher suites supported by encoding commands.
 
For example, this command instructs the RabbitMQ broker to list all cipher suites supported by encoding commands:
 
rabbitmqctl list_ciphers

PLUGIN COMMANDS

RabbitMQ plugins can extend rabbitmqctl tool to add new commands when enabled. Currently available commands can be found in rabbitmqctl help output. Following commands are added by RabbitMQ plugins, available in default distribution:

Shovel plugin

shovel_status
Prints a list of configured shovels
delete_shovel [-p vhostname
Instructs the RabbitMQ node to delete the configured shovel by name.

Federation plugin

federation_status [--only-down]
Prints a list of federation links.
--only-down
Only list federation links which are not running.
restart_federation_link link_id
Instructs the RabbitMQ node to restart the federation link with specified link_id.

AMQP-1.0 plugin

list_amqp10_connections [amqp10_connectioninfoitem ...]
Similar to the list_connections command, but returns fields which make sense for AMQP-1.0 connections. amqp10_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters. amqp10_connectioninfoitem can take any value from the list that follows:
pid
Id of the Erlang process associated with the connection.
auth_mechanism
SASL authentication mechanism used, such as “PLAIN”.
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
frame_max
Maximum frame size (bytes).
timeout
Connection timeout / negotiated heartbeat interval, in seconds.
user
Username associated with the connection.
state
Connection state; one of:
  • starting
  • waiting_amqp0100
  • securing
  • running
  • blocking
  • blocked
  • closing
  • closed
recv_oct
Octets received.
recv_cnt
Packets received.
send_oct
Octets send.
send_cnt
Packets sent.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. “tlsv1”).
ssl_key_exchange
SSL key exchange algorithm (e.g. “rsa”).
ssl_cipher
SSL cipher algorithm (e.g. “aes_256_cbc”).
ssl_hash
SSL hash function (e.g. “sha”).
peer_cert_subject
The subject of the peer's SSL certificate, in RFC4514 form.
peer_cert_issuer
The issuer of the peer's SSL certificate, in RFC4514 form.
peer_cert_validity
The period for which the peer's SSL certificate is valid.
node
The node name of the RabbitMQ node to which connection is established.

MQTT plugin

list_mqtt_connections [mqtt_connectioninfoitem]
Similar to the list_connections command, but returns fields which make sense for MQTT connections. mqtt_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters. mqtt_connectioninfoitem can take any value from the list that follows:
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was disabled.
port
Server port.
peer_host
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
peer_port
Peer port.
protocol
MQTT protocol version, which can be on of the following:
  • {'MQTT', N/A}
  • {'MQTT', 3.1.0}
  • {'MQTT', 3.1.1}
channels
Number of channels using the connection.
channel_max
Maximum number of channels on this connection.
frame_max
Maximum frame size (bytes).
client_properties
Informational properties transmitted by the client during connection establishment.
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. “tlsv1”).
ssl_key_exchange
SSL key exchange algorithm (e.g. “rsa”).
ssl_cipher
SSL cipher algorithm (e.g. “aes_256_cbc”).
ssl_hash
SSL hash function (e.g. “sha”).
conn_name
Readable name for the connection.
connection_state
Connection state; one of:
  • starting
  • running
  • blocked
connection
Id of the Erlang process associated with the internal amqp direct connection.
consumer_tags
A tuple of consumer tags for QOS0 and QOS1.
message_id
The last Packet ID sent in a control message.
client_id
MQTT client identifier for the connection.
clean_sess
MQTT clean session flag.
will_msg
MQTT Will message sent in CONNECT frame.
exchange
Exchange to route MQTT messages configured in rabbitmq_mqtt application environment.
ssl_login_name
SSL peer cert auth name
retainer_pid
Id of the Erlang process associated with retain storage for the connection.
user
Username associated with the connection.
vhost
Virtual host name with non-ASCII characters escaped as in C.

STOMP plugin

list_stomp_connections [stomp_connectioninfoitem]
Similar to the list_connections command, but returns fields which make sense for STOMP connections. stomp_connectioninfoitem parameter is used to indicate which connection information items to include in the results. The column order in the results will match the order of the parameters. stomp_connectioninfoitem can take any value from the list that follows:
conn_name
Readable name for the connection.
connection
Id of the Erlang process associated with the internal amqp direct connection.
connection_state
Connection state; one of:
  • running
  • blocking
  • blocked
session_id
STOMP protocol session identifier
channel
AMQP channel associated with the connection
version
Negotiated STOMP protocol version for the connection.
implicit_connect
Indicates if the connection was established using implicit connect (without CONNECT frame)
auth_login
Effective username for the connection.
auth_mechanism
STOMP authorization mechanism. Can be one of:
  • config
  • ssl
  • stomp_headers
port
Server port.
host
Server hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
peer_port
Peer port.
peer_host
Peer hostname obtained via reverse DNS, or its IP address if reverse DNS failed or was not enabled.
protocol
STOMP protocol version, which can be on of the following:
  • {'STOMP', 0}
  • {'STOMP', 1}
  • {'STOMP', 2}
channels
Number of channels using the connection.
channel_max
Maximum number of channels on this connection.
frame_max
Maximum frame size (bytes).
client_properties
Informational properties transmitted by the client during connection
ssl
Boolean indicating whether the connection is secured with SSL.
ssl_protocol
SSL protocol (e.g. “tlsv1”).
ssl_key_exchange
SSL key exchange algorithm (e.g. “rsa”).
ssl_cipher
SSL cipher algorithm (e.g. “aes_256_cbc”).
ssl_hash
SSL hash function (e.g. “sha”).

Management agent plugin

reset_stats_db [--all]
Reset management stats database for the RabbitMQ node.
--all
Reset stats database for all nodes in the cluster.

SEE ALSO

rabbitmq-env.conf(5)rabbitmq-echopid(8)rabbitmq-plugins(8)rabbitmq-server(8)rabbitmq-service(8)

AUTHOR

The RabbitMQ Team <info@rabbitmq.com>

 

【参考】

rabbitmq 启动报错 - 粽先生 - 博客园 https://www.cnblogs.com/straycats/p/7719933.html

【测试代码】

Asynchronous publisher example — pika 0.12.0 documentation https://pika.readthedocs.io/en/stable/examples/asynchronous_publisher_example.html

 

# -*- coding: utf-8 -*-

import logging
import pika
import json

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)


class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.

    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.

    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.

    """
    EXCHANGE = 'messageTEST'
    EXCHANGE_TYPE = 'topic'
    PUBLISH_INTERVAL = 1
    QUEUE = 'textTEST'
    ROUTING_KEY = 'example.text'

    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.

        :param str amqp_url: The URL for connecting to RabbitMQ

        """
        self._connection = None
        self._channel = None

        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None

        self._stopping = False
        self._url = amqp_url

    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika. If you want the reconnection to work, make
        sure you set stop_ioloop_on_close to False, which is not the default
        behavior of this adapter.

        :rtype: pika.SelectConnection

        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     on_open_callback=self.on_connection_open,
                                     on_close_callback=self.on_connection_closed,
                                     stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.

        :type unused_connection: pika.SelectConnection

        """
        LOGGER.info('Connection opened')
        self.open_channel()

    def on_connection_closed(self, connection, reply_code, reply_text):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.

        :param pika.connection.Connection connection: The closed connection obj
        :param int reply_code: The server provided reply_code if given
        :param str reply_text: The server provided reply_text if given

        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
                           reply_code, reply_text)
            self._connection.add_timeout(5, self._connection.ioloop.stop)

    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.

        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.

        Since the channel is now open, we'll declare the exchange to use.

        :param pika.channel.Channel channel: The channel object

        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.

        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):
        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.

        :param pika.channel.Channel channel: The closed channel
        :param int reply_code: The numeric reason the channel was closed
        :param str reply_text: The text reason the channel was closed

        """
        LOGGER.warning('Channel was closed: (%s) %s', reply_code, reply_text)
        self._channel = None
        if not self._stopping:
            self._connection.close()

    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.

        :param str|unicode exchange_name: The name of the exchange to declare

        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        self._channel.exchange_declare(self.on_exchange_declareok,
                                       exchange_name,
                                       self.EXCHANGE_TYPE)

    def on_exchange_declareok(self, unused_frame):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.

        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame

        """
        LOGGER.info('Exchange declared')
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.

        :param str|unicode queue_name: The name of the queue to declare.

        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.

        :param pika.frame.Method method_frame: The Queue.DeclareOk frame

        """
        LOGGER.info('Binding %s to %s with %s',
                    self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(self.on_bindok, self.QUEUE,
                                 self.EXCHANGE, self.ROUTING_KEY)

    def on_bindok(self, unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing."""
        LOGGER.info('Queue bound')
        self.start_publishing()

    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ

        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()

    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.

        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.

        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.

        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame

        """
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i',
                    confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info('Published %i messages, %i have yet to be confirmed, '
                    '%i were acked and %i were nacked',
                    self._message_number, len(self._deliveries),
                    self._acked, self._nacked)

    def schedule_next_message(self):
        """If we are not closing our connection to RabbitMQ, schedule another
        message to be delivered in PUBLISH_INTERVAL seconds.

        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.add_timeout(self.PUBLISH_INTERVAL,
                                     self.publish_message)

    def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmations method.

        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.

        """
        if self._channel is None or not self._channel.is_open:
            return

        hdrs = {u'مفتاح': u' قيمة',
                u'键': u'值',
                u'キー': u'値'}
        properties = pika.BasicProperties(app_id='example-publisher',
                                          content_type='application/json',
                                          headers=hdrs)

        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(message, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()

    def run(self):
        """Run the example code by connecting and then starting the IOLoop.

        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0

            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

        LOGGER.info('Stopped')

    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that we stop scheduling new messages to be
        published. The IOLoop is started because this method is
        invoked by the Try/Catch below when KeyboardInterrupt is caught.
        Starting the IOLoop again will allow the publisher to cleanly
        disconnect from RabbitMQ.

        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()

    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command.

        """
        if self._channel is not None:
            LOGGER.info('Closing the channel')
            self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        if self._connection is not None:
            LOGGER.info('Closing connection')
            self._connection.close()


def main():
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)

    # Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
    # example = ExamplePublisher('amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat_interval=3600')
    rmqAddr, rmqPort, rmqQueue = '101.201.41.72', 5672, 'video_article'
    rmqU, rmqP = 'liaohy', 'liaohy'

    C = 'amqp://{}:{}@{}:{}/%2F?connection_attempts=3&heartbeat_interval=3600'.format(rmqU, rmqP, rmqAddr, rmqPort)
    example = ExamplePublisher(C)
    example.run()


if __name__ == '__main__':
    main()

  

 

 

Why does the queue memory grow and shrink when publishing/consuming?

http://www.rabbitmq.com/memory-use.html#queue-memory-usage http://www.rabbitmq.com/memory-use.html#queue-memory-usage

 

 

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

中文名
高级消息队列协议
外文名
Advanced Message Queuing Protocol
属    性
应用层标准协议
应用领域
计算机

简介

编辑
AMQP协议
第1章 概述
1.3. 摘要
1.3.1. 什么是AMQP
高级消息队列协议使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能。
1.3.2. 为什么要用AMQP
我们的目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。
我们的宗旨是通过AMQP,让消息中间件的能力最终被网络本身所具有,并且通过消息中间件的广泛使用发展出一系列有用的应用程序。
1.3.3. AMQP的范围
为了完全实现消息中间件的互操作性,需要充分定义网络协议和消息代理服务的功能语义。
因此,AMQP定义网络协议和代理服务如下:
一套确定的消息交换功能,也就是“高级消息交换协议模型”。AMQP模型包括一套用于路由和存储消息的功能模块,以及一套在这些模块之间交换消息的规则。
一个网络线级协议(数据传输格式),客户端应用可以通过这个协议与消息代理和它实现的AMQP模型进行交互通信。
可以只实现AMQP协议规范中的的部分语义,但是我们相信明确的描述这些语义有助于理解这个协议。

相关协议

编辑
1.3.4.1. AMQP模型
我们需要明确的定义服务器的语义,因为所有服务器实现都应该保持这些语义的一致性,否则就无法进行互操作。
因此AMQP模型描述了一套模块化的组件以及这些组件之间进行连接的标准规则。
在服务器中,三个主要功能模块连接成一个处理链完成预期的功能:
“exchange”接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到“消息队列”。
“message queue”存储消息,直到这些消息被消费者安全处理完为止。
“binding”定义了exchange和message queue之间的关联,提供路由规则。
使用这个模型我们可以很容易的模拟出存储转发队列和主题订阅这些典型的消息中间件概念。
一个AMQP服务器类似于邮件服务器,exchange类似于消息传输代理(email里的概念),message queue类似于邮箱。Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息。
在以前的中间件系统的应用场景中,发布者直接将消息发送给邮箱或者邮件列表。
区别就在于用户可以控制message queue与exchage的连接规则,这可以做很多有趣的事情,比如定义一条规则:“将所有包含这样这样的消息头的消息都复制一份再发送到消息队列中”。
AMQP模型有以下目标:
支持金融服务领域的语义要求。
支持金融服务领域所要求的性能要求。
能够很方便的扩展新的消息路由和队列。
通过AMQP协议(AMQP和AMQP Protocol的是整体和部分的关系),服务器应用可以通过编程的方式来实现具体的功能语义。
简单而灵活。
1.3.4.2. AMQP协议
AMQP协议是一个二进制协议,拥有一些现代特点:多信道、协商式、异步、安全、跨平台、中立、高效。
AMQP通常被划分为三层:
模型层定义了一套命令(按功能分类),客户端应用可以利用这些命令来实现它的业务功能。
会话层负责将命令从客户端应用传递给服务器,再将服务器的应答传递给客户端应用,会话层为这个传递过程提供可靠性、同步机制和错误处理。
传输层提供帧处理、信道复用、错误检测和数据表示。
实现者可以将传输层替换成任意传输协议,只要不改变AMQP协议中与客户端应用程序相关的功能。实现者还可以使用其他高层协议中的会话层。
AMQP模型的设计由以下几个需求所驱动:
保证遵从AMQP规范的服务器实现之间能够进行互操作。
为服务质量提供显示控制。
支持所有消息中间件的功能:消息交换、文件传输、流传输、远程进程调用等。
兼容已有的消息API规范(比如Sun公司的JMS规范)。
形成一致和明确的命名。
通过AMQP协议可以完整的配置服务器线路(TODO:server wiring是啥意思?答:服务器连接)。
使用命令符号可以很容易的映射成应用级别的API。
明确定义每一个操作只做一件事情。
AMQP传输层的设计由以下几个主要的需求所驱动,这些需求不分先后次序:
使用能够快速打包解包的二进制编码来保证数据的紧凑性。
能够处理任意大小的消息。
允许零拷贝数据传输(比如远程DMA)。
一个连接支持多个会话。
保证会话能够从网络错误、服务器失效中恢复。
为了长期存在,没有隐含的内置限制(TODO:To be long-lived,with no significant in-built limitations)。
异步传输消息。
能够很容易的处理新的和变化的需求。
高版本的AMQP规范能够兼容低版本的规范。
使用强断言模型来保证应用程序的可修复性。
保持编程语言的中立性。
适宜使用代码生成工具生成协议处理模块。

功能范围

编辑
我们支持各种消息交换的体系结构:
存储转发(多个消息发送者,单个消息接收者)。
分布式事务(多个消息发送者,多个消息接收者)。
发布订阅(多个消息发送者,多个消息接收者)。
基于内容的路由(多个消息发送者,多个消息接收者)。
文件传输队列(多个消息发送者,多个消息接收者)。
点对点连接(单个消息发送者,单个消息接收者)。

文档结构

编辑
本文档分成两个部分:
“概念”部分将对AMQP的概念做一个简单的介绍,描述AMQP怎么工作,以及AMQP的用途。
“标准”部分将对AMQP的模型层、会话层的每个组成部分做精确的定义,还将定义AMQP在网络上传输的二进制消息结构。
我们用IETF RFC2119中的术语定义:必须、不必、应该、不应该和可以(详见参考资料 [1]  )。
当我们讨论遵从AMQP规范的服务器的具体行为时,我们使用术语“服务器”来表示这些服务器。
当我们讨论遵从AMQP规范的客户端应用的具体行为时,我们使用术语“客户端”来表示这些客户端应用。
我们使用“端点”来表示“服务器或者客户端”。
除非另有说明,所有数字都是十进制的。
协议中的常量都用大写字母的名字来表示。AMQP的实现如果需要在代码或者文档中定义和使用这些常量,必须用这些名字来表示。
属性名、命令或者控制参数,以及帧字段都用小写字母的名字来表示。AMQP的实现必须在代码或者文档中与之保持一致。
1.5. 约定
1.5.1. 定义
1.5.2. 版本号
AMQP版本用两个版本号表示——主版本号和次版本号。我们约定版本由主版本号后面加小数点再加上次版本号组成(比如1-3表示主版本号为1,次版本号为3)。
主版本号和次版本号可以用0到255之内的所有值。
主版本号保持不变,次版本号递增。当AMQP工作组提升主版本号时,次版本号将被设置为0。因此,有可能出现这样的版本序列:1-2,1-3,1-4,2-0,2-1……
一旦本协议发布之后(主版本号大于1),应尽量防止次版本号递增到9。不过在发布之前(版本0-x),由于会对本协议进行频繁的修订,可以不遵守这条约定。
一旦本协议发布之后(主版本号大于1),同一个主版本不同次版本的实现必须向后兼容。而在发布之前,这些次版本的实现不需要兼容。
大于或者等于99的主版本号用于测试和开发目的。

技术术语

编辑
AMQP模型(AMQP Model):一个由关键实体和语义表示的逻辑框架,遵从AMQP规范的服务器必须提供这些实体和语义。为了实现本规范中定义的语义,客户端可以发送命令来控制AMQP服务器。
连接(Connection):一个网络连接,比如TCP/IP套接字连接。
会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端(Client):AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务器(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”。
端点(Peer):AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
搭档(Partner):当描述两个端点之间的交互过程时,使用术语“搭档”来表示“另一个”端点的简记法。比如我们定义端点A和端点B,当它们进行通信时,端点B是端点A的搭档,端点A是端点B的搭档。
片段集(Assembly):段的有序集合,形成一个逻辑工作单元。
段(Segment):帧的有序集合,形成片段集中一个完整子单元。
帧(Frame):AMQP传输的一个原子单元。一个帧是一个段中的任意分片。
控制(Control):单向指令,AMQP规范假设这些指令的传输是不可靠的。
命令(Command):需要确认的指令,AMQP规范规定这些指令的传输是可靠的。
异常(Exception):在执行一个或者多个命令时可能发生的错误状态。
类(Class):一批用来描述某种特定功能的AMQP命令或者控制。
消息头(Header):描述消息数据属性的一种特殊段。
消息体(Body):包含应用程序数据的一种特殊段。消息体段对于服务器来说完全透明——服务器不能查看或者修改消息体。
消息内容(Content):包含在消息体段中的的消息数据。
交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
交换器类型(Exchange Type):基于不同路由语义的交换器类。
消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
绑定器(Binding):消息队列和交换器之间的关联。
绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
路由关键字(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
消费者(Consumer):一个从消息队列中请求消息的客户端应用程序。
生产者(Producer):一个向交换器发布消息的客户端应用程序。
虚拟主机(Virtual Host):一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。
下面这些术语在AMQP规范的上下文中没有特别的意义:
主题:通常指发布消息;AMQP规范用一种或多种交换器来实现主题。
服务:通常等同于服务器。AMQP规范使用“服务器”这个术语来兼容IETF的标准术语,并且明确了协议中每个部分的角色(两方也可能是AMQP服务)。
消息代理:等同于服务器。AMQP规范使用术语“客户端”和“服务器”来兼容IETF的标准术语。
 
 

RabbitMQ libraries

RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging. There are a number of clients for RabbitMQ in many different languages. In this tutorial series we're going to use Pika 1.0.0, which is the Python client recommended by the RabbitMQ team. 

 

In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. But let's not get dragged down by the details ‒ you can read more about exchanges in the third part of this tutorial. All we need to know now is how to use a default exchange identified by an empty string. This exchange is special ‒ it allows us to specify exactly to which queue the message should go. The queue name needs to be specified in the routing_key parameter:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.

connection.close()

 

消息不能直接发送至队列,需要通过exchange。

初始默认的exchange ""  Exchange: (AMQP default)  

 

publish/subscrib  发布订阅

同一个消息多个消费者场景

https://www.rabbitmq.com/tutorials/tutorial-three-python.html

What This Tutorial Focuses On

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.

Essentially, published log messages are going to be broadcast to all the receivers.

 

 发布者不知道队列queue ,只知道交易所 exchange,exchange决定发至哪个队列

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

 Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of that type, and call it logs:

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.

数据多写  交易所类型  

exchange_type='fanout'

 注意

 

Add binding from this exchange

 交易所exchange可以绑定交易所或队列queue

 

创建顺序

NOT_FOUND - no queue 'queue_19_ehr_analysis_log' in vhost '/' 

Close

 

并行/发 创建 exchange queue

之后做绑定

 

 

 

Exchange: exchange_19_ehr_analysis_log

This exchange

ToRouting keyArguments 
queue_19_ehr_analysis_log routing_key_19_ehr_analysis_log  

 

 

'''
exchange_19_ehr_analysis_log
Details
Type direct
Parameters
durable: true
Policy

注意双写/多写Type可以改为 fanout

queue_19_ehr_analysis_log 配置
Parameters
durable: true
Policy
Exclusive owner None

路由关系
From Routing key Arguments
(Default exchange binding)
exchange_19_ehr_analysis_log
routing_key_19_ehr_analysis_log


This queue

'''

 

#!/usr/bin/env python
# coding: utf-8
import pika
import time

RABBITMQ_DEV_SERVER_ANALYSI = '6.7.9.65'

username, password = "guest", "guest"
host, port = RABBITMQ_DEV_SERVER_ANALYSI, 5672
virtual_host = None
"""A credentials object for the default authentication methodology with
RabbitMQ.

If you do not pass in credentials to the ConnectionParameters object, it
will create credentials for 'guest' with the password of 'guest'.

If you pass True to erase_on_connect the credentials will not be stored
in memory after the Connection attempt has been made.

:param str username: The username to authenticate with
:param str password: The password to authenticate with
:param bool erase_on_connect: erase credentials on connect.

"""
mq_user = pika.PlainCredentials(username=username, password=password)
"""Connection parameters object that is passed into the connection adapter
upon construction.

:param str host: Hostname or IP Address to connect to
:param int port: TCP port to connect to
:param str virtual_host: RabbitMQ virtual host to use
:param pika.credentials.Credentials credentials: auth credentials
:param int channel_max: Maximum number of channels to allow
:param int frame_max: The maximum byte size for an AMQP frame
:param int heartbeat_interval: How often to send heartbeats
:param bool ssl: Enable SSL
:param dict ssl_options: Arguments passed to ssl.wrap_socket as
:param int connection_attempts: Maximum number of retry attempts
:param int|float retry_delay: Time to wait in seconds, before the next
:param int|float socket_timeout: Use for high latency networks
:param str locale: Set the locale value
:param bool backpressure_detection: Toggle backpressure detection

"""
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=host,
    port=port,
    virtual_host=virtual_host,
    credentials=mq_user
))
channel = connection.channel()

analysi_task = {"queue": 'queue_19_ehr_analysis_log', 'exchange': 'exchange_19_ehr_analysis_log',
                'routing_key': 'routing_key_19_ehr_analysis_log'}  # 日志接口系统配置
'''
exchange_19_ehr_analysis_log
Details
Type    direct
Parameters    
durable:    true
Policy    

注意双写/多写Type可以改为 fanout

queue_19_ehr_analysis_log 配置
Parameters    
durable:    true
Policy    
Exclusive owner    None

路由关系
From    Routing key    Arguments    
(Default exchange binding)
exchange_19_ehr_analysis_log
routing_key_19_ehr_analysis_log    
⇓

This queue

'''

exchange, routing_key, queue = analysi_task["exchange"], analysi_task["routing_key"], analysi_task["queue"]
"""Declare queue, create if needed. This method creates or checks a
queue. When creating a new queue the client can specify various
properties that control the durability of the queue and its contents,
and the level of sharing for the queue.

Leave the queue name empty for a auto-named queue in RabbitMQ

:param method callback: The method to call on Queue.DeclareOk
:param queue: The queue name
:type queue: str or unicode
:param bool passive: Only check to see if the queue exists
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
:param bool nowait: Do not wait for a Queue.DeclareOk
:param dict arguments: Custom key/value arguments for the queue

"""
msgBody = "HelloWorld{}-UCAN".format(time.ctime())

channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msgBody)
channel.basic_publish(exchange="", routing_key=routing_key, body=msgBody)

'''
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
发布者不知道队列queue
'''
connection.close()

if False:
    # https://www.rabbitmq.com/uri-spec.html
    BROKER_URL = 'amqp://guest:guest@%s:5672/' % RABBITMQ_DEV_SERVER_ANALYSI  # 配置 rabbitmq
    params = pika.URLParameters(BROKER_URL)
    params.socket_timeout = 5
    connection = pika.BlockingConnection(params)

    msgBody = "HW----HelloWorld{}-UCAN--".format(time.ctime())
    channel = connection.channel()
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msgBody)
    connection.close()

 

 

requeue  yes 不从队列删除

 

 






发表评论

0/200
313 点赞
0 评论
收藏