Programming WireAPI

Writing OpenAMQ Applications in C/C++

Overview

What is WireAPI?

WireAPI is a standardised API that closely maps to the semantics of the AMQP wire-level protocol. For example, the AMQP method Queue.Declare maps to a WireAPI function called amq_client_session_queue_declare. This contrasts with other kinds of API, which hide the semantics of working with AMQP from the developer. WireAPI has several advantages over more abstracted APIs:

  • Any version of AMQP is cleanly mapped to identical semantics in WireAPI.
  • Developers do not need to learn new semantics - once they know the protocol they know the API.
  • Developers have full control over all aspects of the middleware, not being restrained by any unnecessary API abstraction.
  • WireAPI semantics are portable to all languages (note that iMatix does not yet offer client libraries in languages other than C/C++).

The negative aspects of WireAPI are:

  • It can be complex for beginners.
  • It forces the developers to understand the protocol.
  • There are some useful abstractions missing from WireAPI.

Overall WireAPI works well because AMQP was designed to be used in this way; its semantics were designed to be meaningful to application developers. We plan to develop a WireAPI/2 that adds some useful abstractions such as end-to-end reliability, end-to-end encryption, message eventing (asynchronous delivery) and so on.

Goals and principles

The WireAPI is built using the same technology as the OpenAMQ server, and provides a component-based interface to the AMQ protocol from the client side. It is designed to:

  • Provide full asynchronous background operations.
  • Work both in single-threaded and multi-threaded applications.
  • Provide full access to all AMQP methods except those used for connection and session start-up and shut-down.
  • Provide full access to all AMQP method properties.
  • Act as the basis for other language APIs based on the WireAPI model.
  • Be 100% portable to all mainstream operating system platforms.
  • Work with any AMQP server implementation.

Architecture of C/C++ WireAPI

The C/C++ WireAPI implementation provided by iMatix for OpenAMQ is a multithreaded application in two halves. One half runs as a background thread, processing protocol methods and doing network I/O, so that messages can be sent and received independently of application work. The second half runs in the application thread and provides API-level functionality, including the implementation of the WireAPI objects (connections, sessions, etc.).

This design has several consequences that you should take into account when designing and building your application and/or messaging frameworks:

  • If you build OpenAMQ as a single-threaded application, background processing of messages will be unreliable. Concretely, the background thread will only process when the application calls the amq_client_session_wait () method.
  • WireAPI is safe to use in multithreaded applications, but you must not share WireAPI classes (connection, session, etc.) between application threads.

Using WireAPI

Compiling and linking

WireAPI is internally constructed using a number of iMatix tools such as iCL. However, the calling application does not need to use these tools in order to use WireAPI, except to correctly initialise and terminate iCL. It needs only the header files and the libraries. The simplest way to get these is to build the full OpenAMQ kernel, and then use the $IBASE/include and $IBASE/lib directories when compiling and linking the application.

Basic construction

WireAPI sits between the application and the AMQ server:

.---------------.      .-------------.      .--------------.
|               | ---> |             | ---> |              |
|  Application  |      |   WireAPI   |      |  AMQ Server  |
|               | <--- |             | <--- |              |
`==============='      `============='      `=============='

Here is a trivial example that sends one message to an AMQ server. This program does no error handling so you should not copy this code directly:

#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int
main (int argc, char *argv [])
{
    amq_client_connection_t
        *connection = NULL;             //  Current connection
    amq_client_session_t
        *session = NULL;                //  Current session
    amq_content_basic_t
        *content = NULL;                //  Message content
    icl_longstr_t
        *auth_data;
    //  Initialise iCL system
    icl_system_initialise (argc, argv);
    //  Open session to local server
    auth_data  = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new (
        "localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);
    session = amq_client_session_new (connection);
    //  Create a content and send it to the "to-address" exchange
    content = amq_content_basic_new ();
    amq_content_basic_set_body       (content, "0123456789", 10, NULL);
    amq_content_basic_set_message_id (content, "ID001");
    amq_client_session_basic_publish (
        session, content, 0, "to-address", NULL, FALSE, FALSE);
    amq_content_basic_unlink         (&content);
    //  Shutdown connection and session
    amq_client_session_destroy    (&session);
    amq_client_connection_destroy (&connection);
    //  Terminate iCL system
    icl_system_terminate ();
    return (0);
}

This shows some of the main object classes that compose WireAPI:

  • amq_client_connection: lets you connect to a server.
  • amq_client_session: lets you create a session and talk to the server using this session.
  • amq_content_basic: lets you create Basic content to send to the server, or process Basic content received from the server.

The iCL class syntax

The WireAPI classes are all constructed using iCL and some knowledge of iCL will make your life easier. However, the syntax for using iCL classes in C is consistent and we can summarise it:

  • classname_new creates a new object instance and returns a reference to that object.
  • classname_destroy takes the address of an object reference and destroys that object, if the reference is not null.
  • classname_somemethod takes an object reference along with arguments and does some work on the object.
  • classname->propertyname accesses a class property.

iCL signals errors by returning a null object (after a new) or by returning a non-zero error code (after other methods). iCL return codes are integers.

Thus the correct way to open a connection and session is actually like this:

auth_data  = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
    "localhost", "/", auth_data, "test", 0, 30000);
icl_longstr_destroy (&auth_data);
if (!connection) {
    icl_console_print ("E: could not open connection");
    return (1);
}
session = amq_client_session_new (connection);
if (!session) {
    icl_console_print ("E: could not open session");
    return (1);
}

And the correct way to call a method like publish is:

int rc;
...
content = amq_content_basic_new ();
...
rc = amq_client_session_basic_publish (
    session, content, 0, "queue", "mydest", FALSE, FALSE);
amq_content_basic_unlink (&content);
if (rc) {
    icl_console_print ("E: could not publish message");
    return (1);
}
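
To make these conventions concrete, here is a minimal sketch of a class written by hand in the same style. The demo_counter class is hypothetical and is not part of iCL or WireAPI; real iCL classes are generated, and this sketch assumes that destroy also nullifies the caller's reference:

```c
#include <stdlib.h>

//  Hypothetical class "demo_counter", hand-written in the iCL style
typedef struct {
    int value;                          //  Property, read as counter->value
    int limit;                          //  Largest value the counter accepts
} demo_counter_t;

//  classname_new: returns an object reference, or NULL on failure
demo_counter_t *
demo_counter_new (int limit)
{
    demo_counter_t
        *self;

    if (limit <= 0)
        return (NULL);                  //  Invalid argument, no object
    self = malloc (sizeof (demo_counter_t));
    if (self) {
        self->value = 0;
        self->limit = limit;
    }
    return (self);
}

//  classname_somemethod: returns zero on success, non-zero on error
int
demo_counter_add (demo_counter_t *self, int amount)
{
    if (self->value + amount > self->limit)
        return (-1);
    self->value += amount;
    return (0);
}

//  classname_destroy: takes the address of the reference and does
//  nothing if the reference is null. Nullifying the reference
//  afterwards is an assumption of this sketch.
void
demo_counter_destroy (demo_counter_t **self_p)
{
    if (*self_p) {
        free (*self_p);
        *self_p = NULL;
    }
}
```

A caller checks demo_counter_new for a null reference and demo_counter_add for a non-zero return code, in the same way as the connection, session, and publish calls shown earlier in this section.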

Tuning WireAPI

The WireAPI client library cannot be tuned via command-line options. It uses an XML configuration file called wireapi.cfg. This file, if present, can set any of the following options:

wireapi.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
    <tuning
        tcp_nodelay = "1"
        tcp_rcvbuf = "0"
        tcp_sndbuf = "0"
        low_water = "0"
        high_water = "0"
        direct = "0"
    />
    <direct
        batching = "32768"
        on_overflow = ""
    />
</config>

  • /config/tuning/tcp_nodelay : if this value is 1, socket data is written immediately, which is usually good for latency. If this value is 0, data is buffered until there is a full packet, which is usually good for throughput. Default value is 1.
  • /config/tuning/tcp_rcvbuf : if this value is greater than zero, the connection to the server will use the specified value as its TCP receive buffer size. Note: setting this value is delicate; do not use this option unless you know what you are doing. Default value is 0.
  • /config/tuning/tcp_sndbuf : if this value is greater than zero, the connection to the server will use the specified value as its TCP send buffer size. Note: setting this value is delicate; do not use this option unless you know what you are doing. Default value is 0.
  • /config/tuning/low_water : the number of messages in the arrived queue at which message flow from the server is started again after it was switched off at the high-water mark. Default value is 0.
  • /config/tuning/high_water : the number of messages in the arrived queue at which message flow from the server is stopped. If this property is 0, message flow is never switched off. Default value is 0.
  • /config/tuning/direct : if set to 1, all connections will default to using Direct Mode. This is faster for publish/subscribe and request/response scenarios. See the OpenAMQ user guide for details on Direct Mode.
  • /config/direct/batching : defines the maximum batch size for Direct Mode opportunistic batching on message sends. Setting this higher will improve throughput, and usually with lower latency, but will cause higher memory consumption. Setting this to zero will switch off batching. Any value less than 2048 is treated as zero. Default value is 32768. Maximum value is 2097152.
  • /config/direct/on_overflow : in Direct Mode, specifies how the WireAPI stack should handle an overflow condition, as defined by the arrived message queue reaching the high-water mark. The allowed actions are: 'warn' - issue a message to the console, 'trim' - discard old messages to make space for new ones, 'drop' - drop new incoming messages, and 'kill' - assert an error and kill the application. Default value is trim.
  • /config/sequence/set : sets Sender-Id and Timestamp on published messages. If set, all published messages are stamped with Sender-Id equal to the connection Id, and Timestamp equal to the current date and time in the Unix 64-bit time format (microseconds since epoch).
  • /config/sequence/check : verifies Sender-Id and Timestamp on received messages. If set, all received messages are checked with respect to the sending connection in Sender-Id and Timestamp. Out-of-sequence messages are reported to the console output.
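
The low_water and high_water options work as a pair, which can be easier to see in code. The following is a minimal sketch of that kind of hysteresis, not the WireAPI implementation: flow_state_t and its functions are hypothetical names, and the exact comparisons (flow stops when the queue depth reaches high_water and restarts when it falls to low_water) are assumptions based on the option descriptions:

```c
//  Hypothetical flow-control state for one arrived-message queue
typedef struct {
    int high_water;                     //  Stop flow at this depth, 0 = never
    int low_water;                      //  Restart flow at this depth
    int depth;                          //  Messages currently queued
    int flow_on;                        //  1 = server may send, 0 = paused
} flow_state_t;

void
flow_state_init (flow_state_t *self, int high_water, int low_water)
{
    self->high_water = high_water;
    self->low_water  = low_water;
    self->depth      = 0;
    self->flow_on    = 1;
}

//  A message arrived from the server
void
flow_state_arrived (flow_state_t *self)
{
    self->depth++;
    if (self->high_water > 0
    &&  self->flow_on
    &&  self->depth >= self->high_water)
        self->flow_on = 0;              //  Would ask the server to pause
}

//  The application took one message off the arrived queue
void
flow_state_consumed (flow_state_t *self)
{
    if (self->depth > 0)
        self->depth--;
    if (!self->flow_on && self->depth <= self->low_water)
        self->flow_on = 1;              //  Would ask the server to resume
}
```

Keeping low_water well below high_water avoids switching the flow on and off for every message when the application consumes at about the same rate as the server delivers.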

Direct Mode is supported on OpenAMQ versions 1.3 and later.
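
The 'trim' and 'drop' actions for on_overflow differ only in which message is sacrificed when the arrived queue is full. This sketch illustrates the difference with a small fixed-size queue of message ids; arrived_queue_t, QUEUE_MAX, and arrived_queue_push are hypothetical names, and the 'warn' and 'kill' actions are left out:

```c
#include <string.h>

#define QUEUE_MAX 4                     //  Stand-in for the high-water mark

typedef enum { OVERFLOW_TRIM, OVERFLOW_DROP } overflow_t;

typedef struct {
    int items [QUEUE_MAX];              //  Queued message ids, oldest first
    int count;                          //  Number of ids currently queued
} arrived_queue_t;

//  Adds a message id; returns 1 if it was queued, 0 if it was dropped
int
arrived_queue_push (arrived_queue_t *self, int id, overflow_t policy)
{
    if (self->count == QUEUE_MAX) {
        if (policy == OVERFLOW_DROP)
            return (0);                 //  'drop': refuse the new message
        //  'trim': discard the oldest message to make space
        memmove (self->items, self->items + 1,
            (QUEUE_MAX - 1) * sizeof (int));
        self->count--;
    }
    self->items [self->count++] = id;
    return (1);
}
```

With 'trim' the application always sees the most recent messages, while with 'drop' it sees the oldest ones that fitted; which is preferable depends on whether stale data still has value to the application.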

The WireAPI library also accepts these environment variables:

  • WIREAPI_SILENT=1 - if set to 1, errors and warnings are not sent to standard output. Default value is 0. Note that even if set, some serious errors will still be printed. The connection->silent property inherits from this environment variable.
  • WIREAPI_FAKESLOW=1 - if set to 1, causes WireAPI to throttle input to 1,000 messages per second, to simulate a slow network and cause backlogs on the server. Do not use this in production: it can crash a poorly-configured server.

The connection class

What is a 'connection'?

A connection is a TCP/IP connection from a client to an OpenAMQ server. AMQP is a multi-channel protocol, meaning that one network connection can carry an arbitrary number of parallel, independent virtual connections, which AMQP calls "channels". In WireAPI these are called "sessions" for compatibility with other middleware APIs. NOTE: OpenAMQ currently supports exactly ONE session per connection.

Before calling any iCL method including amq_client_connection_new, you must have called icl_system_initialise (), or your application will fail with an abort.

amq_client_connection_new

Creates a new connection to the server:

amq_client_connection_t
    *connection = NULL;             //  Current connection
icl_longstr_t
    *auth_data;                     //  Authentication data

auth_data  = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
    "localhost", "/", auth_data, "test", 0, 30000);

icl_longstr_destroy (&auth_data);
if (connection)
    icl_console_print ("I: connected to %s/%s - %s - %s",
        connection->server_product,
        connection->server_version,
        connection->server_platform,
        connection->server_information);
else {
    icl_console_print ("E: could not connect to server");
    return (1);
}

Notes:

  • The host_name argument specifies a server name or IP address, optionally ending in ':' plus a port number.
  • The virtual_host name specifies the virtual host to which the connection will connect. The default virtual host is "/".
  • The auth_data provides an authentication block, used to login to the server. To create a plain-text authentication block, use the auth_plain method. The new method destroys the auth_data block on behalf of the caller.
  • The instance argument sets the client instance name, which can be used to identify a specific client in the management console or server log.
  • The trace argument sets the trace level for WireAPI.
  • The timeout argument governs all synchronous exchanges with the server - if the server does not respond within this time, the connection treats it as a fatal error. A timeout of zero means "infinite". A good value for fast networks is five to ten seconds; for a slower network, a value of 30 seconds or more is reasonable.
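
As an illustration of the host_name format, the following sketch splits a "host" or "host:port" string into its parts. WireAPI does this internally; split_host_name is a hypothetical helper, and falling back to port 5672 (the port registered for AMQP) when none is given is an assumption of this sketch. IPv6 literal addresses are not handled:

```c
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_PORT 5672               //  Assumed default, the AMQP port

//  Splits "host" or "host:port" into host and port. Returns 0 on
//  success, or -1 if the host is empty or too long for the buffer,
//  or if the port is not a number between 1 and 65535.
int
split_host_name (
    const char *host_name, char *host, size_t host_size, int *port)
{
    const char
        *colon = strrchr (host_name, ':');
    size_t
        host_len = colon? (size_t) (colon - host_name): strlen (host_name);
    char
        *end;
    long
        value;

    if (host_len == 0 || host_len >= host_size)
        return (-1);
    memcpy (host, host_name, host_len);
    host [host_len] = '\0';

    *port = DEFAULT_PORT;
    if (colon) {
        //  Require at least one digit, so "host:" is rejected
        if (!isdigit ((unsigned char) colon [1]))
            return (-1);
        value = strtol (colon + 1, &end, 10);
        if (*end != '\0' || value < 1 || value > 65535)
            return (-1);
        *port = (int) value;
    }
    return (0);
}
```

Validating the string before passing it to amq_client_connection_new lets an application report a configuration mistake directly, rather than as a failed connection.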

amq_client_connection_destroy

Closes an open connection, doing a clean shut-down. Applications should use this to end a connection (rather than just exiting):

amq_client_connection_destroy (&connection);

amq_client_connection_auth_plain

Returns an authentication block for a plain login:

icl_longstr_t
    *auth_data;                     //  Authentication data

auth_data = amq_client_connection_auth_plain ("guest", "guest");

Connection Properties

These are the properties of a connection object:

  • direct (Boolean) - set this TRUE to use Direct Mode
  • alive (Boolean) - FALSE when connection has had an error
  • silent (Boolean) - set this TRUE to suppress error reporting
  • error_text (string) - error string reported by the API
  • reply_text (string) - error string reported by server
  • reply_code (integer) - error value reported by server
  • version_major (integer) - server protocol version major
  • version_minor (integer) - server protocol version minor
  • server_product (string) - product name reported by server
  • server_version (string) - product version reported by server
  • server_platform (string) - operating system platform reported by server
  • server_copyright (string) - copyright notice reported by server
  • server_information (string) - other information reported by server
  • server_identifier (string) - unique server instance identifier
  • id (string) - holds unique connection ID string assigned by server

Direct Mode

To switch on Direct Mode for specific connections, set the connection "direct" property after creating the connection:

if (direct_mode)
    connection->direct = TRUE;

This overrides any setting in the WireAPI configuration.

The session class

What is a 'session'?

A session corresponds to an AMQP channel, and is a virtual connection to an AMQP server. You must create at least one session in order to talk with an AMQP server. While AMQP offers multiplexing in theory, OpenAMQ no longer implements this, mainly because there are no proven performance advantages. So you should create a single session per connection, no more. Future versions of WireAPI may merge the session and connection classes.

amq_client_session_new

Creates a new session:

amq_client_session_t
    *session = NULL;                //  Current session

session = amq_client_session_new (connection);
if (!session) {
    icl_console_print ("E: could not open session to server");
    return (1);
}

amq_client_session_destroy

Closes an open session, doing a clean shut-down. Applications should call this method when closing a session. Closing the connection is a valid way of closing all open sessions for that connection:

amq_client_session_destroy (&session);

amq_client_session_wait

Waits for content to arrive from the server. You must call this method in order to get content. Returns zero when it ends normally, either because content arrived or because the timeout expired, and -1 if there was an error. The timeout is in milliseconds. To wait forever, use a timeout of zero, and to poll (never wait), use a timeout of -1:

//  Wait forever until a message arrives
amq_client_session_wait (session, 0);

//  Wait for up to five seconds
amq_client_session_wait (session, 5000);

//  Wait for up to one second, exit if there was an error
if (amq_client_session_wait (session, 1000)) {
    icl_console_print ("E: error, aborted");
    exit (1);
}
else {
    //  zero or more contents arrived
}

//  Poll for new messages but do not wait
amq_client_session_wait (session, -1);

After using the amq_client_session_wait() call, check both the amq_client_session_basic_arrived () and amq_client_session_basic_returned () lists.

Session properties

These are the properties of a session object:

  • alive (Boolean) - FALSE when connection has had an error
  • timestamp (apr_time_t) - time when session was opened
  • error_text (string) - error string reported by the API
  • ticket (integer) - access ticket granted by server
  • queue (string) - queue name assigned by server
  • exchange (string) - exchange name from last method
  • message_count (integer) - number of messages in queue
  • consumer_count (integer) - number of consumers
  • active (Boolean) - session is paused or active
  • reply_text (string) - error string reported by server
  • reply_code (integer) - error value reported by server
  • consumer_tag (integer) - server-assigned consumer tag
  • routing_key (string) - original message routing key
  • scope (string) - queue name scope
  • delivery_tag (integer) - server-assigned delivery tag
  • redelivered (Boolean) - message is being redelivered

Note that all of these except alive, timestamp, and error_text are the result of methods sent from the server to the client. For detailed descriptions of these properties, read the AMQP specifications. All incoming method arguments are stored as session properties. Thus the "message-count" argument of an incoming Basic.Browse-Ok method will be stored in the message_count property.
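
The naming rule is mechanical: hyphens in the AMQP field name become underscores in the C property name. A small sketch of that mapping, using a hypothetical helper that is not part of WireAPI:

```c
#include <stddef.h>

//  Copies an AMQP field name into dest, replacing '-' with '_'.
//  Returns 0 on success, or -1 if dest is too small.
int
field_to_property (const char *field, char *dest, size_t dest_size)
{
    size_t
        index;

    if (dest_size == 0)
        return (-1);
    for (index = 0; field [index]; index++) {
        //  Keep room for this character and the terminator
        if (index + 1 >= dest_size)
            return (-1);
        dest [index] = (field [index] == '-')? '_': field [index];
    }
    dest [index] = '\0';
    return (0);
}
```

Given "message-count" this produces "message_count", which is the property to read as session->message_count.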

The exchange class

Exchanges match and distribute messages across queues. Exchanges can be configured in the server or created at runtime.

amq_client_session_exchange_declare

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class:

asl_field_list_t
    *field_list = NULL;
icl_longstr_t
    *arguments_table = NULL;

//  Build arguments field as necessary
field_list = asl_field_list_new (NULL);
arguments_table = asl_field_list_flatten (field_list);

rc = amq_client_session_exchange_declare (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    exchange_name,              //  Exchange name
    "direct",                   //  Exchange type
    0,                          //  If 1, don't actually create
    0,                          //  Durable (unused)
    0,                          //  If 1, auto-delete when unused
    0,                          //  Internal exchange (unused)
    arguments_table);           //  Arguments for declaration

asl_field_list_unlink (&field_list);
icl_longstr_destroy (&arguments_table);

The Exchange.Declare method has the following specific fields:

  • exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
  • type (shortstr) - exchange type. Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. It is not valid or meaningful to attempt to change the type of an existing exchange. OpenAMQ supports "fanout", "direct", "topic", and "header" exchanges.
  • passive (bit) - do not create exchange. If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.
  • auto_delete (bit) - auto-delete when unused. If set, the exchange is deleted when all queues have finished using it.
  • arguments (table) - arguments for declaration. A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is 1.

amq_client_session_exchange_delete

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.

rc = amq_client_session_exchange_delete (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    exchange_name,              //  Exchange name
    0);                         //  If-unused option (unused)

The Exchange.Delete method has the following specific fields:

  • exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores.

The queue class

Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.

amq_client_session_queue_declare

This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

asl_field_list_t
    *field_list = NULL;
icl_longstr_t
    *arguments_table = NULL;

//  Build arguments field as necessary
field_list = asl_field_list_new (NULL);
arguments_table = asl_field_list_flatten (field_list);

rc = amq_client_session_queue_declare (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name,                 //  Queue name, null means auto-assign
    0,                          //  If 1, don't actually create
    0,                          //  Durable (unused)
    0,                          //  If 1, request exclusive queue
    0,                          //  If 1, auto-delete when unused
    arguments_table);           //  Arguments for declaration
asl_field_list_unlink (&field_list);
icl_longstr_destroy (&arguments_table);

The Queue.Declare method has the following specific fields:

  • queue (shortstr) - queue name. Queue names may consist of any mixture of digits, letters, and underscores. May be specified, or may be empty (NULL). If the queue name is null, the server creates and names a queue and returns this. You can access the last created queue from session->queue; if you want to create many queues, copy the returned name somewhere safe.
  • passive (bit) - do not create queue. If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.
  • exclusive (bit) - request an exclusive queue. Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'auto-delete'.
  • auto_delete (bit) - auto-delete queue when unused. If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
  • arguments (table) - arguments for declaration. A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is 1.

amq_client_session_queue_bind

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.

rc = amq_client_session_queue_bind (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name,                 //  Name of queue to bind
    exchange_name,              //  Name of exchange to bind to
    routing_key,                //  Message routing key
    arguments_table);           //  Arguments for bind

The Queue.Bind method has the following specific fields:

  • queue (shortstr) - queue name. Specifies the name of the queue to bind. If the queue name is empty (NULL), refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
  • exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
  • routing_key (shortstr) - message routing key. Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.
  • arguments (table) - arguments for binding. A set of arguments for the binding. The syntax and semantics of these arguments depends on the exchange class.

amq_client_session_queue_unbind

This method unbinds a queue from an exchange.

rc = amq_client_session_queue_unbind (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name,                 //  Name of queue
    exchange_name,              //  Name of exchange
    routing_key,                //  Message routing key
    arguments_table);           //  Arguments for unbind

The Queue.Unbind method has the following specific fields:

  • queue (shortstr) - queue name. Specifies the name of the queue to unbind. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
  • exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
  • routing_key (shortstr) - message routing key. Specifies the routing key of the binding to unbind.
  • arguments (table) - arguments for binding. A set of arguments of the binding to unbind.

amq_client_session_queue_purge

This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal "undo" mechanism.

rc = amq_client_session_queue_purge (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name);                //  Name of queue to purge

The Queue.Purge method has the following specific fields:

  • queue (shortstr) - queue name. Specifies the name of the queue to purge. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.

amq_client_session_queue_delete

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

rc = amq_client_session_queue_delete (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name,                 //  Name of queue to delete
    0,                          //  If-unused (unused)
    0);                         //  If-empty (unused)

The Queue.Delete method has the following specific fields:

  • queue (shortstr) - queue name. Specifies the name of the queue to delete. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.

The basic content class

What is a 'content'?

AMQP uses the term "content" to mean an application message (the term "message" means different things at the application, protocol, and internal technical levels, so is confusing).

WireAPI provides an iCL class called 'amq_content_basic' that lets you create and manipulate basic contents:

amq_content_basic_t
    *content;
content = amq_content_basic_new ();
amq_content_basic_set_body       (content, "0123456789", 10, NULL);
amq_content_basic_set_message_id (content, "ID001");
amq_content_basic_unlink (&content);

To create a new content, use the 'new' method. To destroy a content, use the 'unlink' method.

amq_client_session_basic_consume

The amq_client_session_basic_consume method tells a queue to start sending messages to the application. The standard pattern is to declare a queue, then bind it to one or more exchanges, and then consume from it. There are two typical use cases: private queues for response messages, and shared queues for workload balancing. The application can request exclusive access (be the only consumer), which is recommended for private response queues. The application can also request 'no_local' delivery, which is recommended for certain kinds of fanout pub-sub:

amq_client_session_basic_consume (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name,                 //  Queue name
    consumer_tag,               //  Consumer tag, or empty
    no_local,                   //  If TRUE, won't receive own messages
    0,                          //  No-ack (unused)
    exclusive,                  //  Request exclusive access to queue
    arguments);                 //  Arguments for consume

amq_client_session_basic_cancel

The amq_client_session_basic_cancel method stops a consumer. The application must use the consumer tag that was returned by the server after the consume method (which will be in session->consumer_tag immediately after the consume succeeds, and until a further consume is executed):

rc = amq_client_session_basic_cancel (
    session,                    //  Session reference
    consumer_tag);              //  Consumer tag

amq_client_session_basic_ack

The amq_client_session_basic_ack method acknowledges a message on consumers with acknowledgements. The application must use the delivery tag provided by the server, which is in the content->delivery_tag property. You can use delivery_tag zero with

rc = amq_client_session_basic_ack (
    session,                    //  Session reference
    delivery_tag,               //  Delivery tag
    multiple);                  //  Acknowledge multiple messages
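
The multiple argument follows the AMQP definition of Basic.Ack: when it is zero, only the message with the given delivery tag is acknowledged; when it is set, every outstanding message up to and including that tag is acknowledged, and a delivery tag of zero then covers all outstanding messages. The sketch below models that bookkeeping for illustration only; unacked_t and unacked_ack are hypothetical names and describe what a server would track, not WireAPI code:

```c
#define MAX_UNACKED 16

typedef struct {
    long tags [MAX_UNACKED];            //  Outstanding delivery tags
    int  count;                         //  Number of outstanding tags
} unacked_t;

//  Removes the acknowledged tags from the outstanding list and
//  returns how many messages were acknowledged
int
unacked_ack (unacked_t *self, long delivery_tag, int multiple)
{
    int
        kept = 0,
        acked = 0,
        index;

    for (index = 0; index < self->count; index++) {
        long tag = self->tags [index];
        int match = multiple
            ? (delivery_tag == 0 || tag <= delivery_tag)
            : (tag == delivery_tag);
        if (match)
            acked++;
        else
            self->tags [kept++] = tag;  //  Still waiting for an ack
    }
    self->count = kept;
    return (acked);
}
```

Acknowledging in batches with multiple set reduces the number of methods sent to the server, at the cost of more messages being redelivered if the application fails before it sends the acknowledgement.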

amq_client_session_basic_publish

The amq_client_session_basic_publish method sends a content to a specified exchange:

rc = amq_client_session_basic_publish (
    session,                    //  Session reference
    basic_content,              //  Content reference
    0,                          //  Access ticket (unused)
    exchange_name,              //  Exchange name
    "",                         //  Routing key
    0,                          //  If 1, must be routable
    0);                         //  If 1, must be deliverable

amq_client_session_basic_get

The amq_client_session_basic_get method gets a single message off the queue. This method does not require a consumer, so is used for simpler scenarios. Since getting messages one by one is heavily synchronized, it is slower than using a consumer:

rc = amq_client_session_basic_get (
    session,                    //  Session reference
    0,                          //  Access ticket (unused)
    queue_name,                 //  Queue name
    0);                         //  No-ack (unused)

Receiving contents

For the Basic content class, WireAPI provides a set of methods to access arrived and returned contents:

amq_client_session_basic_arrived ()
amq_client_session_get_basic_arrived_count ()
amq_client_session_basic_returned ()
amq_client_session_get_basic_returned_count ()

The _arrived and _returned methods return the oldest content waiting to be processed (or NULL if there is none); the matching _count methods return the number of contents waiting. For example:

amq_content_basic_t
    *content;

//  Wait up to 1 second
amq_client_session_wait (session, 1000);
if (amq_client_session_get_basic_arrived_count (session)) {
    icl_console_print ("I: have messages to process...");
    content = amq_client_session_basic_arrived (session);
    while (content) {
        //  Process content
        ...
        //  Unlink/destroy content, or we get memory leaks
        amq_content_basic_unlink (&content);
        //  Get next content to process, if any
        content = amq_client_session_basic_arrived (session);
    }
}

When processing arrived or returned content the application must not assume that a single content arrived. It should assume that zero or more contents arrived or returned, and process each of them, and wait again if it needs to.
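
The "zero or more" rule can be tried out without a broker. The following is a minimal sketch in which mock_queue_t, mock_arrived() and drain() are hypothetical stand-ins for the session and for amq_client_session_basic_arrived(); they are not part of WireAPI:

```c
#include <assert.h>
#include <stddef.h>

//  Hypothetical stand-in for the session's list of arrived contents
typedef struct {
    const char **items;             //  Pending contents, oldest first
    size_t count;                   //  How many contents there are
    size_t next;                    //  Index of the oldest unprocessed one
} mock_queue_t;

//  Returns the oldest pending content, or NULL when none are left
const char *
mock_arrived (mock_queue_t *queue)
{
    assert (queue);
    if (queue->next < queue->count)
        return queue->items [queue->next++];
    return NULL;
}

//  Processes every pending content and returns how many were handled;
//  zero is a normal result, not an error
size_t
drain (mock_queue_t *queue)
{
    size_t handled = 0;
    while (mock_arrived (queue))
        handled++;                  //  Real code would process the content
    return handled;
}
```

A real application would call its equivalent of drain() after every amq_client_session_wait(), and simply wait again when the result is zero.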

Basic content properties

All contents have these properties, which you can inspect directly using content->propertyname (e.g. 'content->routing_key'):

  • body_size (integer) - the body size of the content.
  • exchange (string) - the exchange to which the content was published.
  • routing_key (string) - the original routing_key specified by the publisher.
  • producer_id (string) - the producer id of the publishing connection.
  • content_type (string) - MIME content type.
  • content_encoding (string) - MIME content encoding.
  • headers (field table) - message header field table.
  • delivery_mode (integer) - non-persistent or persistent.
  • priority (integer) - message priority, 0 to 9.
  • correlation_id (string) - application correlation identifier.
  • reply_to (string) - the destination to reply to.
  • expiration (string) - expiration specification.
  • message_id (string) - the application message identifier.
  • timestamp (integer) - message timestamp.
  • type (string) - message type name.
  • user_id (string) - creating user id.
  • app_id (string) - creating application id.
  • sender_id (string) - the sending application id.

To set any of a basic content's properties, DO NOT modify the property directly but use the method:

amq_content_basic_set_[propertyname] (content, newvalue)

Content body data

To set a content's body, use this method:

amq_content_basic_set_body (content, byte *data, size_t size, free_fn)

Where the free_fn is a function of type 'icl_mem_free_fn *' (compatible with the standard library free() function). If free_fn is not null, it is called when the data needs to be destroyed (when the content is destroyed, or if you call _set_body() again). If free_fn is null, the data is never freed on your behalf, which suits static buffers.
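
The ownership rule for the body data can be modelled in a few lines. This is a sketch only: sketch_content_t, sketch_set_body(), sketch_release_body() and count_release() are hypothetical names, and free_fn_t merely stands in for icl_mem_free_fn. It shows when the free function runs, not how WireAPI implements contents:

```c
#include <assert.h>
#include <stddef.h>

//  Hypothetical stand-in for icl_mem_free_fn: same shape as free()
typedef void (free_fn_t) (void *data);

//  Hypothetical, much reduced model of a content holding one body
typedef struct {
    void      *body;                //  Body data, held but not copied
    size_t     size;                //  Body size in bytes
    free_fn_t *free_fn;             //  How to release the body, or NULL
} sketch_content_t;

//  Releases the current body, if the content was told how to do so
void
sketch_release_body (sketch_content_t *content)
{
    assert (content);
    if (content->body && content->free_fn)
        content->free_fn (content->body);
    content->body = NULL;
    content->size = 0;
    content->free_fn = NULL;
}

//  Replaces the body; the previous body is released first
void
sketch_set_body (sketch_content_t *content, void *data, size_t size,
    free_fn_t *free_fn)
{
    sketch_release_body (content);
    content->body = data;
    content->size = size;
    content->free_fn = free_fn;
}

//  Example free function that only counts its calls
int release_count = 0;

void
count_release (void *data)
{
    (void) data;
    release_count++;
}
```

Passing free for data obtained from malloc() hands the data over to the content; passing NULL keeps responsibility with the caller.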

To get a content's body, use this method:

amq_content_basic_get_body (content, byte *buffer, size_t limit)

Where the buffer is at least as large as content->body_size. This method returns the number of bytes copied, or -1 if the buffer was too small.
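
When the body is text, remember that the copied bytes are not null-terminated and that the result can be -1. A minimal sketch of a safe pattern follows; sketch_get_body() is a hypothetical stand-in that follows the contract described above (it is not the WireAPI method), and sketch_get_body_as_string() is likewise an illustrative name:

```c
#include <assert.h>
#include <string.h>

//  Hypothetical stand-in following the contract described above:
//  copies the body and returns its size, or -1 if it does not fit
int
sketch_get_body (const char *body, size_t body_size,
    char *buffer, size_t limit)
{
    assert (body && buffer);
    if (body_size > limit)
        return -1;
    memcpy (buffer, body, body_size);
    return (int) body_size;
}

//  Reads a body as a C string. One byte of the buffer is kept back for
//  the terminator, and the signed result is checked before it is used
//  as an index. Returns the string length, or -1 on failure.
int
sketch_get_body_as_string (const char *body, size_t body_size,
    char *buffer, size_t buffer_size)
{
    int copied;
    assert (buffer_size > 0);
    copied = sketch_get_body (body, body_size, buffer, buffer_size - 1);
    if (copied < 0)
        return -1;
    buffer [copied] = 0;
    return copied;
}
```

Storing the result in an unsigned variable would turn -1 into a very large index, so a signed type (or an explicit range check) is the safer choice.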

Advanced content manipulation

To work with large contents - which do not fit into memory - you must use a more complex API to read and write contents. For details of this please read the amq_content_basic_class.icl and look at the test case, which demonstrates how to read and write content bodies in frames rather than as single buffers.

Field tables

Various AMQP methods take field table arguments. The asl_field and asl_field_list classes provide field tables that are designed to work with AMQP:

  • asl_field - defines a single named field holding data of various types.
  • asl_field_list - defines a field table (implemented as a list).

Simple field tables

The simplest way to build a field table is with the asl_field_list_build() method, which returns a serialised table:

icl_longstr_t
    *field_table;

field_table = asl_field_list_build (
    "host",  "Sleeper Service",
    "guest", "My Homework Ate My Dog",
    NULL);
...
icl_longstr_destroy (&field_table);

The build() method has a limitation - it only handles string fields. For most AMQP applications this is fine but we can get the same result using calls to individual methods:

asl_field_list_t
    *field_list;
icl_longstr_t
    *field_table;

field_list = asl_field_list_new (NULL);
asl_field_new_string  (field_list, "host", "Sleeper Service");
asl_field_new_string  (field_list, "guest", "My Homework Ate My Dog");
field_table = asl_field_list_flatten (field_list);
asl_field_list_destroy (&field_list);
...
icl_longstr_destroy (&field_table);

Field tables in more detail

Field tables are held in two ways:

  1. As a list of fields. You can navigate this list using standard iCL list navigation commands (first, next, pop, etc.)
  2. As a serialised block of data, held in an icl_longstr_t. Field tables held in this format are portable, and can be sent to other machines. This is the format we send across a network via AMQP.

To convert from a field table string to a list, create a new list and pass the string as an argument. To convert from a field list to a table string, use the flatten() method as shown above.
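
To give a feel for the serialised form, here is a sketch of how one string field is laid out, assuming the AMQP/0.9 field table encoding: the name as a short string (one length octet plus the name), the type octet 'S', then the value as a long string (a four-octet length, most significant octet first, plus the data). The function sketch_put_string_field() is hypothetical; it is not asl_field_list_flatten(), which may differ in detail:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

//  Writes one string field into dest and returns the number of octets
//  used, or 0 if the name is too long or the field does not fit
size_t
sketch_put_string_field (uint8_t *dest, size_t limit,
    const char *name, const char *value)
{
    size_t name_size, value_size, needed;

    assert (dest && name && value);
    name_size  = strlen (name);
    value_size = strlen (value);
    needed = 1 + name_size + 1 + 4 + value_size;
    if (name_size > 255 || needed > limit)
        return 0;

    *dest++ = (uint8_t) name_size;              //  Short string length
    memcpy (dest, name, name_size);
    dest += name_size;
    *dest++ = 'S';                              //  Field type: string
    *dest++ = (uint8_t) (value_size >> 24);     //  Long string length,
    *dest++ = (uint8_t) (value_size >> 16);     //  most significant
    *dest++ = (uint8_t) (value_size >> 8);      //  octet first
    *dest++ = (uint8_t) (value_size);
    memcpy (dest, value, value_size);
    return needed;
}
```

Because the layout is a plain octet sequence with fixed byte order, the same block can be read on any machine, which is what makes the serialised form portable.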

The main asl_field_list methods are:

  • asl_field_list_t *asl_field_list_new (icl_longstr_t *string) - create a new field table given a serialised string, or NULL to create an empty field table.
  • asl_field_t *asl_field_list_search (list, fieldname) - look for a field with a given name. Note that you must use the unlink() method on the returned field reference when you are finished using it.
  • asl_field_list_print (list) - print the field table contents for debugging purposes.
  • icl_longstr_t *asl_field_list_build (…) - build a field table from a list of name/value pairs, ending in a null name.

The main asl_field methods are:

  • asl_field_new_string (list, fieldname, stringvalue) - create a new string field with the given name and value.
  • asl_field_new_integer (list, fieldname, integervalue) - create a new integer field with the given name and value.
  • asl_field_new_decimal (list, fieldname, integervalue, decimals) - create a new decimal field with the given name and value.
  • asl_field_new_time (list, fieldname, timevalue) - create a new time field with the given name and value.
  • asl_field_list_destroy (&list) - destroy a field table.
  • asl_field_string (field) - return a string value for a field, doing any necessary conversion.
  • asl_field_integer (field) - return an integer value for a field, doing any necessary conversion.

For more details on these methods, refer to the class documentation and/or documented class source code.

You can also access a field's properties directly:

  • name (string) - the field name.
  • type (character) - 'S', 'I', 'D', or 'T' for string, integer, decimal, or time field.
  • decimals (integer) - number of decimals for a decimal field.
  • string (icl_longstr_t *) - string value for the field.
  • integer (int64_t) - integer value for the field.
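
A decimal field combines the integer and decimals properties. Assuming the usual reading, in which the value is the integer divided by ten to the power of decimals, it can be formatted as shown in this sketch. The helper sketch_format_decimal() is hypothetical and handles non-negative values only:

```c
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>

//  Formats a non-negative scaled integer as text, so that the pair
//  (12345, 2) becomes "123.45". Returns the text length, or -1 if the
//  buffer is too small or decimals is outside the range 0 to 18.
int
sketch_format_decimal (char *buffer, size_t size,
    uint64_t value, int decimals)
{
    uint64_t scale = 1;
    int digit, length;

    assert (buffer);
    if (decimals < 0 || decimals > 18)
        return -1;
    for (digit = 0; digit < decimals; digit++)
        scale *= 10;

    if (decimals == 0)
        length = snprintf (buffer, size, "%" PRIu64, value);
    else
        //  Fraction is zero-padded to exactly 'decimals' digits
        length = snprintf (buffer, size, "%" PRIu64 ".%0*" PRIu64,
            value / scale, decimals, value % scale);

    return (length < 0 || (size_t) length >= size)? -1: length;
}
```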

Error handling

WireAPI returns two types of error:

  • Errors reported internally, e.g. timeout.
  • Errors reported by the remote server, e.g. invalid queue name.

In the first case, the error message is provided in session->error_text. There is no localisation - error messages are English only (for now). When there is an internal error, the session->reply_code will be zero.

In the second case, the error message is provided in session->reply_text and an error code is provided in session->reply_code. If the error was at the connection level, the error is placed in connection->reply_text and connection->reply_code instead.

The application can print the right error message using code like this:

if (s_session->reply_code)
    icl_console_print ("E: %d - %s",
        s_session->reply_code, s_session->reply_text);
else
    icl_console_print ("E: %s", s_session->error_text);
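
The same decision can be extended to cover connection-level errors. The sketch below works on a hypothetical sketch_status_t structure that mirrors the reply_code, reply_text and error_text properties named above; the order of the checks (session, then connection, then internal) is an assumption, not documented behaviour:

```c
#include <assert.h>
#include <stdio.h>

//  Hypothetical holder for the properties named above
typedef struct {
    int         reply_code;         //  Zero unless the server reported an error
    const char *reply_text;         //  Error text sent by the server
    const char *error_text;         //  Internal error text (session only)
} sketch_status_t;

typedef enum {
    SKETCH_INTERNAL,                //  Reported internally, e.g. timeout
    SKETCH_SESSION,                 //  Reported by server for the session
    SKETCH_CONNECTION               //  Reported by server for the connection
} sketch_source_t;

//  Formats the most specific error message and says where it came from
sketch_source_t
sketch_error_message (char *buffer, size_t size,
    const sketch_status_t *session, const sketch_status_t *connection)
{
    assert (buffer && size > 0 && session && connection);
    if (session->reply_code) {
        snprintf (buffer, size, "E: %d - %s",
            session->reply_code, session->reply_text);
        return SKETCH_SESSION;
    }
    if (connection->reply_code) {
        snprintf (buffer, size, "E: %d - %s",
            connection->reply_code, connection->reply_text);
        return SKETCH_CONNECTION;
    }
    snprintf (buffer, size, "E: %s", session->error_text);
    return SKETCH_INTERNAL;
}
```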

Miscellaneous topics

Nowait methods

AMQP/0.9 has a "nowait" option on some methods that turns normally synchronous methods into asynchronous ones. This is important for applications that do very large volumes of wiring (queue creation, etc.) since it can dramatically cut the start-up time.

WireAPI implements nowait as a second set of methods:

  • amq_client_session_exchange_declare_nowait
  • amq_client_session_exchange_delete_nowait
  • amq_client_session_queue_declare_nowait
  • amq_client_session_queue_bind_nowait
  • amq_client_session_queue_purge_nowait
  • amq_client_session_queue_delete_nowait
  • amq_client_session_queue_unbind_nowait
  • amq_client_session_basic_consume_nowait
  • amq_client_session_basic_cancel_nowait

Each of these methods takes the same arguments as its synchronous version.

To use the nowait functionality on automatically-named queues (private exclusive queues), you cannot wait for the server to tell you the queue name, so you need to use the default queue functionality, i.e.:

  • queue.declare with no name (creates a name automatically)
  • queue.bind with no queue name (uses that queue name)
  • basic.consume with no queue name (uses that queue name)

Object ownership

WireAPI uses the standard iCL model to define object ownership:

  • The layer which calls the "new" method on an object also destroys it.
  • If an intermediate layer wants to co-own the object, it does this using possession. An object must explicitly allow this. The content objects are designed for this.
  • If an intermediate layer wants to hold a reference to an object, it does this using linking. An object must explicitly allow this.
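
Linking behaves like reference counting: each holder has its own reference, and the object goes away when the last one is released. The following sketch models only that counting. All sketch_object_ names are hypothetical, and real iCL classes are generated code that also distinguishes destroy from unlink, which this model does not:

```c
#include <assert.h>
#include <stdlib.h>

//  Hypothetical object that allows linking
typedef struct {
    int  links;                     //  Number of references held
    int *destroyed;                 //  Set to 1 when the object is freed
} sketch_object_t;

//  Creates the object; the caller holds the first reference
sketch_object_t *
sketch_object_new (int *destroyed)
{
    sketch_object_t *self = malloc (sizeof (sketch_object_t));
    assert (self && destroyed);
    self->links = 1;
    self->destroyed = destroyed;
    return self;
}

//  Takes one more reference to the object
sketch_object_t *
sketch_object_link (sketch_object_t *self)
{
    assert (self);
    self->links++;
    return self;
}

//  Drops one reference and nullifies the caller's pointer, in the
//  style of amq_content_basic_unlink (&content)
void
sketch_object_unlink (sketch_object_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        if (--(*self_p)->links == 0) {
            *(*self_p)->destroyed = 1;
            free (*self_p);
        }
        *self_p = NULL;
    }
}
```

Taking the address of the reference lets the unlink call clear it, so a released reference cannot be used by accident.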

Synchronous vs. asynchronous methods

AMQP (like all ASL protocols) divides methods strictly into those that expect an immediate reply (synchronous) and those that do not (asynchronous). WireAPI handles these cases as follows:

  • When you use a synchronous method (e.g. Basic.Consume), WireAPI waits for the server to respond with a synchronous reply, and it processes this reply. Any asynchronous methods that the server sends before the reply are also processed (e.g. incoming content will be correctly queued.)
  • When you use an asynchronous method (e.g. Basic.Publish), WireAPI does not wait for any server response. You can therefore send such methods rapidly and expect WireAPI to return as fast as it can, and process them in the background.

Single-Threaded background processing

WireAPI works in both a multi-threaded model (in which one thread handles all dialogue with the server and a second handles the application) and a single-threaded model (in which a single thread does all the work).

You choose the model when you build OpenAMQ, which specifically has both single- and multi-threaded capability built into it.

The single-threaded model has one specific requirement: the application must periodically call the amq_client_session_wait() method, since it is during this call that asynchronous incoming methods are processed. Always call this method instead of "sleep" or an equivalent in your application.

The demo chat application

Building and testing

The following OpenAMQ application is a simple example of how to send and receive messages via WireAPI. This is a "chat" application. It consists of two programs that are listed at the end of this chapter:

  • im_sender.c reads lines of text from standard input and publishes them to a chat room
  • im_receiver.c joins a chat room and prints everything that is posted there

To compile and link the files, set your environment in the same way as you do to build the OpenAMQ server and type this command:

c -l im_sender.c im_receiver.c

To start the chat room demo, open two console windows and run these commands, one in each window, in the following order:

amq_server
im_receiver localhost:5672 "demo room"

Note that the amq_server by default starts on port 5672. Now, to send lines of text to the chat room, run the im_sender program in a third window, giving your name as the last argument, and then type your messages, one per line:

im_sender localhost:5672 "demo room" "Baron Bartholomaeus von Saburg-Fridetzki"

There can be any number of clients connected to the same chatroom. There's no need to create chat rooms explicitly; these are created on demand.

Sender program

im_sender.c:
---------------------------------------------------------------------------
//
//  im_sender.c - Sends messages to the chatroom
//  By iMatix Corporation, April 2008.  Code released into the public domain.
//
//  Name:     im_sender
//  Usage:    im_sender <broker-address> <chatroom> <your-name>
//  Example:  im_sender 127.0.0.1:5672 "OpenAMQ discussion"
//                  "Baron Bartholomaeus von Saburg-Fridetzki"
//  Sends messages from stdin to the chat room
//  To receive messages from the chat room, use im_receiver application

#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int main (int argc, char *argv [])
{
    amq_client_connection_t
        *connection;
    amq_client_session_t
        *session;
    icl_longstr_t
        *auth_data;
    amq_content_basic_t
        *content;
    char
        message_text [1024];
    char
        *message_body;
    assert (argc == 4);

    //  Initialise system
    icl_system_initialise (argc, argv);

    //  Open a connection
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new (
            argv [1], "/", auth_data, "im_sender", 0, 30000);
    assert (connection);
    icl_longstr_destroy (&auth_data);

    //  Open a channel
    session = amq_client_session_new (connection);
    assert (session);

    while (1) {
        //  Read one line from stdin, stopping at end of input
        if (!fgets (message_text, sizeof (message_text), stdin))
            break;
        //  Exit the loop if Ctrl+C is encountered
        if (!connection->alive)
            break;
        //  Create the message body
        message_body =
            malloc (strlen (argv [3]) + 2 + strlen (message_text) + 1);
        assert (message_body);
        sprintf (message_body, "%s: %s", argv [3], message_text);
        //  Create the message itself
        content = amq_content_basic_new ();
        amq_content_basic_set_body (content, (byte *) message_body,
            strlen (message_body), free);
        //  Send the message
        amq_client_session_basic_publish (
            session,                        //  session
            content,                        //  content to send
            0,                              //  ticket
            "amq.direct",                   //  exchange to send message to
            argv [2],                       //  routing-key
            FALSE,                          //  mandatory
            FALSE);                         //  immediate
        //  Release the message
        amq_content_basic_unlink (&content);
    }
    //  Close the channel
    amq_client_session_destroy (&session);
    //  Close the connection
    amq_client_connection_destroy (&connection);
    //  Uninitialise system
    icl_system_terminate ();
    return 0;
}

Receiver program

im_receiver.c:
---------------------------------------------------------------------------
//
//  im_receiver.c - Receives messages from a chatroom
//  By iMatix Corporation, April 2008.  Code released into the public domain.
//
//  Name:     im_receiver
//  Usage:    im_receiver <broker-address> <chatroom>
//  Example:  im_receiver 127.0.0.1:5672 "OpenAMQ discussion"
//  Receives messages from the chatroom and writes them to stdout
//  To send messages to the chat room, use im_sender application

#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int main (int argc, char *argv [])
{
    amq_client_connection_t
        *connection;
    amq_client_session_t
        *session;
    icl_longstr_t
        *auth_data;
    amq_content_basic_t
        *content;
    char
        message_text [1024];
    size_t
        message_size;

    assert (argc == 3);

    //  Initialise system
    icl_system_initialise (argc, argv);

    //  Open a connection
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new (
            argv [1], "/", auth_data, "im_receiver", 0, 30000);
    assert (connection);
    icl_longstr_destroy (&auth_data);

    //  Open a channel
    session = amq_client_session_new (connection);
    assert (session);

    //  Create a private queue
    amq_client_session_queue_declare (
        session,                        //  session
        0,                              //  ticket
        NULL,                           //  queue name
        FALSE,                          //  passive
        FALSE,                          //  durable
        TRUE,                           //  exclusive
        TRUE,                           //  auto-delete
        NULL);                          //  arguments

    //  Bind the queue to the exchange
    amq_client_session_queue_bind (
        session,                        //  session
        0,                              //  ticket
        NULL,                           //  queue
        "amq.direct",                   //  exchange
        argv [2],                       //  routing-key
        NULL);                          //  arguments

    //  Consume from the queue
    amq_client_session_basic_consume (
        session,                        //  session
        0,                              //  ticket
        NULL,                           //  queue
        NULL,                           //  consumer-tag
        TRUE,                           //  no-local
        TRUE,                           //  no-ack
        TRUE,                           //  exclusive
        NULL);                          //  arguments

    while (1) {
        while (1) {
            //  Get next message
            content = amq_client_session_basic_arrived (session);
            if (!content)
                break;

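            //  Get the message body and write it to stdout, keeping one
            //  byte of the buffer free for the terminator. A failed copy
            //  (-1) becomes a huge unsigned value and is skipped.
            message_size = amq_content_basic_get_body (content,
                  (byte*) message_text, sizeof (message_text) - 1);
            if (message_size && message_size < sizeof (message_text)) {
                message_text [message_size] = 0;
                fputs (message_text, stdout);
            }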
            //  Destroy the message
            amq_content_basic_unlink (&content);
        }
        //  Wait while next message arrives
        amq_client_session_wait (session, 0);

        //  Exit the loop if Ctrl+C is encountered
        if (!connection->alive)
            break;
    }
    //  Close the channel
    amq_client_session_destroy (&session);

    //  Close the connection
    amq_client_connection_destroy (&connection);

    //  Uninitialise system
    icl_system_terminate ();

    return 0;
}

Author

iMatix Corporation
