Writing OpenAMQ Applications in C/C++
Overview
What is WireAPI?
WireAPI is a standardised API that closely maps to the semantics of the AMQP wire-level protocol. For example, the AMQP method Queue.Declare maps to a WireAPI function called amq_client_session_queue_declare. We can contrast this approach with the one taken by other kinds of API, which hide the semantics of working with AMQP from the developer. WireAPI has several advantages over more abstracted APIs:
- Any version of AMQP is cleanly mapped to identical semantics in WireAPI.
- Developers do not need to learn new semantics - once they know the protocol they know the API.
- Developers have full control over all aspects of the middleware, not being restrained by any unnecessary API abstraction.
- WireAPI semantics are portable to all languages (note that iMatix does not yet offer client libraries in languages other than C/C++).
The negative aspects of WireAPI are:
- It can be complex for beginners.
- It forces the developers to understand the protocol.
- There are some useful abstractions missing from WireAPI.
Overall, WireAPI works well because AMQP was designed to be used in this way; its semantics were designed to be meaningful to application developers. We plan to develop a WireAPI/2 that adds some useful abstractions such as end-to-end reliability, end-to-end encryption, message eventing (asynchronous delivery) and so on.
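The naming rule mentioned above (Queue.Declare becomes amq_client_session_queue_declare) can be sketched as a small helper. This is an illustration only: wireapi_function_name is a hypothetical function that is not part of WireAPI, and it assumes that every session method maps by lower-casing the AMQP name and replacing '.' and '-' with '_'.

```c
#include <ctype.h>
#include <string.h>

// Hypothetical helper, not part of WireAPI: builds the C function name
// that corresponds to an AMQP method name such as "Queue.Declare".
// Returns 0 on success, -1 if the output buffer is too small.
int
wireapi_function_name (const char *amqp_method, char *out, size_t out_size)
{
    static const char prefix [] = "amq_client_session_";
    size_t prefix_len = sizeof (prefix) - 1;
    size_t method_len = strlen (amqp_method);
    size_t index;

    if (prefix_len + method_len + 1 > out_size)
        return (-1);

    memcpy (out, prefix, prefix_len);
    for (index = 0; index < method_len; index++) {
        unsigned char ch = (unsigned char) amqp_method [index];
        // Assumption: '.' and '-' become '_', letters become lower-case
        if (ch == '.' || ch == '-')
            out [prefix_len + index] = '_';
        else
            out [prefix_len + index] = (char) tolower (ch);
    }
    out [prefix_len + method_len] = '\0';
    return (0);
}
```

Under these assumptions, "Basic.Publish" yields amq_client_session_basic_publish, which matches the function names used throughout this document.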
Goals and principles
The WireAPI is built using the same technology as the OpenAMQ server, and provides a component-based interface to the AMQ protocol from the client side. It is designed to:
- Provide full asynchronous background operations.
- Work both in single-threaded and multi-threaded applications.
- Provide full access to all AMQP methods except those used for connection and session start-up and shut-down.
- Provide full access to all AMQP method properties.
- Act as the basis for other language APIs based on the WireAPI model.
- Be 100% portable to all mainstream operating system platforms.
- Work with any AMQP server implementation.
Architecture of C/C++ WireAPI
The C/C++ WireAPI implementation provided by iMatix for OpenAMQ is a multithreaded application in two halves. One half runs as a background thread, processing protocol methods and doing network I/O, so that messages can be sent and received independently of application work. The second half runs in the application thread, and provides API-level functionality, including implementation of the WireAPI objects (connections, sessions, etc.).
This design has several consequences that you should take into account when designing and building your application and/or messaging frameworks:
- If you build OpenAMQ as a single-threaded application, background processing of messages will be unreliable. Concretely, the background thread will only process when the application calls the amq_client_session_wait () method.
- WireAPI is safe to use in multithreaded applications, but you must not share WireAPI classes (connection, session, etc.) between application threads.
Using WireAPI
Compiling and linking
WireAPI is internally constructed using a number of iMatix tools such as iCL. However, the calling application does not need to use these tools in order to use WireAPI, except to correctly initialise and terminate iCL. It needs only the header files and the libraries. The simplest way to get these is to build the full OpenAMQ kernel, and then use the $IBASE/include and $IBASE/lib directories when compiling and linking the application.
Basic construction
WireAPI sits between the application and the AMQ server:
.---------------. .-------------. .--------------.
| | ---> | | ---> | |
| Application | | WireAPI | | AMQ Server |
| | <--- | | <--- | |
`===============' `=============' `=============='
Here is a trivial example that sends one message to an AMQ server. This program does no error handling so you should not copy this code directly:
#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int
main (int argc, char *argv [])
{
    amq_client_connection_t
        *connection = NULL;             // Current connection
    amq_client_session_t
        *session = NULL;                // Current session
    amq_content_basic_t
        *content = NULL;                // Message content
    icl_longstr_t
        *auth_data;                     // Authentication data

    // Initialise iCL system
    icl_system_initialise (argc, argv);

    // Open session to local server
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new (
        "localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);
    session = amq_client_session_new (connection);

    // Create a content and send it to the "to-address" exchange
    content = amq_content_basic_new ();
    amq_content_basic_set_body (content, "0123456789", 10, NULL);
    amq_content_basic_set_message_id (content, "ID001");
    amq_client_session_basic_publish (
        session, content, 0, "to-address", NULL, FALSE, FALSE);
    amq_content_basic_unlink (&content);

    // Shutdown connection and session
    amq_client_session_destroy (&session);
    amq_client_connection_destroy (&connection);

    // Terminate iCL system
    icl_system_terminate ();
    return (0);
}
This shows some of the main object classes that compose WireAPI:
- amq_client_connection: lets you connect to a server.
- amq_client_session: lets you create a session and talk to the server using this session.
- amq_content_basic: lets you create Basic content to send to the server, or process Basic content received from the server.
The iCL class syntax
The WireAPI classes are all constructed using iCL and some knowledge of iCL will make your life easier. However, the syntax for using iCL classes in C is consistent and we can summarise it:
- classname_new creates a new object instance and returns a reference to that object.
- classname_destroy takes the address of an object reference and destroys that object, if the reference is not null.
- classname_somemethod takes an object reference along with arguments and does some work on the object.
- classname->propertyname accesses a class property.
iCL signals errors by returning a null object (after a new) or by returning a non-zero error code (after other methods). iCL return codes are integers.
Thus the correct way to open a connection and session is actually like this:
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
"localhost", "/", auth_data, "test", 0, 30000);
icl_longstr_destroy (&auth_data);
if (!connection) {
icl_console_print ("E: could not open connection");
return (1);
}
session = amq_client_session_new (connection);
if (!session) {
icl_console_print ("E: could not open session");
return (1);
}
And the correct way to call a method like publish is:
int rc;
...
content = amq_content_basic_new ();
...
rc = amq_client_session_basic_publish (
session, content, 0, "queue", "mydest", FALSE, FALSE);
amq_content_basic_unlink (&content);
if (rc) {
icl_console_print ("E: could not publish message");
return (1);
}
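The new/destroy/method convention summarised at the start of this section can be modelled in a few lines of plain C. The demo_counter class below is hypothetical and is not generated by iCL; it only mimics the calling conventions (new returns a reference or NULL, destroy takes the address of the reference and tolerates NULL, other methods return zero on success). Setting the caller's reference to NULL in destroy is our assumption about why the address is passed.

```c
#include <stdlib.h>

// Hypothetical class, for illustration only; real iCL classes are generated
typedef struct {
    int value;                          // A property, read as counter->value
} demo_counter_t;

// classname_new: returns a new object reference, or NULL on failure
demo_counter_t *
demo_counter_new (void)
{
    demo_counter_t *self = malloc (sizeof (demo_counter_t));
    if (self)
        self->value = 0;
    return (self);
}

// classname_destroy: takes the address of a reference, safe on NULL
void
demo_counter_destroy (demo_counter_t **self_p)
{
    if (self_p && *self_p) {
        free (*self_p);
        *self_p = NULL;                 // Assumed: caller's reference is nulled
    }
}

// classname_somemethod: returns zero on success, non-zero on error
int
demo_counter_add (demo_counter_t *self, int amount)
{
    if (!self || amount < 0)
        return (-1);
    self->value += amount;
    return (0);
}
```

Because destroy is safe on a null reference, cleanup code can call it unconditionally on every exit path, which is the style the WireAPI examples follow.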
Tuning WireAPI
The WireAPI client library cannot be tuned via command-line options. It uses an XML configuration file called wireapi.cfg. This file, if present, can set any of the following options:
wireapi.cfg:
------------------------------------------------------------------------
<?xml?>
<config>
<tuning
tcp_nodelay = "1"
tcp_rcvbuf = "0"
tcp_sndbuf = "0"
low_water = "0"
high_water = "0"
direct = "0"
/>
<direct
batching = "32768"
on_overflow = ""
/>
</config>
- /config/tuning/tcp_nodelay
- If this value is 1, socket data is written immediately, which is usually good for latency. If this value is 0, data is buffered until there is a full packet, which is usually good for throughput. Default value is 1.
- /config/tuning/tcp_rcvbuf
- If this value is greater than zero, the connection to the server will use the specified value as its TCP receive buffer size. Note: setting this value is delicate; do not use this option unless you know what you are doing. Default value is 0.
- /config/tuning/tcp_sndbuf
- If this value is greater than zero, the connection to the server will use the specified value as its TCP send buffer size. Note: setting this value is delicate; do not use this option unless you know what you are doing. Default value is 0.
- /config/tuning/low_water
- Number of messages in the arrived queue at which message flow from the server is restarted after it has been switched off at the high-water mark. Default value is 0.
- /config/tuning/high_water
- Number of messages in the arrived queue at which message flow from the server is stopped. If this property is 0, message flow is never switched off. Default value is 0.
- /config/tuning/direct
- If set to 1, all connections will default to using Direct Mode. This is faster for publish/subscribe and request/response scenarios. See the OpenAMQ user guide for details on Direct Mode.
- /config/direct/batching
- Defines the maximum batch size for Direct Mode opportunistic batching on message sends. Setting this higher will improve throughput, and usually with lower latency, but will cause higher memory consumption. Setting this to zero will switch off batching. Any value less than 2048 is treated as zero. Default value is 32768. Maximum value is 2097152.
- /config/direct/on_overflow
- In Direct Mode, specifies how the WireAPI stack should handle an overflow condition, as defined by the arrived message queue reaching the high-water mark. The allowed actions are: 'warn' - issue a message to the console, 'trim' - discard old messages to make space for new ones, 'drop' - drop new incoming messages, and 'kill' - assert an error and kill the application. Default value is trim.
- /config/sequence/set : sets Sender-Id and Timestamp on published messages. If set, all published messages are stamped with Sender-Id equal to the connection Id, and Timestamp equal to the current date and time in the Unix 64-bit time format (microseconds since epoch).
- /config/sequence/check : verifies Sender-Id and Timestamp on received messages. If set, all received messages are checked with respect to the sending connection in Sender-Id and Timestamp. Out-of-sequence messages are reported to the console output.
Direct Mode is supported on OpenAMQ versions 1.3 and later.
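The numeric rules for the batching, low_water, and high_water options can be easier to follow as code. The sketch below is a model only, not the WireAPI implementation: effective_batching and flow_control_update are hypothetical names, clamping of oversized batch values is an assumption (the text states the maximum but not how larger values are treated), and the exact comparisons used at the water marks are also assumed.

```c
#include <stddef.h>

#define BATCHING_MINIMUM   2048         // Smaller values switch batching off
#define BATCHING_MAXIMUM   2097152      // Documented maximum value

// Hypothetical helper: the batch size that would actually be used.
// Assumption: values above the maximum are clamped to the maximum.
long
effective_batching (long configured)
{
    if (configured < BATCHING_MINIMUM)
        return (0);
    if (configured > BATCHING_MAXIMUM)
        return (BATCHING_MAXIMUM);
    return (configured);
}

// Hypothetical model of the arrived-queue flow control
typedef struct {
    size_t low_water;                   // Restart flow at this queue size
    size_t high_water;                  // Stop flow at this size, 0 = never
    int    flow_active;                 // 1 while the server may send
} flow_control_t;

// Update flow state after the arrived queue changes size.
// Returns 1 if flow is active afterwards, 0 if it is switched off.
int
flow_control_update (flow_control_t *self, size_t arrived_count)
{
    if (self->high_water == 0)
        self->flow_active = 1;          // Flow is never switched off
    else
    if (self->flow_active && arrived_count >= self->high_water)
        self->flow_active = 0;          // Reached high-water mark
    else
    if (!self->flow_active && arrived_count <= self->low_water)
        self->flow_active = 1;          // Drained to low-water mark
    return (self->flow_active);
}
```

The gap between the two marks gives hysteresis: once flow stops at the high-water mark it stays off until the application has drained the queue down to the low-water mark, so flow is not toggled on every message.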
The WireAPI library also accepts these environment variables:
- WIREAPI_SILENT=1 - if set to 1, causes errors and warnings not to be sent to standard output. Default value is 0. Note that even if set, some serious errors will still be printed. The connection->silent property inherits from this environment variable.
- WIREAPI_FAKESLOW=1 - if set to 1, causes the WireAPI to throttle input to 1,000 messages per second, to simulate a slow network and cause backlogs on the server. This should not be used in production, since it can crash a poorly-configured server.
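An application may want to check these variables itself, for example to refuse to start in production when WIREAPI_FAKESLOW is set. The following is a minimal sketch using only the standard library; flag_is_set and env_flag are hypothetical helpers, and treating only the exact value "1" as "on" is an assumption based on the descriptions above.

```c
#include <stdlib.h>
#include <string.h>

// Hypothetical helper: interprets the value of a flag-style variable.
// Assumption: only the exact string "1" switches the flag on.
int
flag_is_set (const char *value)
{
    return (value != NULL && strcmp (value, "1") == 0);
}

// Reads a flag such as WIREAPI_SILENT from the process environment
int
env_flag (const char *name)
{
    return (flag_is_set (getenv (name)));
}
```

A start-up check could then be written as: if (env_flag ("WIREAPI_FAKESLOW")) print a warning and exit.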
The connection class
What is a 'connection'?
A connection is a TCP/IP connection from a client to an OpenAMQ server. AMQP is a multi-channel protocol, meaning that one network connection can carry an arbitrary number of parallel, independent virtual connections, which AMQP calls "channels". In WireAPI these are called "sessions" for compatibility with other middleware APIs. NOTE: OpenAMQ currently supports exactly ONE session per connection.
Before calling any iCL method including amq_client_connection_new, you must have called icl_system_initialise (), or your application will fail with an abort.
amq_client_connection_new
Creates a new connection to the server:
amq_client_connection_t
*connection = NULL; // Current connection
icl_longstr_t
*auth_data; // Authentication data
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
"localhost", "/", auth_data, "test", 0, 30000);
icl_longstr_destroy (&auth_data);
if (connection)
icl_console_print ("I: connected to %s/%s - %s - %s",
connection->server_product,
connection->server_version,
connection->server_platform,
connection->server_information);
else {
icl_console_print ("E: could not connect to server");
return (1);
}
Notes:
- The host_name argument specifies a server name or IP address, optionally ending in ':' plus a port number.
- The virtual_host name specifies the virtual host to which the connection will connect. The default virtual host is "/".
- The auth_data provides an authentication block, used to log in to the server. To create a plain-text authentication block, use the auth_plain method. As the example shows, the caller destroys the auth_data block with icl_longstr_destroy after calling the new method.
- The instance argument sets the client instance name, which can be used to identify a specific client in the management console or server log.
- The trace argument sets the trace level for WireAPI.
- The timeout argument governs all synchronous exchanges with the server - if the server does not respond within this time, the connection treats it as a fatal error. A timeout of zero means "infinite". A good value for fast networks is five to ten seconds; for a slower network, a value of 30 seconds or more is reasonable.
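The host_name format described in the notes ("server name or IP address, optionally ending in ':' plus a port number") can be illustrated with a small parser. This is a sketch, not WireAPI code: split_host_name is a hypothetical helper, IPv6 literals are not handled, and falling back to port 5672 (the IANA-registered AMQP port) when no port is given is an assumption about the default.

```c
#include <stdlib.h>
#include <string.h>

#define DEFAULT_AMQP_PORT 5672          // IANA-registered AMQP port

// Hypothetical helper, not part of WireAPI: splits "host" or "host:port".
// Copies the host part into host (at most host_size - 1 characters) and
// stores the port in *port. Returns 0 on success, -1 on a malformed value.
int
split_host_name (const char *host_name, char *host, size_t host_size, int *port)
{
    const char *colon = strrchr (host_name, ':');
    size_t host_len = colon? (size_t) (colon - host_name): strlen (host_name);

    if (host_len == 0 || host_len >= host_size)
        return (-1);
    memcpy (host, host_name, host_len);
    host [host_len] = '\0';

    if (colon) {
        char *end;
        long value = strtol (colon + 1, &end, 10);
        // Reject an empty port, trailing junk, and out-of-range values
        if (end == colon + 1 || *end != '\0' || value < 1 || value > 65535)
            return (-1);
        *port = (int) value;
    }
    else
        *port = DEFAULT_AMQP_PORT;      // Assumed default when no port given
    return (0);
}
```

Validating the string before passing it to amq_client_connection_new lets an application report a configuration mistake directly instead of as a generic connection failure.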
amq_client_connection_destroy
Closes an open connection, doing a clean shut-down. Applications should use this to end a connection (rather than just exiting):
amq_client_connection_destroy (&connection);
amq_client_connection_auth_plain
Returns an authentication block for a plain login:
icl_longstr_t
*auth_data; // Authentication data
auth_data = amq_client_connection_auth_plain ("guest", "guest");
Connection Properties
These are the properties of a connection object:
- direct (Boolean) - set this TRUE to use Direct Mode
- alive (Boolean) - FALSE when connection has had an error
- silent (Boolean) - set this TRUE to suppress error reporting
- error_text (string) - error string reported by the API
- reply_text (string) - error string reported by server
- reply_code (integer) - error value reported by server
- version_major (integer) - server protocol version major
- version_minor (integer) - server protocol version minor
- server_product (string) - product name reported by server
- server_version (string) - product version reported by server
- server_platform (string) - operating system platform reported by server
- server_copyright (string) - copyright notice reported by server
- server_information (string) - other information reported by server
- server_identifier (string) - unique server instance identifier
- id (string) - holds unique connection ID string assigned by server
Direct Mode
To switch on Direct Mode for specific connections, set the connection "direct" property after creating the connection:
if (direct_mode)
connection->direct = TRUE;
This overrides any setting in the WireAPI configuration.
The session class
What is a 'session'?
A session corresponds to an AMQP channel, and is a virtual connection to an AMQP server. You must create at least one session in order to talk with an AMQP server. While AMQP offers multiplexing in theory, OpenAMQ no longer implements this, mainly because there are no proven performance advantages. So you should create a single session per connection, no more. Future versions of WireAPI may merge the session and connection classes.
amq_client_session_new
Creates a new session:
amq_client_session_t
*session = NULL; // Current session
session = amq_client_session_new (connection);
if (!session) {
icl_console_print ("E: could not open session to server");
return (1);
}
amq_client_session_destroy
Closes an open session, doing a clean shut-down. Applications should call this method when closing a session. Closing the connection is a valid way of closing all open sessions for that connection:
amq_client_session_destroy (&session);
amq_client_session_wait
Waits for content to arrive from the server. You must call this method in order to get content. Returns zero when it ends normally, either because content arrived or because the timeout expired, and -1 if there was an error. The timeout is in milliseconds. To wait forever, use a timeout of zero, and to poll (never wait), use a timeout of -1:
// Wait forever until a message arrives
amq_client_session_wait (session, 0);

// Wait for up to five seconds
amq_client_session_wait (session, 5000);

// Wait for one second, exit if there was an error
if (amq_client_session_wait (session, 1000)) {
    icl_console_print ("E: error, aborted");
    exit (1);
}
else {
    // zero or more contents arrived
}
// Poll for new messages but do not wait
amq_client_session_wait (session, -1);
After using the amq_client_session_wait() call, check both the amq_client_session_basic_arrived () and amq_client_session_basic_returned () lists.
Session properties
These are the properties of a session object:
- alive (Boolean) - FALSE when connection has had an error
- timestamp (apr_time_t) - time when session was opened
- error_text (string) - error string reported by the API
- ticket (integer) - access ticket granted by server
- queue (string) - queue name assigned by server
- exchange (string) - exchange name from last method
- message_count (integer) - number of messages in queue
- consumer_count (integer) - number of consumers
- active (Boolean) - session is paused or active
- reply_text (string) - error string reported by server
- reply_code (integer) - error value reported by server
- consumer_tag (integer) - server-assigned consumer tag
- routing_key (string) - original message routing key
- scope (string) - queue name scope
- delivery_tag (integer) - server-assigned delivery tag
- redelivered (Boolean) - message is being redelivered
Note that all of these except alive, timestamp, and error_text are the result of methods sent from the server to the client. For detailed descriptions of these properties, read the AMQP specifications. All incoming method arguments are stored as session properties. Thus the "message-count" argument of an incoming Basic.Browse-Ok method will be stored in the message_count property.
The exchange class
Exchanges match and distribute messages across queues. Exchanges can be configured in the server or created at runtime.
amq_client_session_exchange_declare
This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class:
asl_field_list_t
*field_list = NULL;
icl_longstr_t
*arguments_table = NULL;
// Build arguments field as necessary
field_list = asl_field_list_new (NULL);
arguments_table = asl_field_list_flatten (field_list);
rc = amq_client_session_exchange_declare (
session, // Session reference
0, // Access ticket (unused)
exchange_name, // Exchange name
"direct", // Exchange type
0, // If 1, don't actually create
0, // Durable (unused)
0, // If 1, auto-delete when unused
0, // Internal exchange (unused)
arguments_table); // Arguments for declaration
asl_field_list_unlink (&field_list);
icl_longstr_destroy (&arguments_table);
The Exchange.Declare method has the following specific fields:
- exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
- type (shortstr) - exchange type. Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. It is not valid or meaningful to attempt to change the type of an existing exchange. OpenAMQ supports "fanout", "direct", "topic", and "header" exchanges.
- passive (bit) - do not create exchange. If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.
- auto_delete (bit) - auto-delete when unused. If set, the exchange is deleted when all queues have finished using it.
- arguments (table) - arguments for declaration. A set of arguments for the declaration. The syntax and semantics of these arguments depend on the server implementation. This field is ignored if passive is 1.
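The naming rule for exchanges ("any mixture of digits, letters, and underscores") is simple to check before calling the server. The sketch below uses a hypothetical helper, name_is_valid, and checks only the rule as worded above; a particular server may accept other characters, and empty names, which have a special meaning for some queue methods, are rejected here.

```c
#include <ctype.h>
#include <stddef.h>

// Hypothetical helper: returns 1 if name is non-empty and consists only of
// digits, letters, and underscores, as the field description states.
int
name_is_valid (const char *name)
{
    if (name == NULL || *name == '\0')
        return (0);
    for (; *name; name++) {
        if (!isalnum ((unsigned char) *name) && *name != '_')
            return (0);
    }
    return (1);
}
```

Checking the name on the client side gives a clearer error message than waiting for the server to reject the declaration.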
amq_client_session_exchange_delete
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.
rc = amq_client_session_exchange_delete (
session, // Session reference
0, // Access ticket (unused)
exchange_name, // Exchange name
0); // If-unused option (unused)
The Exchange.Delete method has the following specific fields:
- exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores.
The queue class
Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.
amq_client_session_queue_declare
This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
asl_field_list_t
*field_list = NULL;
icl_longstr_t
*arguments_table = NULL;
// Build arguments field as necessary
field_list = asl_field_list_new (NULL);
arguments_table = asl_field_list_flatten (field_list);
rc = amq_client_session_queue_declare (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Queue name, null means auto-assign
0, // If 1, don't actually create
0, // Durable (unused)
0, // If 1, request exclusive queue
0, // If 1, auto-delete when unused
arguments_table); // Arguments for declaration
asl_field_list_unlink (&field_list);
icl_longstr_destroy (&arguments_table);
The Queue.Declare method has the following specific fields:
- queue (shortstr) - queue name. Queue names may consist of any mixture of digits, letters, and underscores. May be specified, or may be empty (NULL). If the queue name is null, the server creates and names a queue and returns this. You can access the last created queue from session->queue; if you want to create many queues, copy the returned name somewhere safe.
- passive (bit) - do not create queue. If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.
- exclusive (bit) - request an exclusive queue. Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'auto-delete'.
- auto_delete (bit) - auto-delete queue when unused. If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
- arguments (table) - arguments for declaration. A set of arguments for the declaration. The syntax and semantics of these arguments depend on the server implementation. This field is ignored if passive is 1.
amq_client_session_queue_bind
This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.
rc = amq_client_session_queue_bind (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Name of queue to bind
exchange_name, // Name of exchange to bind to
routing_key, // Message routing key
arguments_table); // Arguments for bind
The Queue.Bind method has the following specific fields:
- queue (shortstr) - queue name. Specifies the name of the queue to bind. If the queue name is empty (NULL), refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
- exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
- routing_key (shortstr) - message routing key. Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.
- arguments (table) - arguments for binding. A set of arguments for the binding. The syntax and semantics of these arguments depend on the exchange class.
amq_client_session_queue_unbind
This method unbinds a queue from an exchange.
rc = amq_client_session_queue_unbind (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Name of queue
exchange_name, // Name of exchange
routing_key, // Message routing key
arguments_table); // Arguments for unbind
The Queue.Unbind method has the following specific fields:
- queue (shortstr) - queue name. Specifies the name of the queue to unbind. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
- exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
- routing_key (shortstr) - message routing key. Specifies the routing key of the binding to unbind.
- arguments (table) - arguments for binding. A set of arguments of the binding to unbind.
amq_client_session_queue_purge
This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal "undo" mechanism.
rc = amq_client_session_queue_purge (
session, // Session reference
0, // Access ticket (unused)
queue_name); // Name of queue to purge
The Queue.Purge method has the following specific fields:
- queue (shortstr) - queue name. Specifies the name of the queue to purge. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
amq_client_session_queue_delete
This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.
rc = amq_client_session_queue_delete (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Name of queue to delete
0, // If-unused (unused)
0); // If-empty (unused)
The Queue.Delete method has the following specific fields:
- queue (shortstr) - queue name. Specifies the name of the queue to delete. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
The basic content class
What is a 'content'?
AMQP uses the term "content" to mean an application message (the term "message" means different things at the application, protocol, and internal technical levels, so is confusing).
WireAPI provides an iCL class called 'amq_content_basic' that lets you create and manipulate basic contents:
amq_content_basic_t
*content;
content = amq_content_basic_new ();
amq_content_basic_set_body (content, "0123456789", 10, NULL);
amq_content_basic_set_message_id (content, "ID001");
amq_content_basic_unlink (&content);
To create a new content, use the 'new' method. To destroy a content, use the 'unlink' method.
amq_client_session_basic_consume
The amq_client_session_basic_consume method tells a queue to start sending messages to the application. The standard pattern is to declare a queue, then bind it to one or more exchanges, and then consume from it. There are two typical use cases: private queues for response messages, and shared queues for workload balancing. The application can request exclusive access (be the only consumer), which is recommended for private response queues. The application can also request 'no_local' delivery, which is recommended for certain kinds of fanout pub-sub:
amq_client_session_basic_consume (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Queue name
consumer_tag, // Consumer tag, or empty
no_local, // If TRUE, won't receive own messages
0, // No-ack (unused)
exclusive, // Request exclusive access to queue
arguments); // Arguments for consume
amq_client_session_basic_cancel
The amq_client_session_basic_cancel method stops a consumer. The application must use the consumer tag that was returned by the server after the consume method (which will be in session->consumer_tag immediately after the consume succeeds, and until a further consume is executed):
rc = amq_client_session_basic_cancel (
session, // Session reference
consumer_tag); // Consumer tag
amq_client_session_basic_ack
The amq_client_session_basic_ack method acknowledges a message on consumers with acknowledgements. The application must use the delivery tag provided by the server, which is in the content->delivery_tag property. You can use delivery_tag zero with multiple set to 1 to acknowledge all outstanding messages:
rc = amq_client_session_basic_ack (
session, // Session reference
delivery_tag, // Delivery tag
multiple); // Acknowledge multiple messages
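The effect of the multiple flag can be modelled on a plain array of outstanding delivery tags. This sketch follows our reading of the AMQP Basic.Ack definition (with multiple set, the tag means "up to and including", and tag zero covers everything outstanding); apply_ack is a hypothetical function and does not reflect how the server or WireAPI tracks messages internally.

```c
#include <stddef.h>
#include <stdint.h>

// Hypothetical model of unacknowledged messages on one session.
// Removes the acknowledged tags from tags [0..count-1] in place and
// returns how many tags remain outstanding.
size_t
apply_ack (uint64_t *tags, size_t count, uint64_t delivery_tag, int multiple)
{
    size_t kept = 0;
    size_t index;

    for (index = 0; index < count; index++) {
        int acked;
        if (multiple)
            // Tag zero with multiple set covers every outstanding message
            acked = (delivery_tag == 0 || tags [index] <= delivery_tag);
        else
            acked = (tags [index] == delivery_tag);
        if (!acked)
            tags [kept++] = tags [index];
    }
    return (kept);
}
```

Acknowledging in batches with multiple set reduces the number of Basic.Ack methods sent, at the cost of redelivering more messages if the client fails before the batch is acknowledged.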
amq_client_session_basic_publish
The amq_client_session_basic_publish method sends a content to a specified exchange:
rc = amq_client_session_basic_publish (
session, // Session reference
basic_content, // Content reference
0, // Access ticket (unused)
exchange_name, // Exchange name
"", // Routing key
0, // If 1, must be routable
0); // If 1, must be deliverable
amq_client_session_basic_get
The amq_client_session_basic_get method gets a single message off the queue. This method does not require a consumer, so is used for simpler scenarios. Since getting messages one by one is heavily synchronized, it is slower than using a consumer:
rc = amq_client_session_basic_get (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Queue name
0); // No-ack (unused)
Receiving contents
For the Basic content class, WireAPI provides a set of methods to access arrived and returned contents:
amq_client_session_basic_arrived ()
amq_client_session_get_basic_arrived_count ()
amq_client_session_basic_returned ()
amq_client_session_get_basic_returned_count ()
In each pair, the first method returns the oldest content waiting to be processed, and the second method returns the number of contents waiting. For example:
amq_content_basic_t
    *content;

// Wait up to 1 second
amq_client_session_wait (session, 1000);
if (amq_client_session_get_basic_arrived_count (session)) {
    icl_console_print ("I: have messages to process...");
    content = amq_client_session_basic_arrived (session);
    while (content) {
        // Process content
        ...
        // Unlink/destroy content, or we get memory leaks
        amq_content_basic_unlink (&content);
        // Get next content to process, if any
        content = amq_client_session_basic_arrived (session);
    }
}
When processing arrived or returned content the application must not assume that a single content arrived. It should assume that zero or more contents arrived or returned, and process each of them, and wait again if it needs to.
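The "zero or more contents" rule can be shown with a self-contained model of the drain loop. Everything here is a stand-in: mock_arrived_t and mock_arrived_next are hypothetical and only imitate the behaviour described for amq_client_session_basic_arrived (return the oldest content, or NULL when none is waiting).

```c
#include <stddef.h>

// Hypothetical stand-in for the session's list of arrived contents
typedef struct {
    const char **items;                 // Pending contents, oldest first
    size_t       count;                 // Number of items in the array
    size_t       next;                  // Index of the next item to hand out
} mock_arrived_t;

// Mimics amq_client_session_basic_arrived: oldest content, or NULL if none
const char *
mock_arrived_next (mock_arrived_t *self)
{
    if (self->next < self->count)
        return (self->items [self->next++]);
    return (NULL);
}

// Processes every pending content and returns how many were handled.
// A return value of zero is normal: the wait may simply have timed out.
size_t
drain_arrived (mock_arrived_t *self, void (*handler) (const char *content))
{
    size_t handled = 0;
    const char *content;

    while ((content = mock_arrived_next (self)) != NULL) {
        if (handler)
            handler (content);
        handled++;
    }
    return (handled);
}
```

The caller would run this after every wait and then wait again, never assuming that one wait corresponds to exactly one content.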
Basic content properties
All contents have these properties, which you can inspect directly using content->propertyname (e.g. 'content->routing_key'):
- body_size (integer) - the body size of the content.
- exchange (string) - the exchange to which the content was published.
- routing_key (string) - the original routing_key specified by the publisher.
- producer_id (string) - the producer id of the publishing connection.
- content_type (string) - MIME content type.
- content_encoding (string) - MIME content encoding.
- headers (field table) - message header field table.
- delivery_mode (integer) - non-persistent or persistent.
- priority (integer) - message priority, 0 to 9.
- correlation_id (string) - application correlation identifier
- reply_to (string) - the destination to reply to.
- expiration (string) - expiration specification.
- message_id (string) - the application message identifier.
- timestamp (integer) - message timestamp.
- type (string) - message type name.
- user_id (string) - creating user id.
- app_id (string) - creating application id.
- sender_id (string) - the sending application id.
To set any of a basic content's properties, DO NOT modify the property directly but use the method:
amq_content_basic_set_[propertyname] (content, newvalue)
Content body data
To set a content's body, use this method:
amq_content_basic_set_body (content, byte *data, size_t size, free_fn)
Where the free_fn is a function of type 'icl_mem_free_fn *' (compatible with the standard library free() function). If free_fn is not null, it is called when the data needs to be destroyed (when the content is destroyed, or if you call _set_body() again).
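The ownership rule for the body data can be sketched in plain C. This is only a model of the behaviour described above, not the WireAPI implementation: demo_content_t, demo_free_fn, demo_content_set_body and the counting free function are hypothetical names invented for the illustration.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins, NOT the WireAPI types */
typedef void (demo_free_fn) (void *data);

typedef struct {
    void         *body;       /* Current body data, or NULL            */
    size_t        body_size;  /* Size of the body in bytes             */
    demo_free_fn *free_fn;    /* Releases body; NULL = caller owns it  */
} demo_content_t;

/* Counting wrapper around free(), used only to observe the releases */
static int demo_free_calls = 0;

static void demo_counting_free (void *data)
{
    free (data);
    demo_free_calls++;
}

/* Release the current body if the content owns it */
static void demo_content_drop_body (demo_content_t *self)
{
    if (self->body && self->free_fn)
        self->free_fn (self->body);
    self->body = NULL;
    self->body_size = 0;
    self->free_fn = NULL;
}

/* Replace the body; a previously owned body is released first */
static void demo_content_set_body (
    demo_content_t *self, void *data, size_t size, demo_free_fn *free_fn)
{
    assert (self);
    demo_content_drop_body (self);
    self->body = data;
    self->body_size = size;
    self->free_fn = free_fn;
}

/* Destroying the content also releases an owned body */
static void demo_content_destroy (demo_content_t *self)
{
    assert (self);
    demo_content_drop_body (self);
}
```

In this model, setting a second body releases the first one, and destroying the content releases the last one, so a caller that passes a free function must not free the data itself.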
To get a content's body, use this method:
amq_content_basic_get_body (content, byte *buffer, size_t limit)
Where the buffer is at least as large as content->body_size. This method returns the number of bytes copied, or -1 if the buffer was too small.
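The return contract described above (bytes copied, or -1 if the buffer is too small) matters when the body is treated as a C string, because one byte must be kept for the terminator. The sketch below models that contract in plain C; demo_get_body and demo_get_body_as_string are hypothetical helpers, not WireAPI functions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of the contract: copy the body into the caller's
   buffer and return the number of bytes copied, or -1 if the buffer
   is too small to hold the whole body. */
static int demo_get_body (
    const unsigned char *body, size_t body_size,
    unsigned char *buffer, size_t limit)
{
    assert (body && buffer);
    if (body_size > limit)
        return -1;
    memcpy (buffer, body, body_size);
    return (int) body_size;
}

/* Copy a body into a text buffer and terminate it. One byte of the
   buffer is reserved for the null terminator, so the limit passed
   down is text_size - 1. Returns the string length, or -1. */
static int demo_get_body_as_string (
    const unsigned char *body, size_t body_size,
    char *text, size_t text_size)
{
    int copied;

    assert (text && text_size > 0);
    copied = demo_get_body (
        body, body_size, (unsigned char *) text, text_size - 1);
    if (copied < 0) {
        text [0] = 0;               /* Leave an empty string on failure */
        return -1;
    }
    text [copied] = 0;
    return copied;
}
```

With an 8-byte text buffer, a 5-byte body is copied and terminated, while an 8-byte body is rejected with -1 instead of overwriting the byte past the end of the buffer.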
Advanced content manipulation
To work with large contents - which do not fit into memory - you must use a more complex API to read and write contents. For details of this please read the amq_content_basic_class.icl and look at the test case, which demonstrates how to read and write content bodies in frames rather than as single buffers.
Field tables
Various AMQP methods take field table arguments. The asl_field and asl_field_list classes provide field tables that are designed to work with AMQP:
- asl_field - defines a single named field holding data of various types.
- asl_field_list - defines a field table (implemented as a list).
Simple field tables
The simplest way to build a field table is to construct this using the asl_field_list_build() method:
icl_longstr_t
*field_table;
field_table = asl_field_list_build (
"host", "Sleeper Service",
"guest", "My Homework Ate My Dog",
NULL);
...
icl_longstr_destroy (&field_table);
The build() method has a limitation - it only handles string fields. For most AMQP applications this is fine but we can get the same result using calls to individual methods:
asl_field_list_t
*field_list;
icl_longstr_t
*field_table;
field_list = asl_field_list_new (NULL);
asl_field_new_string (field_list, "host", "Sleeper Service");
asl_field_new_string (field_list, "guest", "My Homework Ate My Dog");
field_table = asl_field_list_flatten (field_list);
asl_field_list_destroy (&field_list);
...
icl_longstr_destroy (&field_table);
Field tables in more detail
Field tables are held in two ways:
- As a list of fields. You can navigate this list using standard iCL list navigation commands (first, next, pop, etc.)
- As a serialised block of data, held in an icl_longstr_t. Field tables held in this format are portable, and can be sent to other machines. This is the format we send across a network via AMQP.
To convert from a field table string to a list, create a new list and pass the string as an argument. To convert from a field list to a table string, use the flatten() method as shown above.
The main asl_field_list methods are:
- asl_field_list_t *asl_field_list_new (icl_longstr_t *string) - create a new field table given a serialised string, or NULL to create an empty field table.
- asl_field_t *asl_field_list_search (list, fieldname) - look for a field with a given name. Note that you must use the unlink() method on the returned field reference when you are finished using it.
- asl_field_list_print (list) - print the field table contents for debugging purposes.
- icl_longstr_t *asl_field_list_build (…) - build a field table from a list of name/value pairs, ending in a null name.
The main asl_field methods are:
- asl_field_new_string (list, fieldname, stringvalue) - create a new string field with the given name and value.
- asl_field_new_integer (list, fieldname, integervalue) - create a new integer field with the given name and value.
- asl_field_new_decimal (list, fieldname, integervalue, decimals) - create a new decimal field with the given name and value.
- asl_field_new_time (list, fieldname, timevalue) - create a new time field with the given name and value.
- asl_field_list_destroy (&list) - destroy a field table.
- asl_field_string (field) - return a string value for a field, doing any necessary conversion.
- asl_field_integer (field) - return an integer value for a field, doing any necessary conversion.
For more details on these methods, refer to the class documentation and/or documented class source code.
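The calling convention of asl_field_list_build(), a list of name/value string pairs ending in a null name, can be illustrated with a small variadic function. This is a sketch of the convention only: demo_build_pairs is a hypothetical function, and the "name=value;" text it produces is an assumption for the demonstration, not the serialised field table format used by AMQP.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical illustration, NOT the asl_field_list implementation.
   Walks name/value string pairs until it meets a NULL name and writes
   them into out as "name=value;". Returns the number of pairs, or -1
   if out is too small. */
static int demo_build_pairs (char *out, size_t out_size, ...)
{
    va_list args;
    const char *name;
    size_t used = 0;
    int count = 0;

    assert (out && out_size > 0);
    out [0] = 0;
    va_start (args, out_size);
    while ((name = va_arg (args, const char *)) != NULL) {
        /* Every name must be followed by a string value */
        const char *value = va_arg (args, const char *);
        int written = snprintf (
            out + used, out_size - used, "%s=%s;", name, value);
        if (written < 0 || (size_t) written >= out_size - used) {
            va_end (args);
            return -1;              /* Output buffer too small */
        }
        used += (size_t) written;
        count++;
    }
    va_end (args);
    return count;
}
```

A call such as demo_build_pairs (out, sizeof (out), "host", "Sleeper Service", (char *) NULL) returns 1. The cast on the terminator is a portability precaution for variadic calls, and forgetting the terminator makes the function read past its arguments.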
You can also access a field's properties directly:
- name (string) - the field name.
- type (character) - 'S', 'I', 'D', or 'T' for string, integer, decimal, or time field.
- decimals (integer) - number of decimals for a decimal field.
- string (icl_longstr_t *) - string value for the field.
- integer (int64_t) - integer value for the field.
Error handling
WireAPI returns two types of error:
- Errors reported internally, e.g. timeout.
- Errors reported by the remote server, e.g. invalid queue name.
In the first case, the error message is provided in session->error_text. There is no localisation - error messages are English only (for now). When there is an internal error, the session->reply_code will be zero.
In the second case, the error message is provided in session->reply_text and an error code is provided in session->reply_code. If the error was at the connection level, the error is placed in connection->reply_text and connection->reply_code instead.
The application can print the right error message using code like this:
if (s_session->reply_code)
icl_console_print ("E: %d - %s",
s_session->reply_code, s_session->reply_text);
else
icl_console_print ("E: %s", s_session->error_text);
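The same selection can be extended to cover connection-level errors. The sketch below uses hypothetical stand-in structures (demo_connection_t, demo_session_t) that carry only the fields named in this section, and it assumes that a non-zero connection reply code should take precedence over the session; that ordering is an assumption, not documented WireAPI behaviour.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins holding only the fields described above */
typedef struct {
    int         reply_code;   /* Non-zero if the server closed the connection */
    const char *reply_text;
} demo_connection_t;

typedef struct {
    int         reply_code;   /* Non-zero for errors reported by the server */
    const char *reply_text;
    const char *error_text;   /* Internal errors, e.g. timeout */
} demo_session_t;

/* Format the most relevant error message into out.
   Assumed order: connection error, then session error, then internal. */
static int demo_format_error (
    const demo_connection_t *connection, const demo_session_t *session,
    char *out, size_t out_size)
{
    assert (connection && session && out && out_size > 0);
    if (connection->reply_code)
        return snprintf (out, out_size, "E: %d - %s",
            connection->reply_code, connection->reply_text);
    if (session->reply_code)
        return snprintf (out, out_size, "E: %d - %s",
            session->reply_code, session->reply_text);
    return snprintf (out, out_size, "E: %s", session->error_text);
}
```

For a session holding reply code 404 and the text "Not found" (illustrative values), the helper produces "E: 404 - Not found". With a zero reply code it falls back to the internal error text.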
Miscellaneous topics
Nowait methods
AMQP/0.9 has a "nowait" option on some methods that turns normally synchronous methods into asynchronous ones. This is important for applications that do very large volumes of wiring (queue creation, etc.) since it can dramatically cut the start-up time.
WireAPI implements nowait as a second set of methods:
- amq_client_session_exchange_declare_nowait
- amq_client_session_exchange_delete_nowait
- amq_client_session_queue_declare_nowait
- amq_client_session_queue_bind_nowait
- amq_client_session_queue_purge_nowait
- amq_client_session_queue_delete_nowait
- amq_client_session_queue_unbind_nowait
- amq_client_session_basic_consume_nowait
- amq_client_session_basic_cancel_nowait
Each of these methods takes the same arguments as its synchronous version.
To use the nowait functionality on automatically-named queues (private exclusive queues), you need to use the default queue functionality, i.e.:
- queue.declare with no name (creates a name automatically)
- queue.bind with no name (uses that queue name)
- basic.consume with no queue name (uses that queue name)
Object ownership
WireAPI uses the standard iCL model to define object ownership:
- The layer which calls the "new" method on an object also destroys it.
- If an intermediate layer wants to co-own the object, it does this using possession. An object must explicitly allow this. The content objects are designed for this.
- If an intermediate layer wants to hold a reference to an object, it does this using linking. An object must explicitly allow this.
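The linking rule can be pictured as plain reference counting. The sketch below is a generic model, not the iCL implementation: demo_object_t and its functions are hypothetical, and the only behaviour taken from this guide is that unlink takes the address of the reference, as in amq_content_basic_unlink (&content).

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical reference-counted object, NOT an iCL class */
typedef struct {
    int links;                /* Number of references currently held */
    int value;                /* Some payload                        */
} demo_object_t;

/* Number of objects still allocated, kept only for the demonstration */
static int demo_object_live = 0;

/* The creator receives the first link */
static demo_object_t *demo_object_new (int value)
{
    demo_object_t *self = malloc (sizeof (demo_object_t));
    if (self) {
        self->links = 1;
        self->value = value;
        demo_object_live++;
    }
    return self;
}

/* Another layer takes its own reference to the object */
static demo_object_t *demo_object_link (demo_object_t *self)
{
    assert (self);
    self->links++;
    return self;
}

/* Drop one reference and null the caller's pointer; the object is
   freed when the last reference goes away */
static void demo_object_unlink (demo_object_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        if (--(*self_p)->links == 0) {
            free (*self_p);
            demo_object_live--;
        }
        *self_p = NULL;
    }
}
```

In this model an intermediate layer calls demo_object_link() to keep the object alive and demo_object_unlink() when it is done. The object survives until the creating layer drops its own reference as well.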
Synchronous vs. asynchronous methods
AMQP (like all ASL protocols) divides methods strictly into those that expect an immediate reply (synchronous) and those that do not (asynchronous). WireAPI handles these cases as follows:
- When you use a synchronous method (e.g. Basic.Consume), WireAPI waits for the server to respond with a synchronous reply, and it processes this reply. Any asynchronous methods that the server sends before the reply are also processed (e.g. incoming content will be correctly queued.)
- When you use an asynchronous method (e.g. Basic.Publish), WireAPI does not wait for any server response. You can therefore send such events rapidly and expect WireAPI to return as fast as it can, and process them in the background.
Single-Threaded background processing
WireAPI works in both a multi-threaded model (in which one thread handles all dialogue with the server and a second handles the application) and single-threaded (in which a single thread does all the work).
You choose the model when you build OpenAMQ, which specifically has both single- and multi-threaded capability built into it.
The single-threaded model has one specific requirement: the application must periodically call the amq_client_session_wait() method, since it is during this call that asynchronous incoming methods are processed. Always call this method instead of "sleep" or an equivalent in your application.
The demo chat application
Building and testing
The following OpenAMQ application is a simple example of how to send and receive messages via WireAPI. This is a "chat" application. It consists of two programs that are listed at the end of this chapter:
- im_sender.c publishes a line of text to a chat room
- im_receiver.c joins a chat room and prints everything that is posted there
To compile and link the files, set up your environment in the same way as you do to build the OpenAMQ server, then type this command:
c -l im_sender.c im_receiver.c
To start the chat room demo, open two console windows and run these commands, one in each window, in the following order:
amq_server
im_receiver localhost:5672 "demo room"
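Note that amq_server starts on port 5672 by default. Now, to send lines of text to the chat room, run the im_sender program in a third window. The third argument is the name shown in front of each line; the program then reads the lines to send from standard input:
im_sender localhost:5672 "demo room" "This is not a string"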
There can be any number of clients connected to the same chatroom. There's no need to create chat rooms explicitly; these are created on demand.
Sender program
im_sender.c:
---------------------------------------------------------------------------
//
// im_sender.c - Sends messages to the chatroom
// By iMatix Corporation, April 2008. Code released into the public domain.
//
// Name: im_sender
// Usage: im_sender <broker-address> <chatroom> <your-name>
// Example: im_sender 127.0.0.1:5672 "OpenAMQ discussion"
// "Baron Bartholomaeus von Saburg-Fridetzki"
// Sends messages from stdin to the chat room
// To receive messages from the chat room, use im_receiver application
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"
int main (int argc, char *argv [])
{
amq_client_connection_t
*connection;
amq_client_session_t
*session;
icl_longstr_t
*auth_data;
amq_content_basic_t
*content;
char
message_text [1024];
char
*message_body;
assert (argc == 4);
// Initialise system
icl_system_initialise (argc, argv);
// Open a connection
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
argv [1], "/", auth_data, "im_sender", 0, 30000);
assert (connection);
icl_longstr_destroy (&auth_data);
// Open a channel
session = amq_client_session_new (connection);
assert (session);
while (1) {
// Read one line from stdin; stop at end of input or on a read error
if (!fgets (message_text, sizeof (message_text), stdin))
break;
// Exit the loop if Ctrl+C is encountered
if (!connection->alive)
break;
// Create the message body
message_body =
malloc (strlen (argv [3]) + 2 + strlen (message_text) + 1);
assert (message_body);
sprintf (message_body, "%s: %s", argv [3], message_text);
// Create the message itself
content = amq_content_basic_new ();
amq_content_basic_set_body (content, message_body,
strlen (message_body), free);
// Send the message
amq_client_session_basic_publish (
session, // session
content, // content to send
0, // ticket
"amq.direct", // exchange to send message to
argv [2], // routing-key
FALSE, // mandatory
FALSE); // immediate
// Release the message
amq_content_basic_unlink (&content);
}
// Close the channel
amq_client_session_destroy (&session);
// Close the connection
amq_client_connection_destroy (&connection);
// Uninitialise system
icl_system_terminate ();
return 0;
}
Receiver program
im_receiver.c:
---------------------------------------------------------------------------
//
// im_receiver.c - Receives messages from a chatroom
// By iMatix Corporation, April 2008. Code released into the public domain.
//
// Name: im_receiver
// Usage: im_receiver <broker-address> <chatroom>
// Example: im_receiver 127.0.0.1:5672 "OpenAMQ discussion"
// Receives messages from the chatroom and writes them to stdout
// To send messages to the chat room, use im_sender application
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"
int main (int argc, char *argv [])
{
amq_client_connection_t
*connection;
amq_client_session_t
*session;
icl_longstr_t
*auth_data;
amq_content_basic_t
*content;
char
message_text [1024];
size_t
message_size;
assert (argc == 3);
// Initialise system
icl_system_initialise (argc, argv);
// Open a connection
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
argv [1], "/", auth_data, "im_receiver", 0, 30000);
assert (connection);
icl_longstr_destroy (&auth_data);
// Open a channel
session = amq_client_session_new (connection);
assert (session);
// Create a private queue
amq_client_session_queue_declare (
session, // session
0, // ticket
NULL, // queue name
FALSE, // passive
FALSE, // durable
TRUE, // exclusive
TRUE, // auto-delete
NULL); // arguments
// Bind the queue to the exchange
amq_client_session_queue_bind (
session, // session
0, // ticket
NULL, // queue
"amq.direct", // exchange
argv [2], // routing-key
NULL); // arguments
// Consume from the queue
amq_client_session_basic_consume (
session, // session
0, // ticket
NULL, // queue
NULL, // consumer-tag
TRUE, // no-local
TRUE, // no-ack
TRUE, // exclusive
NULL); // arguments
while (1) {
while (1) {
// Get next message
content = amq_client_session_basic_arrived (session);
if (!content)
break;
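// Get the message body and write it to stdout, keeping one byte
// free for the terminating null. get_body returns -1 if the buffer
// was too small; as a size_t that fails the upper bound check below.
message_size = amq_content_basic_get_body (content,
(byte*) message_text, sizeof (message_text) - 1);
if (message_size > 0 && message_size < sizeof (message_text)) {
message_text [message_size] = 0;
fputs (message_text, stdout);
}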
// Destroy the message
amq_content_basic_unlink (&content);
}
// Wait while next message arrives
amq_client_session_wait (session, 0);
// Exit the loop if Ctrl+C is encountered
if (!connection->alive)
break;
}
// Close the channel
amq_client_session_destroy (&session);
// Close the connection
amq_client_connection_destroy (&connection);
// Uninitialise system
icl_system_terminate ();
return 0;
}
Comments
Author
iMatix Corporation
The documentation above talks about a function called amq_client_session_basic_arrived_count. This method is actually called amq_client_session_get_basic_arrived_count.
Thanks for that errata, it's fixed now.
Portfolio
Hi,
Correct me if I misunderstood it, but the article states that Basic.Publish is always asynchronous. Is that right? Is there a sync counterpart?
Thanks,
Lac
Hello, is there a way to do a partial read of the message body ?
I would like to do something like :
pseudo code :
Header header;
amq_content_basic_get_body (content, (byte*)header, sizeof (Header));
Object *o = allocateObject(header);
amq_content_basic_get_body (content, (byte*)o->getBuffer(), o->size());
The idea is I have some dynamic content that is going to transit through AMQ and if I cannot do a partial read, then I'm going to end up doing 2 allocations + 1 memcpy.
pseudo code :
byte buffer[SomeSize]; // first allocation
amq_content_basic_get_body (content, (byte*)buffer, sizeof (buffer));
Header *header = buffer;
Object *o = allocateObject(header); // second allocation
memcpy(o->getBuffer(), header + 1, o->size()); // memcpy
Any idea ?
Tx
David
Hi David,
AFAIK it's not possible to do so. As far as I understand your sample, 'allocateObject()' does some memory allocation based on the information in the header part, right? So what about putting all your possible 'Object' structures into a union and just reinterpret(_cast) the part after your header structure appropriately, without extra allocation and copy from the original buffer?
BTW I would recommend you use Google Protobuf or something similar to define your messages' content. Handling wireable formats can become pretty tricky, especially if you need to interchange stuff between different machine types and use different programming languages to interpret it. Google Protobuf works pretty well for my needs and is easy to use and understand.
HTH
günther
Unfortunately I cannot do that. I'm trying to refactor some historic hand-written code and I don't want to do a big bang. So I'm doing a step-by-step refactoring. In this current iteration, I want to keep the memory layout, which is: a class object containing a #pragma pack structure holding the wire data.
This is not ideal, but this is what I have to work with. For now.
I could use the field table if I could put binary data in it, but it seems like it can hold only strings. So it seems like I cannot use it.
Looking at the source code of amq, I've also found the (undocumented) amq_content_basic_wire_get() which might do what I want, but it seems very low level and I'm not sure it's going to return only content (I'm afraid it could return also protocol content). For example : source comment says it is automatically skipping heartbeat frames. Plus it does not work out of an amq_content but uses an ip_bucket as input.
Hello,
all messages I receive have body_size == 0
There is no method amq_content_basic_set_body_size() while section "Basic content property" (http://www.openamq.org/doc:prog-wireapi/comments/show#toc40) says all properties are settable using amq_content_basic_set_[propertyname] (content, newvalue).
I would have expected the method amq_content_basic_set_body to do it for me but it does not.
Please advise.
Tx
David
All right… Found method amq_content_basic_get_body_size() which returns the correct size…
Shouldn't the documentation be fixed to stress that you cannot access the properties directly but need to call the get_XXX() functions?
Tx
David
Hi David,
It's documented somewhere in the common part, as far I remember, that you should never access any of the WireAPI data structures directly but
use the get/set functions.
WBR
Günther
Hello, tx for the answer.
However, this very page says :
Basic content properties
All contents have these properties, which you can inspect directly using content->propertyname
So the documentation is not consistent and should be fixed.
Also for my other question (partial read) : do you know where to begin ?
Tx
David
Hello,
how can you tune openamq for latency ?
I'm using direct mode with amq 1.3d0 and have this custom wireapi.cfg :
<?xml?>
<config>
<direct batching="0" on_overflow="warn"/>
</config>
If I send only one message without this config file, I see a local roundtrip of 40msec (client, amq and server running local).
If I send only one message with this config file, I see a local roundtrip of 20msec (client, amq and server still running local)
This is way too slow !
When I send several thousands of messages I see a roundtrip of 60microsecs. Which is acceptable.
Most of the time my system will not have so many messages but they need to be delivered fast. I need to remove all the batching so that I can achieve the same kind of low latency roundtrip (60microsecs) with a few messages per seconds.
setting batching=0 both on wireapi.cfg and server does not seem to have the correct effect. Is there anything else I can tune ?
Tx
David
Hi David,
That's interesting. I have observed the same bad latencies with my 1st tests, that also involve sending just a handful of messages around.
I am also using direct mode and it was just local distribution for these tests, I didn't touch the batching parameter. I decided to investigate later about that but now as you're asking …
I would appreciate if pieter can give us a hint about this, but as he decided to cancel (free) support here :-( …
Regards,
Günther
BTW:
I think you should open separate topics/issues for your questions, I feel these threads 'pollute' the WireAPI documentation page in a way.
Sending just a few messages means you hit buffering issues, which push up latency. You can tune this (--tcp_nodelay option). Direct mode will reduce latency as well.
Putting these questions in a separate thread would be good. We stopped free support because it was too much work and it's nice to see OpenAMQ users helping each other, this rarely happened before.
We're thinking of moving to a better forum for discussion. Would you use that?
Portfolio
Hello, yes sure I would use that (the forum), I think it is actually hard to find the way in the doc right now.
If I'm not wrong, tcp_nodelay is activated by default isn't it ? (at least this is what the doc is saying)
To demonstrate the issue even more: this morning I had to roll back a change that went to production with the wrong amq version (1.3d0 instead of good old 1.2c4).
Latency jumped from 2/5msec up to 40/80msec.
1.2c4 was compiled with google's allocator, 1.3d0 was not. I have to say 1.3d0 should not have gone to prod; this was a mistake because I had not fully validated it. Also 1.3d0 was not set up to use direct mode. But so far, the testing I have done showed that even with direct mode enabled the latency was not good at all.
The document above says the following:
I think, you meant Basic.Get is synchronous, and not Basic.Consume.
Thanks,
Raman
The Session properties are listed as
…
reply_code (integer) - error value reported by server
consumer_tag (integer) - server-assigned consumer tag
routing_key (string) - original message routing key
scope (string) - queue name scope
delivery_tag (integer) - server-assigned delivery tag
redelivered (Boolean) - message is being redelivered
I found while compiling on RedHat Linux, in a printf that
Property consumer_tag required a char*, not an integer (%s, not %d).
Property scope didn't exist. But queue does.
Keven Miller
Is there a way to link extra libraries to the app compiled with openamq WireAPI library ? Couldn't find a reference to it. Thanks a lot for you help.
/home/apps/ibase/bin/c
-Wall -Werror -I. -I/sources/sto/apps/evpubd/lib/ I/sources/apps/include -I/sources/common/include -I/sources/appsibase/include -l test_sender.c
Compiling test_sender…
Linking test_sender…
test_sender.o: In function ‘get_status_by_index’:
/sources/sto/apps/evpubd/ev_publisher.c:1819: undefined reference to ‘get_send_ctx’
test_sender.o: In function ‘send_it’:
test_sender.c:2515: undefined reference to ‘get_send_ctx’
collect2: ld returned 1 exit status
make: *** [test_sender] Error 1
Check the 'c' script used for compilation, it has explanations at the start of how to do this.
Portfolio