How to move messages to a different RabbitMQ node

Move messages to a different RabbitMQ node using the shovel plugin, which acts as a simple client.

Preliminary information

I will use two RabbitMQ nodes, reindeer (upstream) and raccoon (downstream), and move messages from the former to the latter.

Install RabbitMQ server on every node

Install RabbitMQ message broker on every node.

$ sudo apt update
$ sudo apt install gnupg2 apt-transport-https curl

$ curl https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -

$ echo "deb https://dl.bintray.com/rabbitmq/debian stretch main"              | sudo tee    /etc/apt/sources.list.d/rabbitmq.list
$ echo "deb https://dl.bintray.com/rabbitmq-erlang/debian buster erlang-22.x" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list 

$ sudo apt update
$ sudo apt install rabbitmq-server

Use rabbitmq-diagnostics to display RabbitMQ version.

$ sudo rabbitmq-diagnostics server_version
Asking node [email protected] for its RabbitMQ version...
3.8.0

Enable management plugin.

$ sudo rabbitmq-plugins enable rabbitmq_management

Define admin user with password password.

$ sudo rabbitmqctl add_user admin password
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Delete guest user.

$ sudo rabbitmqctl delete_user guest

Get rabbitmqadmin utility.

$ curl -o rabbitmqadmin http://127.0.0.1:15672/cli/rabbitmqadmin 

Ensure that executable bit is set.

$ chmod +x rabbitmqadmin 

Create the shovel_example virtual host and ensure that admin can manage it.

$ sudo rabbitmqctl add_vhost shovel_example
$ sudo rabbitmqctl set_permissions -p shovel_example admin ".*" ".*" ".*" 

Set up the downstream RabbitMQ node

Set up the downstream RabbitMQ node - raccoon.

Declare the shovel_example user with password password and grant it permissions on the shovel_example virtual host.

$ sudo rabbitmqctl add_user shovel_example password
$ sudo rabbitmqctl set_permissions shovel_example --vhost shovel_example ".*" ".*" ".*"

Note that shovel_example is not a management user.

Declare exchange.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare exchange name=directives type=fanout

Declare queue.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare queue name=application-directives

Declare bindings.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare binding source=directives destination=application-directives

Set up the upstream RabbitMQ node

Set up the upstream RabbitMQ node - reindeer.

Declare first exchange.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare exchange name=messages type=fanout

Declare first queue.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare queue name=application-messages

Declare first bindings.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare binding source=messages destination=application-messages

Declare second exchange.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare exchange name=directives type=fanout

Declare second queue.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare queue name=application-directives

Declare second bindings.

$ ./rabbitmqadmin --username admin --password password --vhost shovel_example declare binding source=directives destination=application-directives
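
The declarations above repeat the same connection flags and the same exchange, queue, binding pattern. A minimal sketch of a wrapper follows, assuming rabbitmqadmin sits in the current directory as downloaded earlier. The names RMQ_ADMIN, rmqadmin and declare_fanout are hypothetical helpers of mine, not part of RabbitMQ.

```shell
# Path to the rabbitmqadmin script downloaded earlier (assumption: current directory).
RMQ_ADMIN=${RMQ_ADMIN:-./rabbitmqadmin}

# Hypothetical wrapper that carries the flags shared by every command above.
rmqadmin() {
  "$RMQ_ADMIN" --username admin --password password --vhost shovel_example "$@"
}

# Hypothetical helper: declare a fanout exchange ($1), a queue ($2),
# and the binding between them. Stops at the first failing step.
declare_fanout() {
  rmqadmin declare exchange name="$1" type=fanout &&
  rmqadmin declare queue name="$2" &&
  rmqadmin declare binding source="$1" destination="$2"
}

# Usage on the upstream node (not executed here):
#   declare_fanout messages application-messages
#   declare_fanout directives application-directives
```

Setting RMQ_ADMIN=echo prints the commands instead of running them, which is a cheap way to check the arguments before touching a broker.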

Set up shovel plugin on the upstream RabbitMQ node

Set up the shovel plugin on the upstream RabbitMQ node - reindeer.

Enable shovel plugin.

$ sudo rabbitmq-plugins enable rabbitmq_shovel

Use the shovel plugin to move messages from a specific queue.

$ sudo rabbitmqctl set_parameter -p shovel_example shovel shovel-queue '{"src-protocol": "amqp091", "src-uri": "amqp:///shovel_example", "src-queue": "application-messages", "dest-protocol": "amqp091", "dest-uri": "amqp://shovel_example:password@raccoon/shovel_example", "dest-queue": "application-messages", "dest-add-forward-headers": true, "dest-add-timestamp-header": true}'
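
The one-line JSON definition is awkward to edit by hand. As a rough sketch, it can be assembled from arguments instead. The function name shovel_queue_definition is hypothetical. The sketch hardcodes the shovel_example virtual host on the source side and does no JSON escaping, so it assumes the arguments contain no double quotes or backslashes.

```shell
# Hypothetical helper: prints a queue-to-queue shovel definition as JSON.
# $1 = source queue, $2 = destination URI, $3 = destination queue.
# Assumption: arguments need no JSON escaping (no double quotes, no backslashes).
shovel_queue_definition() {
  printf '{"src-protocol": "amqp091", "src-uri": "amqp:///shovel_example", "src-queue": "%s", ' "$1"
  printf '"dest-protocol": "amqp091", "dest-uri": "%s", "dest-queue": "%s", ' "$2" "$3"
  printf '"dest-add-forward-headers": true, "dest-add-timestamp-header": true}\n'
}

# Usage (not executed here):
#   sudo rabbitmqctl set_parameter -p shovel_example shovel shovel-queue \
#     "$(shovel_queue_definition application-messages \
#        'amqp://shovel_example:password@raccoon/shovel_example' \
#        application-messages)"
```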

Use the shovel plugin to move messages from a specific exchange.

$ sudo rabbitmqctl set_parameter -p shovel_example shovel shovel-exchange '{"src-protocol": "amqp091", "src-uri": "amqp:///shovel_example", "src-exchange": "directives", "dest-protocol": "amqp091", "dest-uri": "amqp://shovel_example:password@raccoon/shovel_example", "dest-exchange": "directives", "dest-add-forward-headers": true, "dest-add-timestamp-header": true}'
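
Both definitions embed the user name and password in dest-uri. The sample password here is plain ASCII letters, but a password containing characters such as @ or / would have to be percent-encoded before it goes into the URI. Below is a small sketch of such an encoder in portable shell. The function name urlencode is my own, and the sketch is only meant for single-byte (ASCII) input.

```shell
# Hypothetical helper: percent-encode a string for use in the user or
# password part of an AMQP URI. Runs in a subshell so its variables do
# not leak. Assumption: input is ASCII.
urlencode() (
  LC_ALL=C
  s=$1
  out=
  while [ -n "$s" ]; do
    rest=${s#?}          # everything after the first character
    c=${s%"$rest"}       # the first character itself
    case $c in
      [A-Za-z0-9._~-]) out="$out$c" ;;                  # unreserved, keep as-is
      *) out="$out$(printf '%%%02X' "'$c")" ;;          # everything else as %XX
    esac
    s=$rest
  done
  printf '%s\n' "$out"
)

# Usage (not executed here):
#   dest_uri="amqp://shovel_example:$(urlencode 'p@ss/word')@raccoon/shovel_example"
```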

Display defined parameters.

$ sudo rabbitmqctl list_parameters -p shovel_example
Listing runtime parameters for vhost "shovel_example" ...
component       name    value
shovel  shovel-queue    {"dest-add-forward-headers":true,"dest-add-timestamp-header":true,"dest-protocol":"amqp091","dest-queue":"application-messages","dest-uri":"amqp://shovel_example:password@raccoon/shovel_example","src-protocol":"amqp091","src-queue":"application-messages","src-uri":"amqp:///shovel_example"}
shovel  shovel-exchange {"dest-add-forward-headers":true,"dest-add-timestamp-header":true,"dest-exchange":"directives","dest-protocol":"amqp091","dest-uri":"amqp://shovel_example:password@raccoon/shovel_example","src-exchange":"directives","src-protocol":"amqp091","src-uri":"amqp:///shovel_example"}

Inspect shovel plugin status.

$ sudo rabbitmqctl eval 'rabbit_shovel_status:status().'

[{{<<"shovel_example">>,<<"shovel-queue">>},
  dynamic,
  {running,[{src_uri,<<"amqp:///shovel_example">>},
            {src_protocol,<<"amqp091">>},
            {dest_protocol,<<"amqp091">>},
            {dest_uri,<<"amqp://raccoon/shovel_example">>},
            {src_queue,<<"application-messages">>},
            {dest_queue,<<"application-messages">>}]},
  {{2019,10,16},{23,1,54}}},
 {{<<"shovel_example">>,<<"shovel-exchange">>},
  dynamic,
  {running,[{src_uri,<<"amqp:///shovel_example">>},
            {src_protocol,<<"amqp091">>},
            {dest_protocol,<<"amqp091">>},
            {dest_uri,<<"amqp://raccoon/shovel_example">>},
            {src_exchange,<<"directives">>},
            {src_exchange_key,<<>>},
            {dest_exchange,<<"directives">>}]},
  {{2019,10,16},{23,2,3}}}]
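
For a quick scripted check, the status output can be scanned for the running state. This is only a sketch that counts the `{running,` tuples in the text printed above. The function name count_running_shovels is hypothetical, and I am assuming that shovels in any other state simply do not print that tuple.

```shell
# Hypothetical helper: counts shovels reported as running.
# Reads the output of rabbit_shovel_status:status() on stdin.
count_running_shovels() {
  # -F: match the literal string, -c: print the number of matching lines.
  # grep exits non-zero when the count is 0, so mask that status.
  grep -cF '{running,' || true
}

# Usage (not executed here):
#   sudo rabbitmqctl eval 'rabbit_shovel_status:status().' | count_running_shovels
```

With the two shovels defined in this example, the expected count is 2.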

Note that the shovel creates the destination queue on the downstream node automatically, but you must create the destination exchange yourself.

Publish messages and inspect queues

Publish messages to the upstream RabbitMQ node - reindeer.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost shovel_example publish exchange=messages routing_key= payload="sample application message"
$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost shovel_example publish exchange=directives routing_key= payload="sample directive"

Inspect queues on the upstream RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost shovel_example list queues vhost name node messages
+----------------+--------------------------------+-----------------+----------+
|     vhost      |              name              |      node       | messages |
+----------------+--------------------------------+-----------------+----------+
| shovel_example | amq.gen-aeBG5G37EstKujs7rt78lQ | rabbit@reindeer | 0        |
| shovel_example | application-directives         | rabbit@reindeer | 1        |
| shovel_example | application-messages           | rabbit@reindeer | 0        |
+----------------+--------------------------------+-----------------+----------+

Note that shoveling from an exchange, as in this example, creates an additional queue (amq.gen-...) on the upstream node, from which the shovel consumes the messages.

Inspect queues on the downstream RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost shovel_example list queues vhost name node messages
+----------------+------------------------+----------------+----------+
|     vhost      |          name          |      node      | messages |
+----------------+------------------------+----------------+----------+
| shovel_example | application-directives | rabbit@raccoon | 1        |
| shovel_example | application-messages   | rabbit@raccoon | 1        |
+----------------+------------------------+----------------+----------+
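
To compare queue depths on both nodes in a script, the table can be reduced to a single number. The following sketch assumes the exact column order used above (vhost, name, node, messages). The function name queue_depth is hypothetical.

```shell
# Hypothetical helper: prints the "messages" value for one queue.
# $1 = queue name; stdin = table printed by
# "rabbitmqadmin ... list queues vhost name node messages".
queue_depth() {
  awk -F'|' -v q="$1" '
    { name = $3; gsub(/^ +| +$/, "", name) }          # third field is the queue name
    name == q { count = $5; gsub(/ /, "", count); print count }
  '
}

# Usage (not executed here):
#   ./rabbitmqadmin --username admin --password password --host raccoon \
#     --vhost shovel_example list queues vhost name node messages |
#     queue_depth application-messages
```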

Get a sample message from the queue and re-queue it.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost shovel_example get --depth 3 queue=application-messages
+----------------------+----------+---------------+----------------------------+---------------+------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------+-------------+
|     routing_key      | exchange | message_count |          payload           | payload_bytes | payload_encoding |                                                                                                                                properties.headers.x-shovelled                                                                                                                                 | properties.headers.x-shovelled-timestamp | redelivered |
+----------------------+----------+---------------+----------------------------+---------------+------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------+-------------+
| application-messages |          | 0             | sample application message | 26            | string           | [{"dest-queue": "application-messages", "dest-uri": "amqp://raccoon/shovel_example", "shovel-vhost": "shovel_example", "src-queue": "application-messages", "src-uri": "amqp:///shovel_example", "shovel-type": "dynamic", "shovelled-by": "rabbit@reindeer", "shovel-name": "shovel-queue"}] | 1571267120                               | True        |
+----------------------+----------+---------------+----------------------------+---------------+------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------+-------------+
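
The x-shovelled-timestamp header looks like Unix time in seconds. Assuming that is the case, and assuming GNU date as shipped with Debian, it can be converted to a readable UTC timestamp. The function name epoch_to_utc is my own.

```shell
# Hypothetical helper: convert Unix time in seconds to an ISO 8601 UTC string.
# Assumption: GNU date (the -d "@N" form is not portable to every date implementation).
epoch_to_utc() {
  date -u -d "@$1" '+%Y-%m-%dT%H:%M:%SZ'
}

epoch_to_utc 1571267120   # prints 2019-10-16T23:05:20Z
```

The result falls a few minutes after the shovel start times shown in the status output, which is consistent with the message being shovelled shortly after the shovels were defined.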

Configuration

Install jq to pretty-print the exported JSON.

$ sudo apt install jq

Export upstream configuration.

$ ./rabbitmqadmin --username admin --password password --host reindeer export config-reindeer.json
$ cat config-reindeer.json | jq .
{
  "rabbit_version": "3.8.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "vL/uKvz+7UTFCeJFIySdMZ23AkqsWv+PTRPcT9flgwGNW3p7",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "administrator"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    },
    {
      "name": "shovel_example"
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "shovel_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [],
  "parameters": [
    {
      "value": {
        "dest-add-forward-headers": true,
        "dest-add-timestamp-header": true,
        "dest-protocol": "amqp091",
        "dest-queue": "application-messages",
        "dest-uri": "amqp://shovel_example:password@raccoon/shovel_example",
        "src-protocol": "amqp091",
        "src-queue": "application-messages",
        "src-uri": "amqp:///shovel_example"
      },
      "vhost": "shovel_example",
      "component": "shovel",
      "name": "shovel-queue"
    },
    {
      "value": {
        "dest-add-forward-headers": true,
        "dest-add-timestamp-header": true,
        "dest-exchange": "directives",
        "dest-protocol": "amqp091",
        "dest-uri": "amqp://shovel_example:password@raccoon/shovel_example",
        "src-exchange": "directives",
        "src-protocol": "amqp091",
        "src-uri": "amqp:///shovel_example"
      },
      "vhost": "shovel_example",
      "component": "shovel",
      "name": "shovel-exchange"
    }
  ],
  "global_parameters": [
    {
      "name": "cluster_name",
      "value": "rabbit@reindeer"
    }
  ],
  "policies": [],
  "queues": [
    {
      "name": "application-directives",
      "vhost": "shovel_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    },
    {
      "name": "application-messages",
      "vhost": "shovel_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "directives",
      "vhost": "shovel_example",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    },
    {
      "name": "messages",
      "vhost": "shovel_example",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "directives",
      "vhost": "shovel_example",
      "destination": "application-directives",
      "destination_type": "queue",
      "routing_key": "",
      "arguments": {}
    },
    {
      "source": "messages",
      "vhost": "shovel_example",
      "destination": "application-messages",
      "destination_type": "queue",
      "routing_key": "",
      "arguments": {}
    }
  ]
}
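
Notice that the exported upstream configuration contains the shovel password in clear text inside dest-uri. If the file is going to be shared, the credentials can be stripped first. This is a minimal sketch using sed with extended regular expressions. The function name redact_amqp_credentials is hypothetical, and the password_hash fields are left untouched.

```shell
# Hypothetical helper: remove "user:password@" from amqp:// and amqps:// URIs.
# Reads text on stdin and writes the redacted text to stdout.
redact_amqp_credentials() {
  sed -E 's#(amqps?://)[^/@"]*@#\1#g'
}

# Usage (not executed here; the output file name is an assumption):
#   redact_amqp_credentials < config-reindeer.json > config-reindeer.redacted.json
```

URIs without credentials, such as amqp:///shovel_example, pass through unchanged because the pattern requires an @ before the first slash of the host part.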

Export downstream configuration.

$ ./rabbitmqadmin --username admin --password password --host raccoon export config-raccoon.json
$ cat config-raccoon.json | jq .
{
  "rabbit_version": "3.8.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "tkqVY3vb0/TWTkDpPktU/RdmQha64s/++wCwna7M3RY0sqc8",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "administrator"
    },
    {
      "name": "shovel_example",
      "password_hash": "tZqEfBO21CtElO14w3zabxVLIKo1NeLPBpdnsp5dJss4wqkN",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": ""
    }
  ],
  "vhosts": [
    {
      "name": "/"
    },
    {
      "name": "shovel_example"
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "shovel_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "shovel_example",
      "vhost": "shovel_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [],
  "parameters": [],
  "global_parameters": [
    {
      "name": "cluster_name",
      "value": "rabbit@raccoon"
    }
  ],
  "policies": [],
  "queues": [
    {
      "name": "application-directives",
      "vhost": "shovel_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    },
    {
      "name": "application-messages",
      "vhost": "shovel_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "directives",
      "vhost": "shovel_example",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "directives",
      "vhost": "shovel_example",
      "destination": "application-directives",
      "destination_type": "queue",
      "routing_key": "",
      "arguments": {}
    }
  ]
}

Additional notes

Use the following command to clear a specific parameter.

$ sudo rabbitmqctl clear_parameter --vhost shovel_example shovel shovel-queue
Clearing runtime parameter "shovel-queue" for component "shovel" on vhost "shovel_example" ...

Enable rabbitmq_shovel_management to define shovel parameters and inspect the plugin status in the management web UI.

$ sudo rabbitmq-plugins enable rabbitmq_shovel_management
Enabling plugins on node rabbit@reindeer:
rabbitmq_shovel_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_shovel
  rabbitmq_shovel_management
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@reindeer
The following plugins have been enabled:
  rabbitmq_shovel_management

started 1 plugins.
