Reported by Srinik (17 Nov 2012 16:28)
Steps to reproduce it
a) Change the IP address in the code below (open_am_host) to the IP address of your server, then compile and run it.
im_sender.c - Sends messages to the chatroom
// By iMatix Corporation, April 2008. Code released into the public domain.
Name: im_sender
// Usage: im_sender <broker-address> <chatroom> <your-name>
// Example: im_sender 127.0.0.1:5672 "OpenAMQ discussion"
// "Baron Bartholomaeus von Saburg-Fridetzki"
// Sends messages from stdio to the chat room
// To receive messages from the chat room, use im_receiver application
/* Open amq */
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"
#define _MULTI_THREADED
#include <pthread.h>
#include <stdio.h>
#include <string.h>
/*
- Publisher stuff.
*/
/* Client name: handed to icl_system_initialise() and
 * amq_client_connection_new(), and prefixed to every message body. */
char *publisher = "mt_sender";
/* Broker address ("host:port") passed to amq_client_connection_new().
 * Edit this to point at your own server before building. */
char open_am_host[] = "192.168.1.152:5672";
/* Routing key used for every publish to the "amq.direct" exchange. */
char routing_key[] = "ava-80";
/* Connection and session shared by the whole program: the connection is
 * stored by create_ampq_conn_session(), the session by msg_thread(), and
 * both are destroyed by close_amq_conn_session(). */
amq_client_connection_t *amq_connection;
amq_client_session_t *amq_session;
/* Messages to publish, indexed by cur_msg. The trailing NULL is the
 * end-of-list sentinel that both worker threads test for. */
char *amq_messages[] =
{
"Hello World\n",
"I am a publisher\n",
"I can send messages to interesed listeners\n",
"Let me know what you are interested in\n",
"Thank you and happy holidays\n",
NULL
};
amq_client_session_t *
create_ampq_conn_session(void)
{
amq_client_session_t *session;
amq_client_connection_t *connection;
icl_longstr_t *auth_data;
// Initialize system
icl_system_initialise(1, &publisher);
// Open a connection
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
open_am_host, "/", auth_data, publisher, 0, 30000);
amq_connection = connection;
icl_longstr_destroy (&auth_data);
if (connection) {
icl_console_print("I: connected to %s/%s - %s - %s",
connection->server_product,
connection->server_version,
connection->server_platform,
connection->server_information);
} else {
icl_console_print("E: could not connect to server");
exit(1);
}
// Open a channel
session = amq_client_session_new (connection);
assert (session);
return (session);
}
/*
 * Release everything set up by create_ampq_conn_session(): the global
 * session first, then the global connection, and finally the system
 * itself.
 */
void
close_amq_conn_session(void)
{
    amq_client_session_destroy(&amq_session);        /* channel */
    amq_client_connection_destroy(&amq_connection);  /* connection */
    icl_system_terminate();                          /* uninitialise system */
}
/*
 * Publish one message to the "amq.direct" exchange using routing_key.
 *
 * The body sent is "<publisher>: <message>\n".
 *
 * sess    - session to publish on.
 * message - NUL-terminated text to send; it is copied, not retained.
 *
 * Always returns 1. Asserts if the body buffer cannot be allocated.
 */
int
send_message(amq_client_session_t *sess, char *message)
{
    /* publisher + ": " + message + "\n" + terminating NUL. The newline must
     * be counted as well, otherwise the formatted body overruns the buffer
     * by one byte. */
    size_t msg_size = strlen(publisher) + 2 + strlen(message) + 1 + 1;
    char *msg = malloc(msg_size);
    amq_content_basic_t *content;

    assert(msg);
    snprintf(msg, msg_size, "%s: %s\n", publisher, message);

    content = amq_content_basic_new();
    /* free is handed over as the release callback for msg; presumably the
     * content object calls it when the body is no longer needed -- confirm
     * against the WireAPI docs. msg is therefore not freed here. */
    amq_content_basic_set_body(content, msg, strlen(msg), free);
    amq_client_session_basic_publish(
        sess,
        content,
        0,
        "amq.direct",
        routing_key, FALSE, FALSE);
    amq_content_basic_unlink(&content);
    return (1);
}
/*
- Thread stuff.
*/
/* Index into amq_messages of the message currently being handed over.
 * Written by send_thread() and read by msg_thread(); neither access is
 * made while holding a mutex. */
int cur_msg;
/* th_cond is signalled by msg_thread(); no code in this file waits on it,
 * and th_mutex is not referenced anywhere else in this file. */
pthread_mutex_t th_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t th_cond = PTHREAD_COND_INITIALIZER;
/* tm_cond/tm_mutex: both worker threads block in pthread_cond_wait() on
 * tm_cond; send_thread() signals it and app_init() broadcasts on it. */
pthread_mutex_t tm_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t tm_cond = PTHREAD_COND_INITIALIZER;
/*
 * Publisher thread entry point.
 *
 * Opens the AMQ connection/session, then repeatedly waits on tm_cond and
 * publishes amq_messages[cur_msg]. The loop ends when cur_msg indexes the
 * NULL sentinel, after which the session and connection are closed.
 *
 * unused - ignored.
 * Returns NULL.
 *
 * NOTE(review): pthread_cond_wait() is called without a predicate loop, and
 * cur_msg is read after tm_mutex has been released, so a wake-up that is
 * signalled before this thread starts waiting is lost and spurious wake-ups
 * are not filtered out. This is left unchanged because it is part of the
 * reproducer's hand-off protocol with send_thread().
 */
void *
msg_thread(void *unused)
{
    char *msg;

    (void) unused;

    /* __func__ yields "msg_thread", matching the reported program output. */
    printf("%s:Opening an amq connection\n", __func__);
    amq_session = create_ampq_conn_session();
    while (1) {
        pthread_cond_signal(&th_cond);
        pthread_mutex_lock(&tm_mutex);
        pthread_cond_wait(&tm_cond, &tm_mutex);
        pthread_mutex_unlock(&tm_mutex);
        printf("%s: Picking up msg: %d\n", __func__, cur_msg);
        msg = amq_messages[cur_msg];
        if (msg) {
            send_message(amq_session, msg);
            pthread_cond_signal(&th_cond);
        } else {
            /* Reached the NULL sentinel: nothing left to publish. */
            break;
        }
    }
    close_amq_conn_session();
    return NULL;
}
/* Set to 1 by send_thread() once it has stepped cur_msg past the last
 * message. Plain int: accesses are not protected by a mutex or atomics. */
int sent_all_msgs;
/*
 * Producer thread entry point.
 *
 * Walks cur_msg over amq_messages: for each entry it signals tm_cond, then
 * waits on tm_cond itself before advancing to the next index. Once the NULL
 * sentinel is reached it sets sent_all_msgs and exits.
 *
 * unused - ignored.
 * Returns NULL.
 *
 * NOTE(review): this thread signals and then waits on the same condition
 * variable that msg_thread() waits on, with no predicate, so which thread a
 * given wake-up reaches is not determined by this code. This is left
 * unchanged because it is part of the reproducer.
 */
void *
send_thread(void *unused)
{
    (void) unused;

    cur_msg = 0;
    /* Presumably gives msg_thread() time to start connecting -- confirm. */
    sleep(1);
    while (amq_messages[cur_msg]) {
        /* __func__ yields "send_thread", matching the reported output. */
        printf("%s: Send message: %d\n", __func__, cur_msg);
        pthread_cond_signal(&tm_cond);
        pthread_mutex_lock(&tm_mutex);
        pthread_cond_wait(&tm_cond, &tm_mutex);
        pthread_mutex_unlock(&tm_mutex);
        cur_msg++;
    }
    sent_all_msgs = 1;
    printf("%s: exiting\n", __func__);
    return (NULL);
}
/* Number of worker threads started by app_init(). */
#define NTHREADS 2
/* Thread handles: [0] is msg_thread, [1] is send_thread (see app_init()). */
pthread_t thread_id[NTHREADS];
/*
 * Start the two worker threads (msg_thread into thread_id[0], send_thread
 * into thread_id[1]), sleep for five seconds, then broadcast on tm_cond
 * while holding tm_mutex.
 *
 * Returns 0 on success, otherwise the error number returned by the first
 * pthread call that failed.
 */
int
app_init(void)
{
    int rv;
    int unlock_rv;

    rv = pthread_create(&thread_id[0], NULL, &msg_thread, NULL);
    if (rv) {
        return rv;
    }
    rv = pthread_create(&thread_id[1], NULL, &send_thread, NULL);
    if (rv) {
        return rv;
    }

    sleep (5);

    rv = pthread_mutex_lock(&tm_mutex);
    if (rv) {
        return rv;
    }
    rv = pthread_cond_broadcast(&tm_cond);
    /* Always release the mutex, even when the broadcast failed; returning
     * with tm_mutex held would block both worker threads for good. */
    unlock_rv = pthread_mutex_unlock(&tm_mutex);
    return rv ? rv : unlock_rv;
}
/*
 * Program entry point: start the worker threads through app_init(), wait
 * for both of them to finish, then destroy tm_cond and tm_mutex.
 *
 * Command-line arguments are ignored.
 * Returns 0 on success, 1 if app_init() reported an error.
 */
int main (int argc, char *argv [])
{
    int i;
    int rv;

    (void) argc;
    (void) argv;

    rv = app_init();
    if (rv) {
        /* pthread functions return the error number instead of setting
         * errno, so perror() would print an unrelated message. */
        fprintf(stderr, "Received error: %s\n", strerror(rv));
        /* The threads may not exist; joining them would be undefined. */
        return 1;
    }

    /* pthread_join() already blocks until each thread has finished, so no
     * busy-wait on the unsynchronised sent_all_msgs flag is needed. */
    for (i = 0; i < NTHREADS; ++i) {
        pthread_join(thread_id[i], NULL);
    }
    pthread_cond_destroy(&tm_cond);
    pthread_mutex_destroy(&tm_mutex);
    return 0;
}
Output:
-bash-3.1# ./mt_sender2
msg_thread:Opening an amq connection
send_thread: Send message: 0
send_thread: Send message: 1
15:23:05: E: could not connect to server
-bash-3.1#
Attachments:
No files attached to this page.
Comments
Who's following this issue?
pieterh, martin_sustrik
Cybarite
Srinik
Submitted by Srinik
Use one of these tags to say what kind of issue it is:
- issue - a fault in the software or the packaging or the documentation.
- change - a change or feature request.
Use one of these tags to say what state the issue is in:
- open - a new, open issue.
- closed - issue has been closed.
- rejected - the issue has been rejected.
Use one of these tags to say how urgent the issue is:
- fatal - the issue is stopping all work.
- urgent - it's urgent.
All open
89 - multi-threaded client connection failure (17 Nov 2012 16:28) [open]
87 - Zyre returns incomplete XML (26 Apr 2010 08:15) [open]
86 - SFL 'random(num)' macro is wrong in sfl.h (31 Mar 2010 09:23) [open]
85 - Zyre does not start on Solaris (23 Mar 2010 01:29) [open]
84 - OpenAMQ JMS - AMQTopic constructor use HEADER name and class instead of TOPIC (28 Jan 2010 17:04) [open]
83 - WireAPI: How to 'override' signal handlers? (14 Jan 2010 17:33) [open]
82 - Opf Classes Cannot Accept Default Values With Characte (06 Jan 2010 09:34) [open]
81 - AMQP Topic Exhange Routing (29 Dec 2009 00:21) [open]
80 - OpenAMQ reports malformed frame on 0-9-1 queue.unbind (20 Nov 2009 12:33) [open]
79 - AMQ Server crashing if subscribe topic is set as #.# (30 Oct 2009 06:11) [open]
78 - Error while publishing the messages faster (30 Oct 2009 05:57) [open]
77 - Tuning for latency (28 Oct 2009 16:47) [open]
76 - New user forum (28 Oct 2009 11:29) [change open]
74 - Simulaneous connect/disconnect from multiple threads crashes (03 Sep 2009 15:32) [open]
73 - Topic Exchange not sending a message to XXX.* (25 Aug 2009 21:10) [open]
72 - amq_content_basic_new() causes seg fault if not connected to broker (12 Aug 2009 23:50) [open]
71 - zyre bugs (06 Aug 2009 09:33) [open]
69 - OpenAMQ and Zyre (15 Jul 2009 11:27) [open]
68 - Change names of max and min source code macros (10 Jul 2009 16:52) [open]
67 - Server crash when multiple consumers ack on shared queue (26 Jun 2009 11:35) [open]
Most recent
90 - Frequent coredump in OpenAMQ (09 Apr 2013 12:32) [fatal urgent]
89 - multi-threaded client connection failure (17 Nov 2012 16:28) [open]
88 - amq_console_agent crashes (28 Aug 2010 08:46) [closed]
87 - Zyre returns incomplete XML (26 Apr 2010 08:15) [open]
86 - SFL 'random(num)' macro is wrong in sfl.h (31 Mar 2010 09:23) [open]
85 - Zyre does not start on Solaris (23 Mar 2010 01:29) [open]
84 - OpenAMQ JMS - AMQTopic constructor use HEADER name and class instead of TOPIC (28 Jan 2010 17:04) [open]
83 - WireAPI: How to 'override' signal handlers? (14 Jan 2010 17:33) [open]
82 - Opf Classes Cannot Accept Default Values With Characte (06 Jan 2010 09:34) [open]
81 - AMQP Topic Exhange Routing (29 Dec 2009 00:21) [open]