How to copy messages to different RabbitMQ node

Copy messages to different RabbitMQ node using federation plugin.

Preliminary information

I will copy messages between reindeer and raccoon RabbitMQ nodes.

Install RabbitMQ server on every node

Install RabbitMQ message broker on every node.

$ sudo apt update
$ sudo apt install gnupg2 apt-transport-https curl

$ curl https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -

$ echo "deb https://dl.bintray.com/rabbitmq/debian stretch main"              | sudo tee    /etc/apt/sources.list.d/rabbitmq.list
$ echo "deb https://dl.bintray.com/rabbitmq-erlang/debian buster erlang-22.x" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list 

$ apt update
$ apt install rabbitmq-server

Use rabbitmq-diagnostics to display RabbitMQ version.

$ sudo rabbitmq-diagnostics server_version
Asking node [email protected] for its RabbitMQ version...
3.8.0

Enable management plugin.

$ sudo rabbitmq-plugins enable rabbitmq_management

Define admin user with password password.

.
$ sudo rabbitmqctl add_user admin password
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Delete guest user.

$ sudo rabbitmqctl delete_user guest

Get rabbitmqadmin utility.

$ curl -o rabbitmqadmin http://127.0.0.1:15672/cli/rabbitmqadmin 

Ensure that executable bit is set.

$ chmod +x rabbitmqadmin 

Setup the dedicated vhost and user

Setup dedicated vhost and user on both nodes.

Create federation_example virtualhost and ensure that admin can manage it.

$ sudo rabbitmqctl add_vhost federation_example
$ sudo rabbitmqctl set_permissions -p federation_example admin ".*" ".*" ".*" 

Declare federation_example user with password password, grant permissions to federation_example virtual host.

$ sudo rabbitmqctl add_user federation_example password
$ sudo rabbitmqctl set_permissions federation_example --vhost federation_example ".*" ".*" ".*"
Notice, federation_example is not a management user.

Setup exchanges

Setup exchanges, queues and bindings on both nodes.

Declare exchange.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare exchange name=directives type=fanout

Declare queue.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare queue name=application-directives

Declare bindings.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare binding source=directives destination=application-directives

Setup federation plugin on the first RabbitMQ node

Setup federation plugin on the first RabbitMQ node - reindeer.

Enable federation plugin.

$ sudo rabbitmq-plugins enable rabbitmq_federation

Define upstream RabbitMQ server.

$ sudo rabbitmqctl set_parameter federation-upstream raccoon-upstream --vhost federation_example '{"uri":"amqp://federation_example:[email protected]"}'

Define upstream set.

$ sudo rabbitmqctl set_parameter  --vhost federation_example federation-upstream-set raccoon '[{"upstream":"raccoon-upstream"}]'

Define federation policy.

$ sudo rabbitmqctl set_policy federate-directives --vhost federation_example directives '{"federation-upstream-set":"raccoon"}' --priority 1 --apply-to exchanges

Display defined parameters.

$ sudo rabbitmqctl list_parameters -p federation_example 
Listing runtime parameters for vhost "federation_example" ...
component       name    value
federation-upstream     raccoon-upstream        {"uri":"amqp://federation_example:[email protected]"}
federation-upstream-set raccoon [{"upstream":"raccoon-upstream"}]

Check federation status.

$ sudo rabbitmqctl federation_status
Listing federation links on node [email protected]
[#{error => <<>>,exchange => <<"directives">>,id => <<"2d543d9d">>,
   last_changed => <<"2019-10-22 21:41:08">>,
   local_connection => <<"<[email protected]>">>,queue => <<>>,
   status => running,type => exchange,upstream => <<"raccoon-upstream">>,
   upstream_exchange => <<"directives">>,upstream_queue => <<>>,
   uri => <<"amqp://raccoon">>,vhost => <<"federation_example">>}]
{%raw%}

Setup federation plugin on the second RabbitMQ node

Setup federation plugin on the second RabbitMQ node - raccoon.

Enable federation plugin.

$ sudo rabbitmq-plugins enable rabbitmq_federation

Define upstream RabbitMQ server.

$ sudo rabbitmqctl set_parameter federation-upstream reindeer-upstream --vhost federation_example '{"uri":"amqp://federation_example:[email protected]"}'

Define upstream set.

$ sudo rabbitmqctl set_parameter  --vhost federation_example federation-upstream-set reindeer '[{"upstream":"reindeer-upstream"}]'

Define federation policy.

$ sudo rabbitmqctl set_policy federate-directives --vhost federation_example directives '{"federation-upstream-set":"reindeer"}' --priority 1 --apply-to exchanges

Display defined parameters.

$ sudo rabbitmqctl list_parameters -p federation_example 
Listing runtime parameters for vhost "federation_example" ...
component       name    value
federation-upstream     reindeer-upstream       {"uri":"amqp://federation_example:[email protected]"}
federation-upstream-set reindeer        [{"upstream":"reindeer-upstream"}]

Check federation status.

{%raw%}
$ sudo rabbitmqctl federation_status
Listing federation links on node [email protected]
[#{error => <<>>,exchange => <<"directives">>,id => <<"58c6f033">>,
   last_changed => <<"2019-10-22 21:39:46">>,
   local_connection => <<"<[email protected]>">>,queue => <<>>,
   status => running,type => exchange,upstream => <<"reindeer-upstream">>,
   upstream_exchange => <<"directives">>,upstream_queue => <<>>,
   uri => <<"amqp://reindeer">>,vhost => <<"federation_example">>}]
{%raw%}

Publish messages and inspect queues

Publish message on the first RabbitMQ node - reindeer.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example publish exchange=directives routing_key= payload="sample application directive"

Inspect queues on the first RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example list queues vhost name node messages
+--------------------+------------------------------------------+-----------------+----------+
|       vhost        |                   name                   |      node       | messages |
+--------------------+------------------------------------------+-----------------+----------+
| federation_example | application-directives                   | [email protected] | 1        |
| federation_example | federation: directives -> [email protected] | [email protected] | 0        |
+--------------------+------------------------------------------+-----------------+----------+
Notice, using exchange to federate messages in this example will create additional queue to consume these.

Inspect queues on the second RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example list queues vhost name node messages
+--------------------+-------------------------------------------+----------------+----------+
|       vhost        |                   name                    |      node      | messages |
+--------------------+-------------------------------------------+----------------+----------+
| federation_example | application-directives                    | [email protected] | 1        |
| federation_example | federation: directives -> [email protected] | [email protected] | 0        |
+--------------------+-------------------------------------------+----------------+----------+

Publish messages on the second RabbitMQ node - raccoon.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example publish exchange=directives routing_key= payload="sample application directive"

Inspect queues on the first RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example list queues vhost name node messages
+--------------------+------------------------------------------+-----------------+----------+
|       vhost        |                   name                   |      node       | messages |
+--------------------+------------------------------------------+-----------------+----------+
| federation_example | application-directives                   | [email protected] | 2        |
| federation_example | federation: directives -> [email protected] | [email protected] | 0        |
+--------------------+------------------------------------------+-----------------+----------+

Inspect queues on the second RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example list queues vhost name node messages
+--------------------+-------------------------------------------+----------------+----------+
|       vhost        |                   name                    |      node      | messages |
+--------------------+-------------------------------------------+----------------+----------+
| federation_example | application-directives                    | [email protected] | 2        |
| federation_example | federation: directives -> [email protected] | [email protected] | 0        |
+--------------------+-------------------------------------------+----------------+----------+

Consume a message.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example get  queue=application-directives ackmode=ack_requeue_false
+-------------+------------+---------------+------------------------------+---------------+------------------+------------+-------------+
| routing_key |  exchange  | message_count |           payload            | payload_bytes | payload_encoding | properties | redelivered |
+-------------+------------+---------------+------------------------------+---------------+------------------+------------+-------------+
|             | directives | 1             | sample application directive | 28            | string           |            | True        |
+-------------+------------+---------------+------------------------------+---------------+------------------+------------+-------------+

Inspect queues on the first RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example list queues vhost name node messages
+--------------------+------------------------------------------+-----------------+----------+
|       vhost        |                   name                   |      node       | messages |
+--------------------+------------------------------------------+-----------------+----------+
| federation_example | application-directives                   | [email protected] | 1        |
| federation_example | federation: directives -> [email protected] | [email protected] | 0        |
+--------------------+------------------------------------------+-----------------+----------+

Inspect queues on the second RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example list queues vhost name node messages
+--------------------+-------------------------------------------+----------------+----------+
|       vhost        |                   name                    |      node      | messages |
+--------------------+-------------------------------------------+----------------+----------+
| federation_example | application-directives                    | [email protected] | 2        |
| federation_example | federation: directives -> [email protected] | [email protected] | 0        |
+--------------------+-------------------------------------------+----------------+----------+

Configuration

$ sudo apt install jq

Export upstream configuration.

$ ./rabbitmqadmin --username admin --password password --host reindeer export config-reindeer.json
$ cat config-reindeer.json | jq .
{                                                                                                                                                                
  "rabbit_version": "3.8.0",                                                                                                                                     
  "users": [                                                                                                                                                     
    {                                                                                                                                                            
      "name": "admin",                                                                                                                                           
      "password_hash": "e/fHqe5PFXC/m68fcBa8W612ko9Jg5Tlo6L6cjiN/jGqXbXA",                                                                                       
      "hashing_algorithm": "rabbit_password_hashing_sha256",                                                                                                     
      "tags": "administrator"                                                                                                                                    
    },                                                                                                                                                           
    {                                                                                                                                                            
      "name": "federation_example",                                                                                                                              
      "password_hash": "p+bEErHIP5+hdD4Vw66GK+8EsR1VkVCg4hdNfm4Sj03k55pa",                                                                                       
      "hashing_algorithm": "rabbit_password_hashing_sha256",                                                                                                     
      "tags": ""                                                                                                                                                 
    }                                                                                                                                                            
  ],                                                                                                                                                             
  "vhosts": [                                                                                                                                                    
    {                                                                                                                                                            
      "name": "/"                                                                                                                                                
    },                                                                                                                                                           
    {                                                                                                                                                            
      "name": "federation_example"                                                                                                                               
    }                                                                                                                                                            
  ],                                                                                                                                                             
  "permissions": [                                                                                                                                               
    {                                                                                                                                                            
      "user": "federation_example",                                                                                                                              
      "vhost": "federation_example",                                                                                                                             
      "configure": ".*",                                                                                                                                         
      "write": ".*",                                                                                                                                             
      "read": ".*"                                                                                                                                               
    },                                                                                                                                                           
    {                                                                                                                                                            
      "user": "admin",                                                                                                                                           
      "vhost": "/",                                                                                                                                              
      "configure": ".*",                                                                                                                                         
      "write": ".*",                                                                                                                                             
      "read": ".*"                                                                                                                                               
    },                                                                                                                                                           
    {                                                                                                                                                            
      "user": "admin",                                                                                                                                           
      "vhost": "federation_example",                                                                                                                             
      "configure": ".*",                                                                                                                                         
      "write": ".*",                                                                                                                                             
      "read": ".*"                                                                                                                                               
    }                                                                                                                                                            
  ],
  "topic_permissions": [],
  "parameters": [                                                                                                                                                
    {
      "value": {
        "uri": "amqp://federation_example:[email protected]"
      },
      "vhost": "federation_example",
      "component": "federation-upstream",
      "name": "raccoon-upstream"
    },
    {
      "value": [
        {
          "upstream": "raccoon-upstream"
        }
      ],
      "vhost": "federation_example",
      "component": "federation-upstream-set",
      "name": "raccoon"
    }
  ],
  "global_parameters": [
    {
      "name": "cluster_name",
      "value": "[email protected]"
    }
  ],
  "policies": [
    {
      "vhost": "federation_example",
      "name": "federate-directives",
      "pattern": "directives",
      "apply-to": "exchanges",
      "definition": {
        "federation-upstream-set": "raccoon"
      },
      "priority": 1
    }
  ],
  "queues": [
    {
      "name": "application-directives",
      "vhost": "federation_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "directives",
      "vhost": "federation_example",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "directives",
      "vhost": "federation_example",
      "destination": "application-directives",
      "destination_type": "queue",
      "routing_key": "",
      "arguments": {}
    }
  ]
}

Clustering

Define multiple upstream addresses when you connect to a cluster.

$ sudo rabbitmqctl set_parameter federation-upstream cluster-upstream --vhost federation_example '{"uri":["amqp://federation_example:[email protected]","amqp://federation_example:[email protected]","amqp://federation_example:[email protected]"]}' 
Setting runtime parameter "cluster-upstream" for component "federation-upstream" to "{"uri":["amqp://federation_example:[email protected]","amqp://federation_example:[email protected]","amqp://federation_example:[email protected]"]}" in vhost "federation_example" ...

Remember to also mirror ^federation queues, so the downstream can safely reconnect to other cluster nodes.

$ sudo rabbitmqctl set_policy -p federation_example mirror-directives "^application-directives$" '{"ha-mode": "all", "ha-sync-mode":"automatic"}'
Setting policy "mirror-directives" for pattern "^application-directives" to "{"ha-mode": "all", "ha-sync-mode":"automatic"}" with priority "0" for vhost "federation_example" ...
$ sudo rabbitmqctl set_policy -p federation_example mirror-federation "^federation" '{"ha-mode": "all", "ha-sync-mode":"automatic"}'
Setting policy "mirror-federation" for pattern "^federation" to "{"ha-mode": "all", "ha-sync-mode":"automatic"}" with priority "0" for vhost "federation_example" ...

You will get the following error when you forget to mirror dynamically created queues used for federation purposes.

Error detail:

{server_initiated_close,404,
                        <<"NOT_FOUND - home node '[email protected]' of durable queue 'federation: directives -> [email protected]' in vhost 'federation_example' is down or inaccessible">>}

You can restart federation link in case of trouble.

$ sudo rabbitmqctl eval 'rabbit_federation_status:status().' 
[[{exchange,<<"directives">>},
  {upstream_exchange,<<"directives">>},
  {type,exchange},
  {vhost,<<"federation_example">>},
  {upstream,<<"reindeer-upstream">>},
  {id,<<"58c6f033">>},
  {status,running},
  {local_connection,<<"<[email protected]>">>},
  {uri,<<"amqp://fox">>},
  {timestamp,{{2019,10,23},{22,8,39}}}]]
$ sudo rabbitmqctl restart_federation_link 58c6f033
Restarting federation link 58c6f033 on node [email protected]
$ sudo rabbitmqctl eval 'rabbit_federation_status:status().' 
[[{exchange,<<"directives">>},
  {upstream_exchange,<<"directives">>},
  {type,exchange},
  {vhost,<<"federation_example">>},
  {upstream,<<"reindeer-upstream">>},
  {id,<<"58c6f033">>},
  {status,running},
  {local_connection,<<"<[email protected]>">>},
  {uri,<<"amqp://falcon">>},
  {timestamp,{{2019,10,23},{22,8,56}}}]]

Additional notes

Enable rabbitmq_federation_management to define parameters and inspect plugin status using web-gui.

$ sudo rabbitmq-plugins enable rabbitmq_federation_management
Enabling plugins on node [email protected]:
rabbitmq_federation_management
The following plugins have been configured:
  rabbitmq_federation
  rabbitmq_federation_management
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to [email protected]
The following plugins have been enabled:
  rabbitmq_federation_management

started 1 plugins.

References

Federation Plugin