Advanced use of OpenAMQ

Configuration, Management and Tuning

Server configuration

Built-in help

The amq_server command provides built-in help on all command-line options and configuration settings. To get a summary of options, type:

amq_server -h

To get detailed configuration help, type:

amq_server --help | more

These are the basic command options:

-w directory     Working directory for server (current)
-s filename      Load custom settings from file (amq_server.cfg)
-X comment       Comment, has no effect
-q               Quiet mode: no messages (no)
-b               Run as background server process (no)
-f               Run as foreground console process (yes)
-i               Show program statistics when ending (no)
-v               Show version information
-h               Show summary of command-line options
--help           Show detailed configuration help

Main command-line options

You can also set configuration options directly from the command line. These are the most commonly used configuration options and their command-line syntax:

--port 5672         Server port for clients
--listen *          Address (local network interface) to listen on
--monitor 0         Monitor interval, seconds
--dump_state 60     Dump state interval, seconds
--debug_route 0     Debug message routing?
--debug_queue 0     Debug queue activity?
--debug_peering 0   Debug peering messages?
--heartbeat 2       Heartbeat timer, seconds
/config/server/port
Specifies the port on which the server should open its connections. Default value is '5672'
/config/server/listen
Specifies the network interface on which the server should listen for connections. By default this is *, meaning all interfaces. You would choose to set an address when you use OpenAMQ on a server with multiple network interfaces, e.g. routing between two networks. Default value is '*'
/config/resources/monitor
Specifies the interval in seconds at which the server will report its message rates. If zero, no monitoring is shown. The recommended value is 1, if monitoring is desired. Default value is 0.
/config/resources/dump_state
Specifies the interval at which the server will report its state. This shows the number of messages, queues, consumers, etc. used by the server. If zero, no state is logged. Default value is 60.
/config/logging/debug_route
Specifies whether exchange routing is logged or not. Set this option when you are debugging a message routing design. For production use, we recommend you do not set this option. Default value is 0.
/config/logging/debug_queue
Specifies whether queue dispatching is logged or not. Set this option when you are debugging message processing in the server. For production use, we recommend you do not set this option. Default value is 0.
/config/logging/debug_peering
Specifies whether peering activity is logged or not. Set this option if you need to debug exchange federation and failover. For production use, we recommend you do not set this option. Default value is 0.
/config/tuning/heartbeat
Defines the timeout for connection heartbeating. Default value is 2.
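
For example, several of these options can be combined on one command line. A sketch for a development server (adjust the values to your environment):

amq_server --port 5672 --monitor 1 --dump_state 60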

Creating a configuration file

OpenAMQ lets you set options in several ways:

  1. On the command-line, which has an immediate effect on that instance of the server.
  2. In the default configuration file, 'amq_server.cfg', which has an effect on all instances of the server started after that file is edited.
  3. In a per-server configuration file, specified using the -s option when you run amq_server.

You can also edit amq_server_base.cfg but this is bad practice, since new versions of that file are installed with each release, and you would thus lose configuration settings after an update.

In general we recommend that you use the command line to test desired configuration options and when you are satisfied with them, place them into a configuration file.

OpenAMQ configuration files use an XML syntax, consisting of sections that are easy to understand and edit. When you run "amq_server --help" it explains for each option how to add it to a config file.

For example:

/config/server/port - Server port for clients
    --port newvalue
    Specifies the port on which the server should open its connections.
    Current value is '5672'. Default value is '5672'

This is saved as:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <server port = "5672" />
</config>

You must merge sections together, e.g.:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <server
        port = "5672"
        listen = "*"
        queue_timeout = "0"
        vhost = "/"
    />
    <resources
        monitor = "0"
        dump_state = "60"
    />
    <logging
        debug_route = "0"
        debug_queue = "0"
        debug_peering = "0"
    />
</config>

Configuration file path

OpenAMQ will search for configuration files in:

  1. The current directory (the server working directory if you use the -w option).
  2. Each directory on the PATH environment variable.

It will first search for and load (if found) the amq_server_base.cfg file. It will then search for and load (if found) the amq_server.cfg file. Finally it will override any setting taken from these files with the options specified on the command line.

Debugging complex configuration files

You can debug configuration files if you become unsure which one(s) are being used by a server process:

  • Add <echo>Some message</echo> into the file inside the <config> item.
  • Start the server and see what messages are echoed.

For example:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <echo>Config file for cluster testing</echo>
</config>

Then, running the server:

> amq_server
...
2008-05-02 16:58:25: I: amq_server.cfg: Config file for cluster testing
...

Logging subsystem

General description

amq_server holds its logs in /var/log/openamq. To put them somewhere else, set the 'log_path' configuration option, or use the --log_path command-line option:

<!-- Keep logs in ./logs directory -->
<logging
    log_path = "./logs"
/>

The OpenAMQ server keeps three levels of logs in the /var/log/openamq directory:

  1. Alert logs, which contain all errors and alerts.
  2. Daily logs, which contain normal activity data, as well as all the contents of the alert logs.
  3. Debug logs, which contain debugging and tracing output as requested by runtime or configuration options, as well as all the contents of the daily logs.

Each server process opens three log files, which are named thus:

alert_[portnumber].log
daily_[portnumber].log
debug_[portnumber].log

The log files are cycled when the server restarts, or at midnight. The cycle process does the following:

  1. It closes the current log files (if the server is still running).
  2. It optionally executes a user-configurable archiving command.
  3. It reopens new log files for the application.

To disable logging

Use the --keep_logs 0 command-line option to disable logging, or this fragment in amq_server.cfg:

<logging
    keep_logs = "0"
/>

Logged data

The OpenAMQ log files are text, intended for human readability rather than formalised scanning.

A user or script never needs to scan multiple log files from one server since they are hierarchical: thus the debug logs contain all logged data.

This is an example of the logs produced by a short server run.

The alert log:

2006-05-14 18:30:04: I: amq_server binding to 192.168.55.64:5672
2006-05-14 18:30:04: I: amq_server binding to 192.168.55.107:5672
2006-05-14 18:30:04: I: amq_server binding to 127.0.0.1:5672
2006-05-14 18:30:05: I: server ready for incoming AMQ connections
2006-05-14 18:30:10: I: cnn=1 msg=2 mem=2K/10439K exc=7 que=1 csm=1 bnd=2

The daily log:

2006-05-14 18:30:04: I: starting virtual host '/'
2006-05-14 18:30:04: I: amq_server binding to 192.168.55.64:5672
2006-05-14 18:30:04: I: amq_server binding to 192.168.55.107:5672
2006-05-14 18:30:04: I: amq_server binding to 127.0.0.1:5672
2006-05-14 18:30:05: I: server ready for incoming AMQ connections
2006-05-14 18:30:07: I: start login from=127.0.0.1:40441 -
                        product=OpenAMQ Kernel Client version=1.4d1
2006-05-14 18:30:07: I: valid login from=127.0.0.1:40441 user=console -
                        group=console
2006-05-14 18:30:10: I: cnn=1 msg=2 mem=2K/10439K exc=7 que=1 csm=1 bnd=2
2006-05-14 18:30:12: I: start login from=127.0.0.1:40442 -
                        product=OpenAMQ Kernel Client version=1.4d1

The debug log:

2006-05-14 18:30:04: ###########  Process Environment Variables  ###########
2006-05-14 18:30:04: KDE_MULTIHEAD=false
2006-05-14 18:30:04: SSH_AGENT_PID=1821
2006-05-14 18:30:04: TERM=vt220
2006-05-14 18:30:04: ...
2006-05-14 18:30:04: ##############  Configuration Settings  ###############
2006-05-14 18:30:04: port=5672
2006-05-14 18:30:04: background=0
2006-05-14 18:30:04: queue_timeout=0
2006-05-14 18:30:04: max_memory_mb=512
2006-05-14 18:30:04: per_client=0
2006-05-14 18:30:04: ...
2006-05-14 18:30:07: I: start login from=127.0.0.1:40441 -
                     product=OpenAMQ Kernel Client version=1.4d1
2006-05-14 18:30:07: I: valid login from=127.0.0.1:40441 user=console -
                     group=console
2006-05-14 18:30:07: X: bind     $default$: queue=#0
2006-05-14 18:30:07: X: compile  $default$: routing_key=#0
2006-05-14 18:30:07: X: bind     amq.direct: queue=#0
2006-05-14 18:30:07: X: compile  amq.direct: routing_key=#0
2006-05-14 18:30:07: X: publish  amq.system: routing_key=amq.console
2006-05-14 18:30:07: X: publish  amq.direct: routing_key=#0
2006-05-14 18:30:07: X: route    amq.direct: routing_key=#0
2006-05-14 18:30:07: X: deliver  queue=#0
2006-05-14 18:30:07: X: publish  amq.system: routing_key=amq.console
2006-05-14 18:30:07: X: publish  amq.direct: routing_key=#0
2006-05-14 18:30:07: X: route    amq.direct: routing_key=#0
2006-05-14 18:30:07: X: deliver  queue=#0
2006-05-14 18:30:07: X: publish  amq.system: routing_key=amq.console
2006-05-14 18:30:07: X: publish  amq.direct: routing_key=#0
2006-05-14 18:30:07: X: route    amq.direct: routing_key=#0
2006-05-14 18:30:07: X: deliver  queue=#0
2006-05-14 18:30:08: I: incoming rate=10 mean=10 peak=10
2006-05-14 18:30:08: I: outgoing rate=5 mean=5 peak=5 iomean=15
2006-05-14 18:30:12: I: start login from=127.0.0.1:40442 -
                     product=OpenAMQ Kernel Client version=1.4d1

Custom log file names

You can override the names of the log files using these command-line options:

--alert_log alert.log          Error log file name
--daily_log daily.log          Daily log file name
--debug_log debug.log          Debug log file name

You can also specify these in the amq_server.cfg configuration file.
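
For example, the same log file names can be set in amq_server.cfg; this sketch assumes they belong in the <logging> section, alongside log_path and keep_logs:

<logging
    alert_log = "alert.log"
    daily_log = "daily.log"
    debug_log = "debug.log"
/>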

Custom log file cycling

The built-in cycling mechanism renames old log files using the current date and time.

You can customise the cycling mechanism by specifying your own cycling command, which is a shell command that amq_server will execute on each log file. The log file name is passed to this command as its first and only argument:

--archive_cmd value            Archive log file command

You can also specify this in the amq_server.cfg configuration file.
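
For example, a minimal archiving sketch (the script path and archive directory are hypothetical; the server passes each cycled log file name as the single argument):

amq_server --archive_cmd "/usr/local/bin/archive_log"

/usr/local/bin/archive_log:
------------------------------------------------------------------------
#! /bin/sh
#   Compress the cycled log file and move it into an archive directory
gzip "$1"
mv "$1.gz" /var/log/openamq/archive/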

Server tracing options

You can set various server debug and trace levels using these command-line options:

--debug_route 0                Debug message routing?
--debug_queue 0                Debug queue activity?
--debug_cluster 0              Debug cluster messages?
--debug_console 0              Debug console I/O?
--trace 0                      Protocol trace level

You can also specify these in the amq_server.cfg configuration file.

Log file format

Logged data always shows the date and time, then a single letter to indicate the type or severity of the message. E is an error, W is a warning, I indicates an information message, and other letters are used to trace different types of activity.

Syslog support

OpenAMQ supports syslog on systems where it is available (Unix and Linux, but not Windows). If you use syslog, the normal text log files are not produced, and amq_server instead sends its logging data to the syslog LOG_DAEMON facility. The priority of the logged data is 'warning' for the error log, 'info' for the daily log, and 'debug' for the debug log.

Monitoring the server

How to monitor an OpenAMQ server

There are several ways to monitor a running OpenAMQ server:

  1. Using the operating system's process monitoring tools (like 'top').
  2. Using the server's own monitoring output (like '--dump_state 5').
  3. Using the OpenAMQ console shell ('amq_shell'), described below.

When using the operating system monitoring tools, you will want to look mainly at the server's CPU and memory consumption.

The simplest way is to run the server using the --dump_state option. The following example asks for output every five seconds:

amq_server --dump_state 5

You can redirect the output to a dump file, and monitor the dump file using, on a Unix, Linux or Mac OS/X system:

tail -f name-of-dump-file
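
For example, a minimal monitoring sketch on a Unix-like system (the dump file name is hypothetical):

amq_server --dump_state 5 > server_state.log 2>&1 &
tail -f server_state.log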

Console shell

The amq_shell provides a command-line administration tool for OpenAMQ. You can use this tool by hand, or automatically in shell scripts. Normal OpenAMQ users can view information; super users can also change the server's state, e.g. killing blocked connections or over-full queues.

To run the shell, do this:

amq_shell -u username -p password

These are the command-line options:

-s hostname      Server hostname and :port (localhost)

If the amq_server is running on a different system and/or non-standard port, use the -s option to specify the correct servername:port.

-V virtualhost   Specify cluster virtual host

You need this when working with servers that are in a cluster configuration. The cluster virtual host you specify must match that specified in the cluster configuration.

-u user          User name for console access (guest)

Specify the user name for the connection.

-p password      Password for console access (guest)

Specify the password for the connection.

-e "commands"    Run shell commands, delimited by ;

Specify a list of commands to run, which can be any commands that you may type when in the amq_shell prompt (see below).

-x filename      Save all status data as XML

Saves all printed data as an XML file, useful if you want to re-process the data mechanically afterwards.

-t level         Set trace level (default = 0)

Used to debug the communications between the Console and the OpenAMQ server.

-b               Show server status and then exit

Show a summary of the server status, without entering the prompt.

-r               Report all active local servers

Scan the current system for all OpenAMQ servers running on ports 4096-8192. It will not look for servers outside that range.

-q               Show all server queues and exit

Show a summary list of all the server's queues, without entering the prompt.

-c               Show all server connections and exit

Show a summary list of all the server's connections, without entering the prompt.

-d               Show date and time in shell output

Add the date and time to all printed messages.

The console shell prompt

When the Console connects successfully to the default or specified OpenAMQ server it will display a prompt so that you can enter commands:

amq_shell/1.2d0 - Management Console for OpenAMQ Brokers
Copyright (c) 2008 iMatix Corporation
Connected to OpenAMQ Server/1.2d0 on 62.176.172.196:5672
 server = "OpenAMQ 1.2d0"
 Date, time server started ............. 2007-03-15T12:55+01:00
 Broker is locked? ..................... no
 Memory used for all data .............. 10604K
 Memory used for messages .............. 1K
 Number of queued messages ............. 2
 Number of queue consumers ............. 1
 Number of queue bindings .............. 2
 Number of message exchanges ........... 8 [ls exchange]
 Number of shared queues ............... 0 [ls queue]
 Number of connections ................. 1 [ls connection]
 [shutdown] [lock]
/62.176.172.196:5672>

Note that:

  • The available actions are listed in square brackets. For example when you are looking at a server these are the available actions:
[shutdown] [lock]
  • Type 'help' at any prompt to get explanations. These commands are available at all times:
Command            Has this effect
-------            -------------------
ls | dir           Show server and all children
nnn                Look at item [nnn] (nnn is a number)
?text              Look at item matching text
/                  Return to server item
.                  Refresh current item
..                 Move back to previous item
set name value     Set object property
help               Show this text
exit | quit        Leave the OpenAMQ shell

Automated restarts

In some scenarios, you may want to stop and restart a server daily. Here is a simple way to do this:

  • Run the server inside a command shell script that, when the server ends, logs the error, waits a short time (5 seconds) and then loops to restart the server. An example (using Unix bash shell language):
while true; do
    amq_server --whatever-options you need >> somelogfile
    sleep 5
done
  • On the same, or a different server, run a daemon script that uses the amq_shell to remotely stop the server. An example:
amq_shell -e "shutdown" -u super -p topsecret

Tuning OpenAMQ

General principles

OpenAMQ provides a wide set of tuning options. Before you start tuning your server, please note that:

  • The default installation is already tuned for general performance. While we encourage you to experiment to understand how your OpenAMQ server behaves, tuning is not an essential part of normal OpenAMQ usage.
  • When tuning, make sure you have a good test platform so that you can measure the impact of each choice. It is quite easy to make a server's performance worse with the wrong kind of tuning.
  • Test each option independently on the command-line before adding it to a configuration file.

Process tuning

These are the options that affect the server process, each can be specified on the command line or in a configuration file:

/config/tuning/polling_threads
On multithreaded builds, defines the number of OS threads dedicated to socket polling. Default value is 4.
/config/tuning/working_threads
On multithreaded builds, defines the number of OS threads dedicated to processing, that is, tasks other than socket polling. Default value is 4.

Performance tuning

These are the options that can directly affect performance:

/config/tuning/heartbeat
Defines the timeout for connection heartbeating. Default value is 2. This option can be changed at runtime.
/config/tuning/tcp_nodelay
If this value is 1, socket data is written immediately, which is usually good for latency. If this value is 0, data is buffered until there is a full packet, which is usually good for throughput. Default value is 1. This option can be changed at runtime.
/config/tuning/tcp_rcvbuf
If this value is greater than zero, all client connections will use the specified value. Note: setting this value is delicate, do not use this option unless you know what you are doing. Default value is 0. This option can be changed at runtime.
/config/tuning/tcp_sndbuf
If this value is greater than zero, all client connections will use the specified value. Note: setting this value is delicate, do not use this option unless you know what you are doing. Default value is 0. This option can be changed at runtime.
/config/tuning/direct
If this value is 1, all client connections will use Direct Mode for message transfer, unless they explicitly override it. Direct Mode is explained below.

Note: rather than tuning the tcp_rcvbuf and tcp_sndbuf options at the application level, we recommend you rely on the default values of "0" and tune your operating system's TCP stack appropriately.
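
The process and performance tuning options above can be collected into the tuning section of amq_server.cfg. A sketch using the default values documented above:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <tuning
        polling_threads = "4"
        working_threads = "4"
        heartbeat = "2"
        tcp_nodelay = "1"
    />
</config>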

Tuning queue limits

You can tune the number of messages that queues will accept, and how queues respond when they become 'full'. This is useful to ensure that your server does not run out of virtual memory when you have fast publishers and slow clients, and a very high rate of data.

OpenAMQ provides a mechanism called "queue profiles" to let you control the limits on a per-queue basis. By default (in amq_server_base.cfg), we define two queue profiles as follows:

<queue_profile name = "private">
    <limit name = "warn" value = "10000" />
    <limit name = "trim" value = "40000" />
</queue_profile>

<queue_profile name = "shared">
    <limit name = "warn" value = "1000" />
    <limit name = "kill" value = "5000" />
</queue_profile>

These profiles define the behaviour of private and shared queues respectively. In each profile we can define up to 10 limits, which specify a number of messages, and an action to perform when that limit is reached:

  • warn - issue a warning to the console and accept the message onto the queue. The warning is only issued once when the limit is crossed.
  • trim - delete an old message from the queue to make space for the new message.
  • drop - drop the new message, do not delete existing queued messages.
  • kill - issue a warning and kill the connection and queue. This handles the case when publishers are extremely unbalanced.

To override these limits, edit amq_server.cfg (not the base config file) e.g.:

amq_server.cfg:
<?xml version="1.0"?>
<config>
    <queue_profile name = "shared">
        <limit name = "warn" value = "500" />
        <limit name = "drop" value = "1000" />
    </queue_profile>
    <queue_profile name = "private">
        <limit name = "warn" value = "500" />
        <limit name = "trim" value = "1000" />
    </queue_profile>
</config>

In Direct Mode, private queue limits are applied on the server, for cases when slow networks cause backlogs. However, only the warn and drop actions are implemented on such backlogs. To set server-side queue limits for Direct Mode backlogs, edit amq_server.cfg e.g.:

amq_server.cfg:
<?xml version="1.0"?>
<config>
    <queue_profile name = "private">
        <limit name = "warn" value = "5000" />
        <limit name = "drop" value = "10000" />
    </queue_profile>
</config>

OpenAMQ also lets you define per-queue profiles. When your application creates a queue, using the Queue.Declare method (the amq_queue_declare() method in the WireAPI interface), it can specify a profile name, as follows:

  • The application must be capable of constructing and passing an arguments table.
  • It creates an argument field called "profile" with the value of the profile it wants to use for that queue.
  • OpenAMQ then takes the profile definition from the configuration data.

This only happens when the queue is created; if the Queue.Declare is specified for an existing queue, its profile is not modified. If no profile is specified in the Queue.Declare method, OpenAMQ uses 'private' for exclusive queues and 'shared' for non-exclusive queues.

Note that to use the queue limits properly you must enable window control, as explained below.

Server queue window control

The OpenAMQ server by default sends messages off queues and through to the protocol layer as fast as it can. This is fast but it means that message queues are usually empty, and so queue limits will not be applied. You can enable a credit-based window control which ensures that only a certain number of messages are sent in advance to the protocol layers, and the rest remain on the queue.

There are two options for window control, one for private queues and one for shared queues. Each can be specified on the command line or in a configuration file:

/config/tuning/private_credit
Defines the credit window for private queues. The credit window regulates the flow of messages internally. A higher value will move messages faster off private message queues and into the protocol transport threads. This will increase server memory consumption (before configured queue limits are applied) but can improve performance. For high performance applications we recommend using Direct Mode. To disable credit based flow control on private queues, set to zero. Default value is 1024.
/config/tuning/shared_credit
Defines the credit window for shared queues. The credit window regulates the flow of messages internally. The default value of 1 will provide the best response to blocked/slow clients working on shared queues. Set to zero to disable credit based flow control on shared queues. Default value is 1.
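
For example, to set the credit windows explicitly in amq_server.cfg (a sketch using the default values):

<tuning
    private_credit = "1024"
    shared_credit = "1"
/>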

Managing socket exhaustion

An OpenAMQ server can be vulnerable to socket exhaustion, depending on the number of clients and the operating system configuration. The server's strategy in case all available sockets are used is to reject new incoming connections during a configurable period, and then to retry.

You can tune this period via the --accept_retry_timeout command-line argument (/config/tuning/accept_retry_timeout), which defaults to 60 seconds.

Tuning the memory allocator

OpenAMQ uses a memory subsystem that can be tuned for different purposes. The tradeoff is between performance, and reporting of errors for debugging and to detect memory leaks.

The memory subsystem selects an 'allocator' depending on the value of the ALLOCATOR environment variable:

ALLOCATOR=fat
The fat allocator tracks memory leaks and reports the source file and line number for any leak. This is the default allocator for debug builds (when OpenAMQ is built with BOOM_MODEL=debug).
ALLOCATOR=direct
The direct allocator does no memory leak detection, and no debug tracking. It is a very thin layer over the system memory allocator (malloc). This is the fastest allocator on Linux. This is the default allocator for production builds (when OpenAMQ is built with BOOM_MODEL=release).
ALLOCATOR=thin
The thin allocator tracks memory leaks and reports these when the server stops. This is the fastest allocator on Windows and Solaris, and probably other POSIX systems.

We recommend that in a high-performance scenario (over 20k messages per second) you use ALLOCATOR=direct, while in normal scenarios you leave the default setting. As with all tuning options, test before and after using throughput and latency tests.

The choice of memory allocator, as well as build model (release vs debug) can also have a significant impact on the performance of client applications that do over 20k messages in or out per second. The fastest configuration on recent Linux is ALLOCATOR=direct and BOOM_MODEL=mt,release. On other platforms with less efficient memory management subsystems, ALLOCATOR=thin may be faster.
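
For example, to run the server with the direct allocator on a Unix-like shell (a sketch; ALLOCATOR is read from the process environment):

export ALLOCATOR=direct
amq_server --monitor 1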

Testing throughput

The standard test tool for performance is amq_client. This sends a number of messages to a private temporary queue, and reads the messages back off that queue.

Start the server with monitoring enabled (so that it displays the traffic rate):

amq_server --monitor 1

Start multiple instances of amq_client on one or a series of test systems:

amq_client -s server:port -n 20000 -x 500 -r 0

This is an example of the monitor output produced by the server:

I: incoming rate=2545 mean=2392 peak=4115
I: outgoing rate=2545 mean=2392 peak=4114 iomean=4784

  • The incoming rate represents the number of messages (AMQP contents) read off the network each second.
  • The outgoing rate represents the number of messages written to the network each second.
  • The mean rates represent a rolling average per second calculated over the previous ten seconds.
  • The peak rates represent the highest value over the previous ten seconds.
  • The iomean rate represents the combined input and output average per second calculated over the last ten seconds.

When you tune the server performance, it is the iomean that you should be aiming to improve.

Direct Mode

Overview

OpenAMQ/1.3 and later releases support Direct Mode, a protocol for specific kinds of high-speed applications, as explained later. Direct Mode runs over the Direct Messaging Protocol (4-DMP, see http://wiki.amqp.org/spec:4), a specification developed by wiki.amqp.org. In Direct Mode, OpenAMQ will transfer messages as much as 5x faster. The following table shows the typical differences between normal AMQP publishing and Direct Mode in OpenAMQ:

                    Normal AMQP        Direct Mode
Latency             250 usecs          185 usecs
Client capacity     19,000 msg/sec     130,000 msg/sec
Broker capacity     120,000 msg/sec    600,000 msg/sec

The speed improvement comes from a simpler wire-level encoding for messages, message batching, and the ability to push messages through the OpenAMQ server with less processing. Client applications will also benefit from reduced CPU usage.

How to use

Direct Mode works for the following scenarios:

  • Publishers that work primarily with one or a few exchanges.
  • Subscribers that consume from one or a few private queues.

These are typical high-volume scenarios, for topic-based publish/subscribe, and service based request/response.

Your WireAPI applications can use Direct Mode with no application changes. You can test it simply by starting the broker with this option:

amq_server --direct 1

To use Direct Mode, you must do at least one of these:

  • Enable all compatible clients by setting the tuning/direct option to 1 in the OpenAMQ amq_server.cfg configuration file or using the "--direct 1" command line option. We recommend this as being the easiest option.
  • Enable specific clients by modifying the wireapi.cfg configuration file for that client application.
  • Enable specific connections from your WireAPI application or framework by setting connection->direct to TRUE on all new connections.

To enable Direct Mode for specific clients (without any program changes), create a file 'wireapi.cfg' in the working directory of your client applications:

wireapi.cfg:
------------
<config>
    <tuning direct = "1" />
</config>

If either the server was started with "--direct 1", or the wireapi.cfg file specifies this setting, connections will be created with the direct property set to true by default. Applications can still override this.

Tuning Direct Mode

You can tune the batching size used in Direct Mode connections with the --batching command-line option, or with the following setting in the amq_server.cfg or wireapi.cfg configuration file:

/config/direct/batching
Defines the maximum batch size for Direct Mode opportunistic batching on message sends. Setting this higher will improve throughput, and usually with lower latency, but will cause higher memory consumption. Setting this to zero will switch off batching. Any value less than 2048 is treated as zero. Default value is 32768. Maximum value is 2097152.
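
Following the /config/direct path above, the batching size can be set in amq_server.cfg or wireapi.cfg (a sketch using the default value):

<config>
    <direct
        batching = "32768"
    />
</config>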

Server-side backlogs

When the client-side network is too slow to handle incoming data, messages will back-up on the server. These backlogs can be managed using the server-side configured queue limits. Only the warn and drop actions are implemented on such backlogs. To set server-side queue limits for Direct Mode backlogs, edit amq_server.cfg e.g.:

amq_server.cfg:
<?xml version="1.0"?>
<config>
    <queue_profile name = "private">
        <limit name = "warn" value = "5000" />
        <limit name = "drop" value = "10000" />
    </queue_profile>
</config>

Prerequisites

To use Direct Mode you need a full OpenAMQ/1.3 build or later. You must relink your applications with the new WireAPI libraries and you must run a 1.3 or later version of the amq_server. You should use the most recent OpenAMQ/1.3 release.

Interoperability

Direct Mode requires a compatible client library. Currently, only OpenAMQ's WireAPI library implements Direct Mode, and with other clients the OpenAMQ server will work in normal AMQP mode.

You can freely mix applications using Direct Mode and applications using normal AMQP message transfer. Messages are fully interoperable between these two protocols and the amq_server broker will automatically use the appropriate message transfer protocol depending on the client's capabilities and configuration.

Technical specifications

For technical details of the Direct Messaging Protocol, please refer to http://wiki.amqp.org/spec:4.

Direct Mode allows up to six unique exchanges or unique private queues per connection and limits a message to 2Mb. Applications that try to use Direct Mode with a higher number of exchanges or private queues will fall back to using normal AMQP message transfer.

The latency-measurement Chronometer

OpenAMQ/1.3c3 and later include a latency-measurement feature called the "Chronometer". The Chronometer calculates the cost of each layer a message passes through from sender to recipient:

  • The time it waits in WireAPI before being sent.
  • The time it spends on the network from sender to server.
  • The time it spends inside the server.
  • The time it spends on the network from server to recipient.
  • The time it spends in WireAPI before being placed in the recipient's arrived queue.

The Chronometer works as follows. Outgoing messages are stamped with an initial timestamp, which is placed into the Correlation-Id property of the message. Each layer that wishes to measure latency adds a time delta, if the message has a timestamp. The final recipient calculates the real deltas for each layer and sends a pingback to an exchange on the AMQP server. We'll explain how to collect pingbacks, but first we explain how to get the Chronometer working.

Enabling and configuring the Chronometer

To enable and configure the Chronometer you must set configuration options in the sender's wireapi.cfg configuration file:

/config/chrono/enabled
Specifies whether chrono pingbacks are enabled or not. If not, then any chrono information in contents will be ignored. By default chrono pingbacks are disabled. Default value is 0.
/config/chrono/floor
Specifies the floor for pingbacks, in milliseconds. Only pingbacks that show an end-to-end latency greater or equal to the floor will be reported. If you set this to zero, all pingbacks will be reported. Default value is 1.
/config/chrono/density
Specifies the sampling density, as a permille. Valid values are 1 to 1000. By default 1/1000th of messages will be sampled. The sampling is randomized if the density is less than 1000 permille. Default value is 1.
/config/chrono/batch
Specifies the batching size in bytes for chrono pingback messages. If you change the floor, density, or batch size, test carefully to ensure you do not create too much chatter due to many small pingbacks. A single pingback is about 15 bytes. The default batch size is 150 bytes.

You should set the density so that the Chronometer measures about 10 messages per second. This will detect spikes of a fraction of a second without creating excessive chatter. The default settings are appropriate for high volume scenarios of 10k messages per second and upwards.
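
For example, a wireapi.cfg fragment for the sending application (a sketch; the density of 10 permille assumes roughly 1,000 messages per second, giving about 10 samples per second):

wireapi.cfg:
------------
<config>
    <chrono
        enabled = "1"
        floor = "1"
        density = "10"
    />
</config>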

Notes:

  • The resolution of the Chronometer is 1msec. Its accuracy depends on accurately synchronized clocks across all systems - sender, server, and recipient. Any latency that exceeds 60 seconds is not tracked.
  • The cost of the Chronometer in terms of message size is about 20 bytes.
  • Applications that already use the Correlation-Id property cannot use the Chronometer.

Collecting Chronometer Pingbacks

The Chronometer publishes pingbacks to the amq.status exchange with the routing key "chrono". You can collect pingbacks using a simple AMQP application. We provide an example in wireapi/amq_client_chrono.pal:

<?xml?>
<pal script = "amq_pal_gen">
    <session>
        <queue_declare exclusive = "1" />
        <queue_bind exchange = "amq.status" routing_key = "chrono" />
        <basic_consume queue = "$queue" />
        <repeat>
            <wait />
            <basic_arrived>
                <echo>$body_text</echo>
            </basic_arrived>
        </repeat>
    </session>
</pal>

A Chronometer pingback message is a string containing five numbers, which represent the cost of processing in each of these layers:

  1. The time it waits in WireAPI before being sent.
  2. The time it spends on the network from sender to server.
  3. The time it spends inside the server.
  4. The time it spends on the network from server to recipient.
  5. The time it spends in WireAPI before being placed in the recipient's arrived queue.

You can process pingbacks using a spreadsheet, Perl script, or other number crunching tool.

High-availability failover

OpenAMQ provides two orthogonal clustering functionalities:

  1. High-availability failover in which a pair of servers act as primary and backup so that if one server crashes, the other will still be available.
  2. Federation, in which servers are built into wide area networks known as 'federations'.

In this section we explain how to use high-availability failover.

Using failover

General

OpenAMQ's failover model consists of two dedicated servers (OpenAMQ server processes) in a primary-backup pair. At any given time, one of these accepts connections from client applications (it is the "master") and one does not (it is the "slave"). Each server monitors the other. If the master disappears from the network, the slave takes over as master. This happens after a configurable timeout, so that transient problems can be handled without failover.

The failover model is designed to solve these problems:

  • To provide a straight-forward high-availability solution.
  • To be simple enough to use without trouble.
  • To failover reliably when needed, and only when needed.
  • To be simple to understand and use for client applications.

Failover scenarios

Assuming we have a failover pair running, here are the different scenarios that will result in failover happening:

  1. The hardware running the primary server has a fatal problem (power supply explodes, machine catches fire, or someone simply unplugs it by mistake), and disappears. Applications see this, and reconnect to the backup server.
  2. The network segment on which the primary server sits crashes - perhaps a router gets hit by a power spike - and applications start to reconnect to the backup server.
  3. The primary server crashes or is killed by the operator.

Recovery process

Recovery from failover works as follows:

  1. The operators restart the primary server and fix whatever problems were causing it to disappear from the network.
  2. The operators stop the backup server, at a moment that will cause minimal disruption to applications.
  3. When applications have reconnected to the primary server, the operators restart the backup server.

Recovery (to using the primary server as master) is a manual operation. In our experience, automatic recovery is undesirable. The failover of an OpenAMQ network creates an interruption of service to applications, possibly lasting 10-30 seconds. If there is a real emergency, this is much better than total outage. But if recovery creates a further 10-30 second outage, it is better that this happens off-peak, when users have gone off the network.

When there is an emergency, we also prefer to create predictability for those trying to fix things. Automatic recovery creates uncertainty for operators, who can no longer be certain which server is in charge, without double-checking.

Lastly, we have seen situations with automatic recovery where networks will fail over, and recover, and operators are then placed in a difficult position to analyse what happened.

Note that OpenAMQ's failover feature will fail back to the primary server if the primary is running again and the backup server fails.

Normal shutdown process

The shutdown process for a failover pair is to either:

  1. Stop the passive server and then stop the active server, or
  2. Stop both servers in any order but within a few seconds of each other.

Stopping the active and then the passive server with any intervening delay will force applications to disconnect, then reconnect, then disconnect again, which may disturb users.

Split-Brain prevention

"Split-brain" is the syndrome in which different parts of a cluster thing they are 'master'. The OpenAMQ failover mechanism has an algorithm for detecting and eliminating split brain, based on a three-way decision mechanism (a server will not decide to become master until it gets application connection requests and it cannot see its peer server).

However it is possible to (mis)design a network to fool this algorithm. A typical scenario would be a failover pair distributed between two buildings, where each building also has a set of applications, and there is a single network link between the buildings. Breaking this link would create two sets of client applications, each with half of the failover pair, and each failover server would become active.

To prevent split-brain situations, we must connect failover peers using a dedicated network link, which can be as simple as plugging them both into the same switch or better, using a cross-over cable directly between two machines.

We must not split a failover pair into two islands, each with a set of applications. While this may be a common type of network architecture, we use federation (see later), not high-availability failover, in such cases.

A suitably paranoid network configuration would use two private cluster interconnects, rather than a single one. Further, the network cards used for the cluster would be different to those used for message in/out, and possibly even on different PCI paths on the server hardware. The goal is to separate possible failures in the network from possible failures in the cluster. Network ports have a relatively high failure rate.

Alternatives to failover

In general, OpenAMQ is designed to never crash. Further, if it crashes it is designed to be restartable very rapidly using a simple shell script as explained in the section "Automated restarts" above. In other words, simply restarting a single server may be more appropriate to the level of reliability you need than failover.

How it works

Implementation

We made OpenAMQ's failover functionality as simple as it could be. In fact the current implementation is the third complete redesign. Each of the previous designs we found to be too complex, trying to do too much, and we stripped out functionality until we came to a design that was easy to understand and use, and reliable enough to be worth using.

These are our requirements for a high-availability architecture:

  1. The failover is meant to provide insurance against catastrophic system failures, such as hardware breakdown, fire, accident, etc. To guard against software crashes (in which the OpenAMQ server crashes) there are simpler ways to recover.
  2. Failover time should be under 60 seconds and preferably under 10 seconds.
  3. Failover has to happen automatically, whereas recovery must happen manually. We want applications to switch over to the backup server automatically but we do not want them to switch back to the primary server except when the operators have fixed whatever problem there was, and decided that it is a good time to interrupt applications again.
  4. The semantics for client applications should be simple and easy for developers to understand. Ideally they should be hidden in the client API.
  5. There should be clear instructions for network architects on how to avoid designs that could lead to "split brain" syndrome in which both servers in a failover pair think they are the master server.
  6. There should be no dependencies on the order in which the two servers are started.
  7. It must be possible to make planned stops and restarts of either server without stopping client applications (though they may be forced to reconnect and reregister).
  8. Operators must be able to monitor both servers at all times.
  9. It must be possible to connect the two servers using a high-speed dedicated network connection. That is, failover synchronization must be able to use a specific IP route.

We make these assumptions:

  1. A single backup server provides enough insurance, we don't need multiple levels of backup.
  2. The primary and backup server are equally capable of carrying the application load. We do not attempt to balance load across the servers.
  3. There is sufficient funding to pay for a fully redundant backup server that does nothing almost all the time.

Out of scope

What we do not attempt to do includes:

  1. The use of an active backup server or load balancing. In a failover pair, the backup server is inactive and does no useful work until the primary server goes offline.
  2. The handling of persistent messages or transactions in any way. Our failover design is incompatible with server-side persistence and AMQP transactions. This is consistent with iMatix's view on how to implement persistence and transactions, which is end-to-end, assuming a network of unreliable (and probably untrusted) servers or failover pairs.
  3. Any automatic exploration of the network. The failover pair is manually and explicitly defined in the network and is known to applications (at least in their configuration data).
  4. Replication of exchanges, queues, bindings, or messages between servers. All server-side state must be recreated by applications when they fail over.

Terminology

Primary
The primary server is the one that is normally 'master'.
Backup
The backup server is the one that is normally 'slave', it will become master if and when the primary server disappears from the network, and when client applications ask the backup server to connect.
Master
The master server is the one of a failover pair that accepts client connections. There is always exactly one master server.
Slave
The slave server is the one that takes over if the master disappears. Note that when a failover pair is running normally, the primary server is master, and the backup is slave. When a failover has happened, the roles are switched.
Peering
A peering is the relationship between two servers. A failover pair uses two peerings, one in each direction.

Failover configuration

We designed the failover mechanism to be simple to configure and use. You can configure failover either from the command line or from a configuration file.

To configure a failover pair, you need to:

  1. Tell the primary server where the backup server is.
  2. Tell the backup server where the primary server is.
  3. Optionally, tune the failover response times.

You can configure the failover pair using these command line options:

--backup                Failover backup host:port, OR
--primary               Failover primary host:port
--failover_timeout      Failover timeout, in secs

Or, you can set these properties in the amq_server.cfg file for each server:

amq_server.cfg for the primary server:
------------------------------------------------------------------------
<?xml?>
<config>
    <failover
        backup = "backup-host:port"
        timeout = "seconds"
    />
</config>

amq_server.cfg for the backup server:
------------------------------------------------------------------------
<?xml?>
<config>
    <failover
        primary = "primary-host:port"
        timeout = "seconds"
    />
</config>

Notes:

  • Do not mix 'primary' and 'backup' settings, or the results will be bogus.
  • Use the same timeout setting on both servers, otherwise client applications may fail to failover properly.
/config/failover/backup
Used when running the primary server, specifies the failover backup server for the high-availability pair. Use the internet name of the backup server as 'host' or 'host:port' if it is not running on port 5672. Do not specify this option together with the 'primary' option. Default value is ''
/config/failover/primary
Used when running the backup server, specifies the failover primary server for the high-availability pair. Use the internet name of the primary server as 'host' or 'host:port' if it is not running on port 5672. Do not specify this option together with the 'backup' option. Default value is ''
/config/failover/timeout
Specifies the delay in seconds after which the backup peer will become the primary peer. This switch over will happen only if and when an application connects to the backup peer after the failover timeout has expired. Reducing this value will allow failover to happen faster but will increase the risk of unnecessary failover due to transient network issues. Default value is 5.

Credentials

Failover peers connect to each other using the username 'peering'. The password for this defaults to "peering". If you wish to change this password you must change it in two places:

  • In the amq_server security section, as you would change the password for any connecting client.
  • In the peering configuration file, amq_peering.cfg, as follows:
<config>
    <security name = "plain">
        <user name = "peering" password = "peering" />
    </security>
</config>

Simple example

This PAL program demonstrates failover:

failover.pal:
------------------------------------------------------------------------
<?xml?>
<!--
    Demonstration of failover
    This script connects to a high-availability pair and reports the
    current status of the failover pair.
    -->
<pal script = "amq_pal_gen">
    <session server = "localhost:5555 localhost:6666" failover = "5000">
        <if name = "server_port" value = "5555">
            <echo>Connected to primary server</echo>
        </if>
        <else>
            <echo>Connected to backup server</echo>
        </else>
        <wait />
    </session>
</pal>

To build this, run the command 'pal failover'. We start two OpenAMQ servers as follows, in two separate windows:

amq_server --port 5555 --backup localhost:6666
amq_server --port 6666 --primary localhost:5555

We demonstrate failover by starting the 'failover' test program and then killing the primary server:

$ failover
Connected to primary server
14:17:14: W: connection to server was lost, failing over
14:17:15: E: connection to server failed: Socket error: Connection refused (localhost:5555)
Connected to backup server

Client-side support

The current WireAPI implementation does not hide failover semantics, and any application that needs to use this must have some explicit functionality:

  1. Client applications need to know both failover server addresses, usually best taken from configuration data.
  2. Client applications must try to connect to the primary server, and if that fails, to the backup server.
  3. Client applications should retry this connection at least twice, and with a delay between retries. The delay should be equal to or greater than the sum of the server's failover timeout setting.
  4. Client applications should be able to recreate all exchanges, queues, and bindings when (re)connecting to a server.
  5. Client applications should be able to retransmit messages lost during a failover, if messages need to be reliable.

We hope to be able to fully support these semantics in a future WireAPI version. If you urgently need such functionality, contact us.

To detect failover, client applications must (this is taken from the PAL framework for scripting OpenAMQ test applications, which implements failover):

  • Detect a failed connection, indicated by the connection->alive property being false, and the connection->reply_code being 100.
  • Wait for some short interval (for example, five seconds).
  • Connect to the first available server.

To do this properly, an application needs to know the primary and the backup servers. Typically it knows this by splitting the server name into two tokens.

Here is an example C program that demonstrates failover handling:

/*===========================================================================
    failover_test.c - demo of high-availability failover in C
    To run, start two OpenAMQ servers, one as primary on port 5672 and one
    as backup on port 6666.  You can then stop and restart servers as you
    like and this program will report the status of the failover pair.
    By iMatix Corporation, April 2008.  Code released into the public domain.
 *===========================================================================*/

#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

//  Establish or reestablish connection and session
static int s_establish_session (char *server_name);

//  Static source-global variables
static amq_client_connection_t
    *s_connection = NULL;               //  Current connection
static amq_client_session_t
    *s_session = NULL;                  //  Current session

int
main (int argc, char *argv [])
{
    int
        status = -1;                    //  Show status changes only
    //  Initialise iCL system
    icl_system_initialise (argc, argv);

    FOREVER {
        if (s_establish_session ("localhost:5672 localhost:6666")) {
            if (status != 0) {
                status = 0;
                puts ("Neither server is reachable at present");
            }
            apr_sleep (1000 * 1000);    //  Wait one second and retry
        }
        else
        if (streq (s_connection->server_port, "5672")) {
            if (status != 1) {
                puts ("Connected to primary server");
                status = 1;
            }
        }
        else {
            if (status != 2) {
                puts ("Now connected to backup server");
                status = 2;
            }
        }
        //  Wait for something to happen on the session, if it's active
        if (s_session)
            amq_client_session_wait (s_session, 0);
        //  Now check if the server disappeared (without an error)
        if (s_connection && !s_connection->alive) {
            if (s_connection->reply_code == 100) {
                puts ("W: connection to server was lost, failing over");
                amq_client_session_destroy (&s_session);
                amq_client_connection_destroy (&s_connection);
            }
            else {
                if (s_session)
                    printf ("E: %d - %s\n", s_session->reply_code, s_session->reply_text);
                else
                    printf ("E: %d - %s\n", s_connection->reply_code, s_connection->reply_text);
                break;                      //  Exit if we got a real error
            }
        }
    }
    //  Clean up and exit
    amq_client_session_destroy (&s_session);
    amq_client_connection_destroy (&s_connection);
    icl_system_terminate ();
    return (0);
}

//  Establish connection and session
//
static int s_establish_session (char *server_name)
{
    icl_longstr_t
        *auth_data;                     //  Login authorisation
    ipr_token_list_t
        *host_list;                     //  List of known hosts
    ipr_token_t
        *token;                         //  Next host to try
    int
        rc = 0;                         //  Return code

    //  Both connection and session must be null when we start
    assert (!s_connection && !s_session);
    //  Login using default guest credentials
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    //  Split host name into tokens, and check we have one or two names
    host_list = ipr_token_split (server_name);
    assert (ipr_token_list_count (host_list) == 1
         || ipr_token_list_count (host_list) == 2);

    token = ipr_token_list_first (host_list);
    while (token) {
        s_connection = amq_client_connection_new (
            token->value, "/", auth_data, "failover test", 0, 30000);
        if (s_connection) {
            ipr_token_unlink (&token);
            break;
        }
        token = ipr_token_list_next (&token);
    }
    ipr_token_list_destroy (&host_list);
    icl_longstr_destroy (&auth_data);

    if (s_connection) {
        s_session = amq_client_session_new (s_connection);
        if (!s_session) {
            puts ("E: could not open session to server");
            rc = -1;
        }
    }
    else
        rc = -1;

    return (rc);
}

Known limitations

In the current OpenAMQ product, federation and failover do not work together. That is, you can use failover to create a high-availability server pair for a set of applications, or you can use federation to create a wide-area network of servers for distributed applications, but you cannot make federations out of high-availability pairs. This limitation, which affects only the very largest deployments, is scheduled to be resolved in a near future release. If you need this feature urgently, please contact us.

Other limitations:

  • A server process cannot be part of more than one failover pair.
  • A primary server can have a single backup server, no more.
  • The backup server cannot do useful work while in slave mode.
  • The backup server must be capable of handling full application loads.
  • Failover configuration cannot be modified at runtime.
  • Client applications must do some work to benefit from failover.

Debugging failover

If you use a failover pair, you should test it. To do this, start it as normal and then simulate various faults:

  • Unplug one or both server systems from the network
  • Kill one or both of the server processes and restart one or both of them
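
For example, on a Unix-like system you might simulate a crash of one server process with a command along these lines (pidof is an assumption about your platform; any equivalent way of finding the process id will do):

kill -9 `pidof amq_server`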

OpenAMQ prints a summary of its failover status to the console. If you need to get more information, run the server with this option:

amq_server ... --debug_peering 1 ...

This will print more verbose messages about what is happening between the two servers. Use this option if you are capturing output to send to iMatix for technical support.

Tuning failover

The main tuning concern is how frequently you want the servers to check their peering status, and how quickly you want failover to activate. You tune this with a single setting: the failover timeout value, which defaults to 1 second. If you reduce it, the backup server will take over as master more rapidly, but it may also take over in cases where the primary server could have recovered. For example, you may have wrapped the primary server in a shell script that restarts it if it crashes; in that case the timeout should be longer than the time needed to restart the primary server.

The application behavior can also have an impact. In general we recommend that you stick with the default timeout, and that applications use a 10-second pause between reconnect attempts.
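
As an illustration of the recommended client-side pause, here is a minimal sketch in C. It reuses the s_establish_session () helper from the failover example above; the server names are placeholders, and sleep () is standard POSIX, not part of WireAPI:

//  Sketch only: retry loop with a 10-second pause between reconnect
//  attempts, calling s_establish_session () from the example above.
//  "primaryhost backuphost" is a placeholder for your real server names.
#include <stdio.h>                      //  For puts ()
#include <unistd.h>                     //  For sleep ()

static void s_wait_for_server (void)
{
    while (s_establish_session ("primaryhost backuphost") != 0) {
        puts ("W: no server available, retrying in 10 seconds");
        sleep (10);
    }
}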

Federation

Using federation

General

OpenAMQ's federation model lets architects build networks of OpenAMQ servers that implement specific kinds of message flows corresponding to the main types of work we do with OpenAMQ networks. The two main reasons for using federation are:

  1. To build very high-performance pub-sub architectures, for market-data and other scenarios with high volumes of data going to very many subscribers.
  2. To partition a large network geographically, e.g. between central and regional offices, for technical and network management reasons.

We always federate two exchanges in a client-server fashion. That is, the exchange on one server is "attached" to the identically-named exchange on another server. Generally these servers have an asymmetric parent-child relationship (e.g. a central parent and regional children) and the attachment is done in one direction only.

The federation model is designed mainly to allow client applications that speak to a particular child server to:

  • Subscribe to data published on a parent server, and receive this data;
  • Send requests to services hosted on a parent server, and receive responses.

Scenarios

  • A regional location is connected to a central location by a slow satellite link. The regional applications need to access market data and business services that are provided centrally. Traffic across the satellite link must be optimised so that messages to multiple subscribers are sent only once across the link.
  • A customer requires a message broker installed at their location so that local applications can interoperate; these applications also subscribe to data feeds from a central server.

Federation models

There are a number of plausible network architectures, in which each node in the network is a failover pair (note: federation of failover pairs is currently not supported by OpenAMQ but will be in a future release) or a single stand-alone server:

  1. A star network, with a single central node and many distributed nodes. This would typically be used to send information from a central point to regional networks, each serving a set of local applications and users.
  2. A tree network, with one top-level node, a small number of second-level nodes, and more nodes connected to these, in a tree hierarchy. This would typically be used to create extremely-high volume data publishing networks (capable of delivering many millions of messages per second).
  3. A loose network, with ad-hoc relationships between nodes organised at regional, national, and global levels. This would typically be used for real-life global organisations with diverse information streams, each involving a set of nodes.

How it works

Exchange federation

From the operator point of view, there are two levels of federation:

  • Federate the whole server (using the --attach option)
  • Federate individual exchanges (using per-exchange configuration)

"Whole server" federation is actually done by federating a specific set of exchanges, so it is worth understanding exchange federation, as this is the building block.

One exchange may be federated to the identically-named exchange on a parent server. You can, in a single server, federate various exchanges to different parent servers. We don't recommend this for beginners: it gets complex and invites error.

Note that OpenAMQ does not allow a single exchange to be federated more than once.

Federation types

There are several types of federation. These can be grouped into 'primitive' federation types that do one specific thing, and 'compound' federation types that do a more useful high-level job. All federations work with a 'local' exchange and a 'remote' exchange on the parent server.

The primitive federation types are:

  • "subscriber" - used for publish-subscribe scenarios, to pull required messages down from a parent server and passes them out to subscribers on the local server.
  • "publisher" - used for request-response scenarios, to unconditionally forward requests on one server to the parent server.
  • "locator" - used for request-response scenarios, to try to process requests locally and forward them to the parent server if no local service was found.

Note that "subscriber", "publisher", "request", and "service" are not formal AMQP terms but rather common terms to describe familiar business messaging scenarios. We'll explain in more detail how these federation types work later, in terms of AMQP semantics.

The compound federation types are:

  • "fanout" - used for publish-subscribe scenarios where publishers can be on any node in a federation, not just the root node. The fanout federation works by pushing all published messages to the root node, and fanning them out again to all federation nodes.
  • "service" - used for enterprise service bus (ESB) scenarios, where requests are forwarded to the nearest service, and responses come back to the original requestor.

We'll explain in more detail how federations work. Remember that AMQP works with the concepts of exchanges, queues, and bindings: an exchange is a routing engine in the server; a queue holds messages destined for one or more applications, and bindings tell the server how to route messages from any given exchange to a set of queues. We use "command" and "message" as less technical names for the AMQP concepts of "method" and "content".

The subscriber federation type

The subscriber federation replicates bindings made on one exchange to the same exchange on the parent server, so that messages are pulled down from it. Here is how the subscriber federation actually works:

  • The subscriber federation creates a private queue on the parent server, and consumes messages off that queue.
  • It monitors the local exchange for queue.bind and queue.unbind commands.
  • When it sees a queue.bind or queue.unbind command on the local exchange it sends the same command to the remote exchange, binding or unbinding its private queue using the same arguments. This effectively propagates all local subscriptions to the remote server.
  • The remote exchange will start delivering, into the private queue, messages that match the binding criteria.
  • The subscriber federation collects these messages and re-publishes them on the local exchange, causing them to be re-distributed to all local queues that were bound with the same arguments.

If many applications bind to the same routing values, the subscriber federation still makes a single bind to the parent server. In this way, the traffic between the two servers is optimised.

This federation type is typically used on topic or header exchanges but can also make sense on a direct exchange.
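
For example, to set up a subscriber federation explicitly (rather than relying on the default type), you can use the manual federation syntax described later in this document. This is a sketch only; the exchange name and parent host are placeholders:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <federate
        exchange = "myapp.data"
        attach = "parenthost:5672"
        type = "subscriber"
        />
</config>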

The publisher federation type

The publisher federation routes all messages to the remote exchange. Here is how the publisher federation actually works:

  • The publisher federation monitors the local exchange for basic.publish commands.
  • When it sees a basic.publish command, it forwards this to the remote exchange (in addition to routing the message to local queues).
  • If the remote exchange returns a message as undeliverable, the publisher federation gets this message, and then returns it to the original client application, if the application is still connected.

The publisher federation is typically used for direct exchanges.

The locator federation type

The locator federation routes messages to the remote exchange when they cannot be routed successfully on the local exchange. This is used to "locate services" on a federated network. Here is how the locator federation actually works:

  • The locator federation monitors the local exchange for basic.publish commands.
  • When it sees a basic.publish that could not be routed to one or more local queues, it forwards the command to the remote exchange.
  • If the remote exchange returns a message as undeliverable, the locator federation gets this message, and then returns it to the original client application, if the application is still connected.

The locator federation is typically used for direct exchanges.

The fanout federation type

The fanout federation combines the functionality of the publisher and the subscriber federations, more or less. It routes all published messages to the remote exchange and redistributes all deliveries from the parent to local applications. In detail:

  • The fanout federation monitors the local exchange for basic.publish commands.
  • When it sees a basic.publish command, it forwards this to the remote exchange, and does not route the message to local queues.
  • If the remote exchange returns a message as undeliverable, the fanout federation gets this message, and then returns it to the original client application, if the application is still connected.
  • The fanout federation creates a private queue on the parent server, and consumes messages off that queue.
  • It monitors the local exchange for queue.bind and queue.unbind commands.
  • When it sees a queue.bind or queue.unbind command on the local exchange it sends the same command to the remote exchange, binding or unbinding its private queue using the same arguments. This effectively propagates all local subscriptions to the remote server.
  • The remote exchange will start delivering, into the private queue, messages that match the binding criteria.
  • The fanout federation collects these messages and re-publishes them on the local exchange, causing them to be re-distributed to all local queues that were bound with the same arguments.

If many applications bind to the same routing values, the fanout federation still makes a single bind to the parent server. In this way, the traffic between the two servers is optimised.

The fanout federation type is the default federation type for topic or header exchanges, when federation is enabled for the exchange.

The service federation type

The service federation combines the functionality of the locator and the subscriber federations, more or less. It routes requests to the nearest service and routes responses back to the original requesting application. In detail:

  • The service federation monitors the local exchange for basic.publish commands.
  • When it sees a basic.publish that could not be routed to one or more local queues, it forwards the command to the remote exchange.
  • If the remote exchange returns a message as undeliverable, the service federation gets this message, and then returns it to the original client application, if the application is still connected.
  • The service federation creates a private queue on the parent server, and consumes messages off that queue.
  • It monitors the local exchange for queue.bind and queue.unbind commands.
  • When it sees a queue.bind or queue.unbind command on the local exchange it sends the same command to the remote exchange, binding or unbinding its private queue using the same arguments. This effectively propagates all local subscriptions to the remote server.
  • The remote exchange will start delivering, into the private queue, messages that match the binding criteria.
  • The service federation collects these messages and re-publishes them on the local exchange, causing them to be re-distributed to all local queues that were bound with the same arguments.

The service federation type is the default federation type for direct exchanges, when federation is enabled for the exchange.

Peering

Internally, federation is implemented using 'peerings', server objects that maintain connections between pairs of servers. Peerings are also used in failover. Peerings are invisible to applications, so you don't need to manage them in any way. However they have some relevance to using federation:

  • Peerings are able to detect absent or disappeared peers, and automatically reconnect when possible. This means you do not need to synchronise server startup in a federated network. Each server can start and stop independently and all peerings to and from it will automatically reconnect when they can.
  • To debug federation you may need to debug peerings, using the --debug_peering option.
  • Currently, peerings cannot be made to failover pairs, and thus it is not possible to attach to a failover pair.

Federation configuration

Configuring federation is easy for the majority of cases, and can be highly customised for specific kinds of work. OpenAMQ offers a range of options:

  • You can choose automatic ESB federation, which supports 90% of scenarios using a set of pre-defined exchanges that are federated in the "right" way.
  • You can choose bulk federation in which all exchanges except internal ones are federated to a single parent server. There are various ways to fine-tune this.
  • You can choose manual federation in which you can federate individual exchanges exactly as you need to.

Automatic ESB federation

The automatic ESB (enterprise service bus) federation option federates three specific exchanges, in a way that gives your applications an instant and easy ESB. These exchanges are:

  • amq.service (a direct exchange) is connected with a service federation to the parent server. You can use amq.service for all request-response work.
  • amq.data (a topic exchange) is connected with a fanout federation to the parent server. You can use amq.data for all pub/sub work that uses topic routing.
  • amq.dataex (a header exchange) is connected with a fanout federation to the parent server. You can use amq.dataex for all pub/sub work that uses header routing.

As with all federation, automatic ESB federation is done by configuring the child server - the parent sees nothing except one more client application. To enable automatic ESB federation, run the server with the '--attach' option as follows:

amq_server --attach hostname[:port]

By default this will use the "/" virtual host and the "peering" credentials. To use different values, use these options:

--attach_vhost /vhostpath
--attach_login userlogin

Or, you can set these options in the amq_server.cfg file for the child server:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <federation
        attach = "hostname[:port]"
        attach_vhost = "path"
        attach_login = "userid"
        />
</config>
/config/federation/attach
If specified, the server will auto-federate to the specified parent OpenAMQ server. This federates three exchanges: amq.service (a direct exchange) using a service federation; amq.data (a topic exchange) using a fanout federation; and amq.dataex (a headers exchange) using a fanout federation. This gives you an instant enterprise service bus (ESB) based on a hub-and-spoke model. You can fine-tune auto-federation using the --attach_login and --attach_vhost options. Default value is ''
/config/federation/attach_vhost
Specifies the auto-federation vhost name, an arbitrary string that will be used when connecting to the parent server. This must match the vhost setting of the parent server. Default value is '/'
/config/federation/attach_login
Specifies the user name to be used when logging in. You do not need to specify a password; it is taken from the security section. Default value is 'peering'

Note that automatic ESB federation is always enabled if the --attach option is set on the command line or in the configuration file. If you want to attach some exchanges but explicitly not attach the three ESB exchanges, you can do this using manual federation.

Bulk federation

The bulk federation option federates a set of exchanges that you specify using a wildcard.

As with all federation, bulk federation is done by configuring the child server - the parent sees nothing except one more client application. To use bulk federation, run the server with these options:

amq_server --attach hostname[:port] --attach_all "pattern"

For example:

amq_server --attach localhost:5673 --attach_all "amq.*"

You can use the default, or explicitly set the virtual host and peering login credentials as for automatic ESB federation.

To set these options in the amq_server.cfg file for the child server:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <federation
        attach = "hostname[:port]"
        attach_all = "pattern"
        attach_vhost = "path"
        attach_login = "userid"
        />
</config>
/config/federation/attach_all
If set, the server will auto-federate all exchanges that match the specified pattern, which can include * (matching zero or more characters) and ? (matching a single character). You can use naming conventions to federate specific groups of exchanges. Put quotes around wildcards to avoid shell expansion. Default value is ''

Manual federation

With manual federation you explicitly configure the federation for each exchange that you want to federate. To do this you must use a configuration file; it is not possible to configure manual federation from the command line:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <federate
        exchange = "pattern
      [ attach = "hostname" ]
      [ vhost = "path" ]
      [ login = "userid" ]
      [ type = "exchange-type" ]
        />

You can define as many <federate> entries as you like. The properties for each one work as follows:

  • The exchange pattern is an exchange name or a wildcard, following the same rules as the --attach_all option.
  • The hostname is the server to attach to. This is an optional property - if you do not specify it, the --attach value is used. If neither is set, you'll get an error message and the server won't start.
  • The vhost is the virtual host path to use. This is an optional property - if you do not specify it, the --attach_vhost value is used. If neither is set, the default "/" path will be used.
  • The login is the userid to use to log in on the parent server. This is an optional property - if you do not specify it, the --attach_login value is used. If neither is set, the default "peering" user id will be used.
  • The type is the federation type, one of "service", "fanout", "subscriber", "publisher", or "locator". This is an optional property; it defaults to "service" for direct exchanges and "fanout" for all others.
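
For example, the following sketch federates a custom topic exchange as a subscriber to one parent, and a custom direct exchange as a locator to another; the exchange and host names are placeholders:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <federate
        exchange = "myapp.events"
        attach = "central1:5672"
        type = "subscriber"
        />
    <federate
        exchange = "myapp.requests"
        attach = "central2:5672"
        vhost = "/"
        login = "peering"
        type = "locator"
        />
</config>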

Login security

The child server needs to be able to connect and log in to the parent server. This means it must provide a valid password. It takes the password from the <security> section of its own configuration file. So if, to attach to a parent server, a child must log in as "remote012" with the password "AW766", the parent needs to have a matching user definition in its amq_server.cfg file:

amq_server.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <security name = "plain">
        ...
        <user name = "remote012" password = "AW766" group = "super" />
        ...
    </security>
</config>

The child may have an identical definition in amq_peering.cfg, or may inherit it from amq_server.cfg (in which case the password is the same for incoming and outgoing peering connections).
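
As a sketch of the child side, assuming it keeps everything in amq_server.cfg, the configuration could look like this (the parent host name is a placeholder; the user name and password match the parent example above):

amq_server.cfg (child):
------------------------------------------------------------------------
<?xml?>
<config>
    <federation
        attach = "parenthost:5672"
        attach_login = "remote012"
        />
    <security name = "plain">
        ...
        <user name = "remote012" password = "AW766" group = "super" />
        ...
    </security>
</config>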

Restrictions

  • Internal exchanges, the default exchange, and system exchanges cannot be federated.
  • There are no restrictions on the number of federations.

Global routing keys

In a typical federated application we see various types of message flow:

  • Distribution of data from publishers at different points in the federation to subscribers scattered around the federation.
  • Routing of service requests to service handlers sitting at different points in the federation.
  • Routing of service responses back to their original requesting applications.

The first two message flows do not need any specific application support but the last does. Replies are generally sent back using the 'reply-to' property of the original request message. If two applications accidentally use the same reply-to value, they will get each other's service responses in an unpredictable fashion.

Thus, to make federated request-response work properly, applications must use global routing keys for the reply-to property. This is not standardized by AMQP, and so we propose to reuse a standard from another domain, namely email:

queuename@hostname

This key must be formatted by the application for every private queue used to receive service responses. The application must:

  • Obtain the host name, e.g. from its configuration, or use the hostname and port combination from the connection.
  • Create a temporary queue, allowing the server to produce the queue name, or using another algorithm to get unique queue names.
  • Bind the queue to the amq.service exchange (if this is being used for the request-response stream) using the routing key "queuename@hostname", where queuename is the private queue name, and hostname is the host name.
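
As a small illustration of this convention, here is a sketch in plain C that formats such a reply-to key. The queue and host names are placeholders; a real application would use the server-assigned private queue name and its own server's host name:

//  Sketch only: format a global reply-to routing key as queuename@hostname.
#include <stdio.h>

int main (void)
{
    char reply_to [256];
    const char *queue_name = "tmp.replies.4F2A";    //  Placeholder private reply queue
    const char *host_name  = "regional01";          //  Placeholder child server host

    snprintf (reply_to, sizeof (reply_to), "%s@%s", queue_name, host_name);
    printf ("reply-to: %s\n", reply_to);            //  tmp.replies.4F2A@regional01
    return (0);
}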

Known limitations

We assume that the connection between partitions is perhaps slow, but reliable. That is, we do not attempt to queue and forward messages in case of a network failure - messages will simply be dropped. Applications that require reliable message transfer must implement end-to-end reliability.

Federated publish-subscribe creates extra hops when the publisher and subscriber are both on a child server. In this case, messages are sent first to the parent, root server and from there back out to all child servers that need them. This is how we avoid delivering the same message more than once. However it creates extra latency. We would normally put important publishers on the root parent server.

Debugging federation

When you make a federation model, it is worth creating test programs to test the overall working of the network. We will collect a set of test programs that can be used for testing federations.

To debug peerings, use the --debug_peering 1 command line option. This will cause extra output on the console window, which is useful if you need to report a problem to iMatix.

Author

iMatix Corporation

Table of Contents

Installing and using OpenAMQ

Introduction to OpenAMQ: This document is an introduction to the concept of business messaging in general, and to OpenAMQ in particular. It is intended for new OpenAMQ users who wish to understand the problems that OpenAMQ solves, and how OpenAMQ can be useful in software applications.

Basic use of OpenAMQ: This document explains how to get OpenAMQ running on your system. It explains how to download the software, how to unpack and build it (if you are using a source package), and how to run basic tests on the resulting software.

Advanced use of OpenAMQ: This guide is for people who need to configure and manage OpenAMQ servers. We explain how to configure and tune an OpenAMQ server, covering these topics: logging, monitoring, high-availability failover, and joining OpenAMQ servers into wide-area federations.

Writing applications

Programming WireAPI: This is the main guide for developers who wish to use OpenAMQ in their applications. We describe WireAPI, the C/C++ API that OpenAMQ provides for accessing AMQP. Expert WireAPI users may wish to read the iMatix iCL guide, but this document is otherwise self-contained.

Programming PAL: This guide is for OpenAMQ developers who need a quick way to write test cases and simple scenarios. We explain the PAL language, an XML scripting tool that gives you a fast way to construct AMQP applications for testing routing models, performance, stability, and other scenarios.

Programming the Console: This document explains how to write applications that automate management of OpenAMQ servers via console automation. The OpenAMQ console automation architecture offers developers different ways of accessing the functionality of the console API and integrating it with their own preferred tools and management facilities.

Technical library

Developer's Guide to ASL: This is a technical guide for protocol developers who wish to use the iMatix ASL framework for the development of connected client-server protocols. ASL is a generic framework that uses a protocol modeling language to construct the whole infrastructure for a given protocol. ASL was built primarily to support AMQP.

Developer's Guide to iCL: This is a technical guide for developers who wish to understand how the iMatix iCL framework works. iCL is a class-oriented modelling language for C applications and is one of the basic frameworks used in iMatix applications such as OpenAMQ.

Developer's Guide to MOP: This is a technical guide for developers who wish to understand how the iMatix code generation frameworks are constructed. We explain the principles of model oriented programming, and the basics of code generation using the iMatix GSL language. This provides essential basic knowledge for anyone intending to modify the OpenAMQ software.

Developer's Guide to SMT: This is a technical guide for developers who wish to understand how the iMatix SMT framework works. To use this guide the reader should be familiar with the iMatix iCL framework, and the iMatix Model Oriented Programming principles.

RFCs

The CML Request for Comments: We describe a generic technique for managing AMQP servers from a remote client application. This technique consists of a standard transport mechanism built over AMQP, and a standard XML language used to exchange information between a management component built-in to the server, and a management application. This is a request for comments, it is not a standard.