How to distribute messages to a different RabbitMQ node

Distribute messages to a different RabbitMQ node using the federation plugin.

Preliminary information

I will distribute messages from the RabbitMQ node reindeer (upstream) to the node raccoon (downstream).

Install RabbitMQ server on every node

Install RabbitMQ message broker on every node.

$ sudo apt update
$ sudo apt install gnupg2 apt-transport-https curl

$ curl https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -

$ echo "deb https://dl.bintray.com/rabbitmq/debian stretch main"              | sudo tee    /etc/apt/sources.list.d/rabbitmq.list
$ echo "deb https://dl.bintray.com/rabbitmq-erlang/debian buster erlang-22.x" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list 

$ sudo apt update
$ sudo apt install rabbitmq-server

Use rabbitmq-diagnostics to display RabbitMQ version.

$ sudo rabbitmq-diagnostics server_version
Asking node [email protected] for its RabbitMQ version...
3.8.0

Enable management plugin.

$ sudo rabbitmq-plugins enable rabbitmq_management

Define admin user with password password.

$ sudo rabbitmqctl add_user admin password
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Delete guest user.

$ sudo rabbitmqctl delete_user guest

Get rabbitmqadmin utility.

$ curl -o rabbitmqadmin http://127.0.0.1:15672/cli/rabbitmqadmin 

Ensure that executable bit is set.

$ chmod +x rabbitmqadmin 

Set up the dedicated vhost and user

Set up a dedicated vhost and user on both nodes.

Create the federation_example virtual host and ensure that admin can manage it.

$ sudo rabbitmqctl add_vhost federation_example
$ sudo rabbitmqctl set_permissions -p federation_example admin ".*" ".*" ".*" 

Declare federation_example user with password password, grant permissions to federation_example virtual host.

$ sudo rabbitmqctl add_user federation_example password
$ sudo rabbitmqctl set_permissions federation_example --vhost federation_example ".*" ".*" ".*"

Note that federation_example is not a management user.

Set up exchanges

Set up the exchange, queue and binding on the upstream node, reindeer. The downstream node only needs the queue, which is declared later.

Declare exchange.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare exchange name=directives type=fanout

Declare queue.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare queue name=application-directives

Declare bindings.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare binding source=directives destination=application-directives

Set up federation plugin on the downstream RabbitMQ node

Set up the federation plugin on the downstream RabbitMQ node, raccoon.

Enable federation plugin.

$ sudo rabbitmq-plugins enable rabbitmq_federation

Define upstream RabbitMQ server.

$ sudo rabbitmqctl set_parameter federation-upstream reindeer-upstream --vhost federation_example '{"uri":"amqp://federation_example:password@reindeer","prefetch-count": 1}'
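
The JSON value passed to set_parameter is easy to mis-quote in a shell. A minimal sketch, assuming Python 3 with only the standard library and the credentials defined earlier, builds the same argument list. The helper name upstream_command is hypothetical and not part of RabbitMQ.

```python
import json
import shlex


def upstream_command(name, vhost, uri, prefetch_count=1):
    """Return the rabbitmqctl argument list that defines a federation upstream."""
    # the value is a JSON object; json.dumps takes care of the quoting
    value = json.dumps({"uri": uri, "prefetch-count": prefetch_count})
    return ["rabbitmqctl", "set_parameter", "federation-upstream", name,
            "--vhost", vhost, value]


argv = upstream_command("reindeer-upstream", "federation_example",
                        "amqp://federation_example:password@reindeer")
# print a shell-safe command line that can be pasted after sudo
print(" ".join(shlex.quote(arg) for arg in argv))
```

The list form can also be handed to subprocess.run directly, which avoids shell quoting altogether.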

Define upstream set.

$ sudo rabbitmqctl set_parameter --vhost federation_example federation-upstream-set reindeer '[{"upstream":"reindeer-upstream"}]'

Define federation policy.

$ sudo rabbitmqctl set_policy federate-directives --vhost federation_example application-directives '{"federation-upstream-set":"reindeer"}' --priority 1 --apply-to queues 
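
The policy pattern is a regular expression matched against queue names. To my understanding the match is not anchored, so the pattern application-directives would also cover a queue such as old-application-directives. The sketch below uses Python's re module only as a stand-in for the broker's matcher (an assumption) to show what anchoring changes. All queue names except application-directives are hypothetical.

```python
import re

queues = ["application-directives", "old-application-directives", "application-directives-v2"]


def matching(pattern, names):
    """Return the names that the pattern matches anywhere in the string."""
    return [name for name in names if re.search(pattern, name)]


# unanchored pattern, as used in the policy above
print(matching("application-directives", queues))
# anchored pattern that selects exactly one queue name
print(matching("^application-directives$", queues))
```

If only one queue should be federated, passing ^application-directives$ as the policy pattern is the stricter choice.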

Display defined parameters.

$ sudo rabbitmqctl list_parameters -p federation_example 
Listing runtime parameters for vhost "federation_example" ...
component       name    value
federation-upstream     reindeer-upstream       {"prefetch-count":1,"uri":"amqp://federation_example:password@reindeer"}
federation-upstream-set reindeer        [{"upstream":"reindeer-upstream"}]

Declare queue on downstream server.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare queue name=application-directives    

Check federation status.

$ sudo rabbitmqctl federation_status
Listing federation links on node [email protected]
[#{consumer_tag => <<"federation-link-reindeer-upstream">>,error => <<>>,
   exchange => <<>>,id => <<"c57e258c">>,
   last_changed => <<"2019-10-24 22:12:37">>,
   local_connection => <<"<[email protected]>">>,
   queue => <<"application-directives">>,status => running,type => queue,
   upstream => <<"reindeer-upstream">>,upstream_exchange => <<>>,
   upstream_queue => <<"application-directives">>,
   uri => <<"amqp://reindeer">>,vhost => <<"federation_example">>}]

Create consumer

Use Python to create a simple consumer and save it as client.py.

#!/usr/bin/env python3
# Consume messages from RabbitMQ server

# import parser for command-line options
import argparse
# import a pure-Python implementation of the AMQP 0-9-1 protocol
import pika
# import time
import time

# define callback
def consumer_callback(channel, method, properties, body):
  if type(properties.headers) is dict and "x-received-from" in properties.headers:
    print("Received message %r from %r" % (body.decode(), properties.headers["x-received-from"][0]["uri"]))
  else:
    print("Received message %r " % body.decode())
  # sleep for 1 second to visualize things
  time.sleep(1)
  # ack this message
  channel.basic_ack(delivery_tag=method.delivery_tag)

# define and parse command-line options
parser = argparse.ArgumentParser(description='Consume messages from RabbitMQ server')
parser.add_argument('--server', required=True, help='Define RabbitMQ server')
parser.add_argument('--virtual_host', default='/', help='Define virtual host')
parser.add_argument('--port', type=int, default=5672, help='Define port (default: %(default)s)')
parser.add_argument('--username', default='guest', help='Define username (default: %(default)s)')
parser.add_argument('--password', default='guest', help='Define password (default: %(default)s)')
args = vars(parser.parse_args())

# set amqp credentials
credentials = pika.PlainCredentials(args['username'], args['password'])

# set amqp connection parameters
parameters = pika.ConnectionParameters(host=args['server'], port=args['port'], virtual_host=args['virtual_host'], credentials=credentials)

# establish connection and consume messages
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='application-directives', on_message_callback=consumer_callback)
try:
  channel.start_consuming()
except KeyboardInterrupt:
  # stop consuming on Ctrl+C
  channel.stop_consuming()
finally:
  # close the connection exactly once, whichever way the loop ended
  connection.close()
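
The callback tells federated messages apart by the x-received-from header. A small sketch of that check as a standalone function, assuming only the "uri" key that the consumer above reads. The function name origin_uris is hypothetical.

```python
def origin_uris(headers):
    """Return the URIs listed in the x-received-from header, or an empty list."""
    if not isinstance(headers, dict):
        return []
    hops = headers.get("x-received-from") or []
    return [hop["uri"] for hop in hops if isinstance(hop, dict) and "uri" in hop]


# a locally published message carries no federation header
print(origin_uris(None))
# a federated message lists the upstream it was moved from
print(origin_uris({"x-received-from": [{"uri": "amqp://reindeer"}]}))
```

Keeping the header parsing separate from the callback makes it possible to test it without a running broker.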

Publish messages and inspect queues

Start consumer that will connect to the upstream node.

$ python3 client.py --server reindeer --virtual_host federation_example --username federation_example --password password

Start consumer that will connect to the downstream node.

$ python3 client.py --server raccoon --virtual_host federation_example --username federation_example --password password

Publish fifty messages on the upstream node.

$ for i in $(seq 1 50); do ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example publish exchange=directives routing_key= payload="Sample application directive $i"; done
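
The same messages could be published from Python instead of the shell loop. This is a sketch only, assuming the pika library used by the consumer and an AMQP connection instead of the HTTP API that rabbitmqadmin uses. The function names payloads and publish_all are hypothetical.

```python
def payloads(count):
    """Return the same payloads as the shell loop, numbered from 1."""
    return ["Sample application directive %d" % i for i in range(1, count + 1)]


def publish_all(host, vhost, username, password, count=50):
    """Publish the payloads to the directives fanout exchange."""
    # imported here so that payloads() can be used without pika installed
    import pika

    credentials = pika.PlainCredentials(username, password)
    parameters = pika.ConnectionParameters(host=host, virtual_host=vhost, credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        for body in payloads(count):
            # a fanout exchange ignores the routing key, so it stays empty
            channel.basic_publish(exchange="directives", routing_key="", body=body.encode())
    finally:
        connection.close()
```

A call such as publish_all("reindeer", "federation_example", "admin", "password") would then replace the loop.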

Messages consumed from the upstream node.

Received message 'Sample application directive 1'
Received message 'Sample application directive 4'
Received message 'Sample application directive 7'
Received message 'Sample application directive 8'
Received message 'Sample application directive 11'
Received message 'Sample application directive 12'
Received message 'Sample application directive 15'
Received message 'Sample application directive 16'
Received message 'Sample application directive 19'
Received message 'Sample application directive 20'
Received message 'Sample application directive 23'
Received message 'Sample application directive 24'
Received message 'Sample application directive 27'
Received message 'Sample application directive 28'
Received message 'Sample application directive 31'
Received message 'Sample application directive 32'
Received message 'Sample application directive 35'
Received message 'Sample application directive 36'
Received message 'Sample application directive 38'
Received message 'Sample application directive 41'
Received message 'Sample application directive 42'
Received message 'Sample application directive 45'
Received message 'Sample application directive 46'
Received message 'Sample application directive 49'
Received message 'Sample application directive 50'

Messages consumed from the downstream node.

Received message 'Sample application directive 2' from 'amqp://reindeer'
Received message 'Sample application directive 3' from 'amqp://reindeer'
Received message 'Sample application directive 5' from 'amqp://reindeer'
Received message 'Sample application directive 6' from 'amqp://reindeer'
Received message 'Sample application directive 9' from 'amqp://reindeer'
Received message 'Sample application directive 10' from 'amqp://reindeer'
Received message 'Sample application directive 13' from 'amqp://reindeer'
Received message 'Sample application directive 14' from 'amqp://reindeer'
Received message 'Sample application directive 17' from 'amqp://reindeer'
Received message 'Sample application directive 18' from 'amqp://reindeer'
Received message 'Sample application directive 21' from 'amqp://reindeer'
Received message 'Sample application directive 22' from 'amqp://reindeer'
Received message 'Sample application directive 25' from 'amqp://reindeer'
Received message 'Sample application directive 26' from 'amqp://reindeer'
Received message 'Sample application directive 29' from 'amqp://reindeer'
Received message 'Sample application directive 30' from 'amqp://reindeer'
Received message 'Sample application directive 33' from 'amqp://reindeer'
Received message 'Sample application directive 34' from 'amqp://reindeer'
Received message 'Sample application directive 37' from 'amqp://reindeer'
Received message 'Sample application directive 39' from 'amqp://reindeer'
Received message 'Sample application directive 40' from 'amqp://reindeer'
Received message 'Sample application directive 43' from 'amqp://reindeer'
Received message 'Sample application directive 44' from 'amqp://reindeer'
Received message 'Sample application directive 47' from 'amqp://reindeer'
Received message 'Sample application directive 48' from 'amqp://reindeer'
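
Each message should reach exactly one of the two consumers. A minimal sketch, assuming the output format printed by the consumer above, checks two captured outputs for missing or duplicated message numbers. The function names and the shortened sample data are hypothetical.

```python
import re

LINE = re.compile(r"Received message 'Sample application directive (\d+)'")


def message_numbers(lines):
    """Extract the message numbers from consumer output lines."""
    numbers = []
    for line in lines:
        match = LINE.search(line)
        if match:
            numbers.append(int(match.group(1)))
    return numbers


def check_split(upstream_lines, downstream_lines, total):
    """Return (missing, duplicated) message numbers across both consumers."""
    seen = message_numbers(upstream_lines) + message_numbers(downstream_lines)
    missing = sorted(set(range(1, total + 1)) - set(seen))
    duplicated = sorted({n for n in seen if seen.count(n) > 1})
    return missing, duplicated


# shortened sample: four messages split between two consumers
upstream_log = ["Received message 'Sample application directive 1'",
                "Received message 'Sample application directive 4'"]
downstream_log = ["Received message 'Sample application directive 2' from 'amqp://reindeer'",
                  "Received message 'Sample application directive 3' from 'amqp://reindeer'"]
print(check_split(upstream_log, downstream_log, 4))
```

Two empty lists mean that every message was consumed once.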

Configuration

Install jq to pretty-print the exported configuration.

$ sudo apt install jq

Export upstream configuration.

$ ./rabbitmqadmin --username admin --password password --host reindeer export config-reindeer.json
$ cat config-reindeer.json | jq .
{
  "rabbit_version": "3.8.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "Nsl1tZOOHNq2jT3y6vSjTLKW1ttuhxQxoUtRzPaPFkTH+13P",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "administrator"
    },
    {
      "name": "federation_example",
      "password_hash": "DVLe5INRSZIKNAa0Ap48UqjEZjjhW0TrcpBIdxv+mFis9+WO",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": ""
    }
  ],
  "vhosts": [
    {
      "name": "/"
    },
    {
      "name": "federation_example"
    }
  ],
  "permissions": [
    {
      "user": "federation_example",
      "vhost": "federation_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "federation_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [],
  "parameters": [],
  "global_parameters": [
    {
      "name": "cluster_name",
      "value": "[email protected]"
    }
  ],
  "policies": [],
  "queues": [
    {
      "name": "application-directives",
      "vhost": "federation_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "directives",
      "vhost": "federation_example",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "directives",
      "vhost": "federation_example",
      "destination": "application-directives",
      "destination_type": "queue",
      "routing_key": "",
      "arguments": {}
    }
  ]
}

Export downstream configuration.

$ ./rabbitmqadmin --username admin --password password --host raccoon export config-raccoon.json
$ cat config-raccoon.json | jq .
{
  "rabbit_version": "3.8.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "tmFTX7O65LjfY9zouR1cZXkMBSBFDD7+6MH6vfMSH9jwx9QR",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "administrator"
    },
    {
      "name": "federation_example",
      "password_hash": "58v85kIDTU5e6voT6OLceCp82tvrMIztTgvky7mQpEXuj9ao",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": ""
    }
  ],
  "vhosts": [
    {
      "name": "/"
    },
    {
      "name": "federation_example"
    }
  ],
  "permissions": [
    {
      "user": "federation_example",
      "vhost": "federation_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "federation_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [],
  "parameters": [
    {
      "value": {
        "prefetch-count": 1,
        "uri": "amqp://federation_example:password@reindeer"
      },
      "vhost": "federation_example",
      "component": "federation-upstream",
      "name": "reindeer-upstream"
    },
    {
      "value": [
        {
          "upstream": "reindeer-upstream"
        }
      ],
      "vhost": "federation_example",
      "component": "federation-upstream-set",
      "name": "reindeer"
    }
  ],
  "global_parameters": [
    {
      "name": "cluster_name",
      "value": "[email protected]"
    }
  ],
  "policies": [
    {
      "vhost": "federation_example",
      "name": "federate-directives",
      "pattern": "application-directives",
      "apply-to": "queues",
      "definition": {
        "federation-upstream-set": "reindeer"
      },
      "priority": 1
    }
  ],
  "queues": [
    {
      "name": "application-directives",
      "vhost": "federation_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [],
  "bindings": []
}
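
The two exports are long, and the interesting part is where they differ. A minimal sketch that lists the top-level sections that differ, assuming both files were loaded with the load helper. The users section is skipped because the salted password hashes always differ, and global_parameters because it holds the cluster name. The function names and the trimmed sample data are hypothetical.

```python
import json


def load(path):
    """Load a definitions file written by rabbitmqadmin export."""
    with open(path) as handle:
        return json.load(handle)


def differing_sections(upstream, downstream, ignore=("users", "global_parameters")):
    """Return the names of top-level sections whose content differs."""
    keys = sorted(set(upstream) | set(downstream))
    return [key for key in keys
            if key not in ignore and upstream.get(key) != downstream.get(key)]


# trimmed stand-ins for config-reindeer.json and config-raccoon.json
reindeer = {"policies": [], "exchanges": [{"name": "directives"}],
            "queues": [{"name": "application-directives"}]}
raccoon = {"policies": [{"name": "federate-directives"}], "exchanges": [],
           "queues": [{"name": "application-directives"}]}
print(differing_sections(reindeer, raccoon))
```

With the real files the call would be differing_sections(load("config-reindeer.json"), load("config-raccoon.json")).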

Additional notes

Enable rabbitmq_federation_management to define parameters and inspect the plugin status in the management web UI.

$ sudo rabbitmq-plugins enable rabbitmq_federation_management
Enabling plugins on node [email protected]:
rabbitmq_federation_management
The following plugins have been configured:
  rabbitmq_federation
  rabbitmq_federation_management
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to [email protected]
The following plugins have been enabled:
  rabbitmq_federation_management

started 1 plugins.
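
With this plugin enabled, the link status should also be available over the management HTTP API. The sketch below assumes the endpoint /api/federation-links/<vhost> on port 15672 and a "status" field in each returned link. Both are assumptions to verify against the API documentation of the installed version. The function names are hypothetical.

```python
import base64
import json
import urllib.parse
import urllib.request


def links_url(host, vhost, port=15672):
    """Build the federation links URL; the vhost must be percent-encoded."""
    return "http://%s:%d/api/federation-links/%s" % (host, port, urllib.parse.quote(vhost, safe=""))


def not_running(links):
    """Return the links whose status is anything other than running."""
    return [link for link in links if link.get("status") != "running"]


def fetch_links(host, vhost, username, password):
    """Fetch federation links from the management HTTP API (assumed endpoint)."""
    request = urllib.request.Request(links_url(host, vhost))
    token = base64.b64encode(("%s:%s" % (username, password)).encode()).decode()
    request.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(request) as response:
        return json.load(response)


print(links_url("raccoon", "federation_example"))
```

A monitoring check could call not_running(fetch_links("raccoon", "federation_example", "admin", "password")) and alert when the result is not empty.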

References

Federation Plugin