89 - multi-threaded client connection failure

Reported by Srinik (17 Nov 2012 16:28)

Steps to reproduce it
a) Change the IP address in the code below to point to the correct IP address of the server, then compile and run it.

// im_sender.c - Sends messages to the chatroom
// By iMatix Corporation, April 2008. Code released into the public domain.

// Name: im_sender
// Usage: im_sender <broker-address> <chatroom> <your-name>
// Example: im_sender 127.0.0.1:5672 "OpenAMQ discussion"
// "Baron Bartholomaeus von Saburg-Fridetzki"
// Sends messages from stdio to the chat room
// To receive messages from the chat room, use im_receiver application

/* Open amq */
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

#define _MULTI_THREADED
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/*
 * Publisher stuff.
 */
/* Name passed to icl_system_initialise() and amq_client_connection_new(),
 * and prefixed to every message body built by send_message(). */
char *publisher = "mt_sender";

/* Routing key handed to amq_client_session_basic_publish(). */
char routing_key[] = "ava-80";

/* Broker address in "host:port" form; edit to point at your server. */
char open_am_host[] = "192.168.1.152:5672";

/* Handles set up by create_ampq_conn_session() / msg_thread() and released
 * by close_amq_conn_session(). */
amq_client_session_t *amq_session;
amq_client_connection_t *amq_connection;

/* NULL-terminated list of message bodies; the threads walk it by index
 * (cur_msg) and stop at the NULL sentinel. */
char *amq_messages[] = {
    "Hello World\n",
    "I am a publisher\n",
    "I can send messages to interesed listeners\n",
    "Let me know what you are interested in\n",
    "Thank you and happy holidays\n",
    NULL    /* end-of-list sentinel */
};

amq_client_session_t *
create_ampq_conn_session(void)
{
amq_client_session_t *session;
amq_client_connection_t *connection;
icl_longstr_t *auth_data;

// Initialize system
icl_system_initialise(1, &publisher);

// Open a connection
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
open_am_host, "/", auth_data, publisher, 0, 30000);
amq_connection = connection;
icl_longstr_destroy (&auth_data);
if (connection) {
icl_console_print("I: connected to %s/%s - %s - %s",
connection->server_product,
connection->server_version,
connection->server_platform,
connection->server_information);
} else {
icl_console_print("E: could not connect to server");
exit(1);
}

// Open a channel
session = amq_client_session_new (connection);
assert (session);

return (session);
}

/*
 * Tear down what create_ampq_conn_session() set up, in reverse order:
 * the session (channel) first, then the connection, then the iCL runtime.
 * Operates on the globals amq_session and amq_connection.
 */
void
close_amq_conn_session(void)
{
    amq_client_session_destroy(&amq_session);       /* channel */
    amq_client_connection_destroy(&amq_connection); /* connection */
    icl_system_terminate();                         /* runtime */
}

/*
 * Publish one message to the "amq.direct" exchange using routing_key.
 *
 * The body sent is "<publisher>: <message>\n".
 *
 * sess     session to publish on.
 * message  NUL-terminated text to send; not modified.
 *
 * Returns 1 once the publish call has been issued, or 0 if the body
 * buffer could not be allocated.
 * NOTE(review): the result of amq_client_session_basic_publish() is not
 * inspected, so 1 does not prove the broker accepted the message.
 */
int
send_message(amq_client_session_t *sess, char *message)
{
    /* publisher + ": " + message + "\n" + terminating NUL. The original
     * size left out the byte for the newline, so sprintf() wrote one byte
     * past the end of the buffer. */
    size_t msg_size = strlen(publisher) + 2 + strlen(message) + 1 + 1;
    char *msg = malloc(msg_size);
    amq_content_basic_t *content;

    if (!msg) {
        icl_console_print("E: out of memory while building message");
        return (0);
    }
    snprintf(msg, msg_size, "%s: %s\n", publisher, message);

    content = amq_content_basic_new();
    /* free is passed as the release callback, so the content object is
     * presumably responsible for freeing msg -- do not free it here. */
    amq_content_basic_set_body(content, msg, strlen(msg), free);

    amq_client_session_basic_publish(
        sess,
        content,
        0,
        "amq.direct",
        routing_key, FALSE, FALSE);
    amq_content_basic_unlink(&content);

    return (1);
}

/*
 * Thread stuff.
 */
/* Index into amq_messages of the message currently being handed from
 * send_thread() to msg_thread(). */
int cur_msg;

/* tm_*: pair both worker threads block on; app_init() broadcasts tm_cond. */
pthread_mutex_t tm_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t tm_cond = PTHREAD_COND_INITIALIZER;

/* th_*: th_cond is signalled by msg_thread(); in the code shown here it is
 * never waited on and th_mutex is never locked. */
pthread_mutex_t th_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t th_cond = PTHREAD_COND_INITIALIZER;

/*
 * Publisher thread entry point.
 *
 * Opens the AMQ connection and session, then loops: signal th_cond, wait
 * on tm_cond, and publish amq_messages[cur_msg]. The loop ends when the
 * selected entry is the NULL sentinel, after which the connection is
 * closed.
 *
 * unused  ignored; present to match the pthread start-routine signature.
 * Returns NULL.
 *
 * NOTE(review): the wait on tm_cond has no predicate, so a signal sent
 * while this thread is not yet waiting is lost and a spurious wakeup is
 * treated as a real one. cur_msg is also read without holding tm_mutex.
 * Fixing this needs a matching change in send_thread().
 */
void *
msg_thread(void *unused)
{
    char *msg;

    (void) unused;

    /* __func__ replaces the undefined identifier `func`. */
    printf("%s:Opening an amq connection\n", __func__);
    amq_session = create_ampq_conn_session();
    while (1) {
        pthread_cond_signal(&th_cond);
        pthread_mutex_lock(&tm_mutex);
        pthread_cond_wait(&tm_cond, &tm_mutex);
        pthread_mutex_unlock(&tm_mutex);

        printf("%s: Picking up msg: %d\n", __func__, cur_msg);
        msg = amq_messages[cur_msg];
        if (msg) {
            send_message(amq_session, msg);
            pthread_cond_signal(&th_cond);
        } else {
            /* Reached the NULL sentinel: nothing more to publish. */
            break;
        }
    }
    close_amq_conn_session();
    return NULL;
}

/* Set to 1 by send_thread() once it has stepped past the last message. */
int sent_all_msgs;

/*
 * Producer thread entry point.
 *
 * Starting from index 0 (after a one-second delay), repeatedly signals
 * tm_cond, waits on tm_cond, and advances cur_msg until it reaches the
 * NULL sentinel of amq_messages; then sets sent_all_msgs and exits.
 *
 * unused  ignored; present to match the pthread start-routine signature.
 * Returns NULL.
 *
 * NOTE(review): this thread signals tm_cond and then waits on the same
 * condition variable without a predicate. The only other wake-ups of
 * tm_cond in the code shown here are the single broadcast in app_init()
 * and this thread's own signal, so the handshake with msg_thread() looks
 * prone to lost wakeups -- confirm and redesign with a predicate.
 */
void *
send_thread(void *unused)
{
    (void) unused;

    cur_msg = 0;
    sleep(1);
    while (amq_messages[cur_msg]) {

        /* __func__ replaces the undefined identifier `func`. */
        printf("%s: Send message: %d\n", __func__, cur_msg);
        pthread_cond_signal(&tm_cond);
        pthread_mutex_lock(&tm_mutex);
        pthread_cond_wait(&tm_cond, &tm_mutex);
        pthread_mutex_unlock(&tm_mutex);
        cur_msg++;
    }
    sent_all_msgs = 1;
    printf("%s: exiting\n", __func__);
    return (NULL);
}

/* Number of worker threads; thread_id[0] is msg_thread, [1] is send_thread. */
#define NTHREADS 2
pthread_t thread_id[NTHREADS];

/*
 * Start both worker threads, wait five seconds, then broadcast tm_cond
 * while holding tm_mutex.
 *
 * Returns 0 on success, otherwise the error number returned by the first
 * pthread call that failed. tm_mutex is always released before returning,
 * even when the broadcast fails (the original returned with it locked).
 *
 * NOTE(review): if the second pthread_create() fails, the first thread is
 * left running and thread_id[1] is not a valid thread handle.
 */
int
app_init(void)
{
    int rv;
    int unlock_rv;

    rv = pthread_create(&thread_id[0], NULL, &msg_thread, NULL);
    if (rv) {
        return rv;
    }
    rv = pthread_create(&thread_id[1], NULL, &send_thread, NULL);
    if (rv) {
        return rv;
    }
    /* Fixed delay before the broadcast below. */
    sleep (5);

    rv = pthread_mutex_lock(&tm_mutex);
    if (rv) {
        return rv;
    }
    rv = pthread_cond_broadcast(&tm_cond);
    unlock_rv = pthread_mutex_unlock(&tm_mutex);

    /* Report the broadcast error first; otherwise the unlock result. */
    return rv ? rv : unlock_rv;
}

/*
 * Program entry point: start the worker threads via app_init(), wait for
 * both to finish, then destroy tm_cond and tm_mutex.
 *
 * Returns 0 on success, 1 if app_init() reported an error.
 */
int main (int argc, char *argv [])
{
    int i;
    int rv;

    (void) argc;
    (void) argv;

    rv = app_init();
    if (rv) {
        /* pthread functions return the error number instead of setting
         * errno, so perror() would print an unrelated message. */
        fprintf(stderr, "Received error: %s\n", strerror(rv));
        /* Do not join: thread_id[] may not hold valid threads. */
        return 1;
    }

    /* pthread_join() already blocks until send_thread has set
     * sent_all_msgs and returned, so the former busy-wait on that plain
     * int (a data race) is not needed. */
    for (i = 0; i < NTHREADS; ++i) {
        pthread_join(thread_id[i], NULL);
    }
    pthread_cond_destroy(&tm_cond);
    pthread_mutex_destroy(&tm_mutex);
    return 0;
}

Output:

-bash-3.1# ./mt_sender2
msg_thread:Opening an amq connection
send_thread: Send message: 0
send_thread: Send message: 1
15:23:05: E: could not connect to server
-bash-3.1#

Attachments:

No files attached to this page.

Comments

Add a New Comment

Edit | Files | Tags | Print

rating: 0+x

Who's following this issue?

pieterh
martin_sustrik
Cybarite
Srinik
Watch: site | category | page

Submitted by Srinik

Use one of these tags to say what kind of issue it is:

  • issue - a fault in the software or the packaging or the documentation.
  • change - a change or feature request.

Use one of these tags to say what state the issue is in:

  • open - a new, open issue.
  • closed - issue has been closed.
  • rejected - the issue has been rejected.

Use one of these tags to say how urgent the issue is:

  • fatal - the issue is stopping all work.
  • urgent - it's urgent.

All open

89 - multi-threaded client connection failure (17 Nov 2012 16:28) [open]
87 - Zyre returns incomplete XML (26 Apr 2010 08:15) [open]
86 - SFL 'random(num)' macro is wrong in sfl.h (31 Mar 2010 09:23) [open]
85 - Zyre does not start on Solaris (23 Mar 2010 01:29) [open]
84 - OpenAMQ JMS - AMQTopic constructor use HEADER name and class instead of TOPIC (28 Jan 2010 17:04) [open]
83 - WireAPI: How to 'override' signal handlers? (14 Jan 2010 17:33) [open]
82 - Opf Classes Cannot Accept Default Values With Characte (06 Jan 2010 09:34) [open]
81 - AMQP Topic Exhange Routing (29 Dec 2009 00:21) [open]
80 - OpenAMQ reports malformed frame on 0-9-1 queue.unbind (20 Nov 2009 12:33) [open]
79 - AMQ Server crashing if subscribe topic is set as #.# (30 Oct 2009 06:11) [open]
78 - Error while publishing the messages faster (30 Oct 2009 05:57) [open]
77 - Tuning for latency (28 Oct 2009 16:47) [open]
76 - New user forum (28 Oct 2009 11:29) [change open]
74 - Simulaneous connect/disconnect from multiple threads crashes (03 Sep 2009 15:32) [open]
73 - Topic Exchange not sending a message to XXX.* (25 Aug 2009 21:10) [open]
72 - amq_content_basic_new() causes seg fault if not connected to broker (12 Aug 2009 23:50) [open]
71 - zyre bugs (06 Aug 2009 09:33) [open]
69 - OpenAMQ and Zyre (15 Jul 2009 11:27) [open]
68 - Change names of max and min source code macros (10 Jul 2009 16:52) [open]
67 - Server crash when multiple consumers ack on shared queue (26 Jun 2009 11:35) [open]

page 1 of 2 | 1 | 2 | next »

Most recent