66 - Strange behavior of open/close channel

Reported by g_makulik (08 Jun 2009 14:00 UTC)

When writing test cases for my OpenAMQ WireAPI C++ wrapper classes, I found strange server behavior that looks to me like a possible
resource leak.
The test case is the following: I have a client that repeatedly opens and closes channels (= connection + session) within the same process.
If I run amq_server and only this client, everything works fine. But as soon as I start another application before that client, one that simply connects to the server
and starts to consume from an exchange/queue pair created within this receiver application, I get errors (connection to server lost) when running
the open/close client described above. I can "stretch" the number of open-channel attempts that fail by increasing the number of polling and worker threads of the
server. Another thing I noticed in this context was frequent 'server heartbeat slowing' warnings after the test case had run.

'amq_server -v' output:
{{OpenAMQ/1.3d0 - revision 12075
Debug release for internal use only

Copyright (c) 2007-2009 iMatix Corporation
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

Build model:Debug release for internal use only
Memory model: fat
Threading model: multithreaded

Compiler: gcc -c -I/home/freescale/workspace/ufx1000_software/OpenAMQ/openamq/_install/include -g -DDEBUG -O -Wall -pthread -D_REENTRANT -DICL_MEM_DEFAULT_DIc}}

This is what my Linux prints at startup:
Image Name: Linux-2.6.23
Created: 2008-12-22 3:22:49 UTC
Image Type: PowerPC Linux Kernel Image (gzip compressed)

Here is the source code of my client applications (I rewrote them to use plain WireAPI instead of my API classes, to rule out errors or flaws in my own code):

Application 1, TestReceiver, runs first

#include "wireapi.h"
#include <iostream>

using namespace std;

int main(int argc, char** argv)
{
    icl_system_initialise(argc,argv);

    icl_longstr_t *auth_data;  //  Authentication data

    cout << "TestReceiver: Opening connection ..." << endl;
    //  Initialised so the destroy calls at the end never see an indeterminate pointer
    amq_client_connection_t* pconn = NULL;
    amq_client_session_t* psession = NULL;

    auth_data  = amq_client_connection_auth_plain( "guest", "guest");
    pconn = amq_client_connection_new( "localhost", "/", auth_data, "", 0, 30000);

    if (pconn != NULL)
    {
        psession = amq_client_session_new(pconn);
        if (psession != NULL)
        {

            cout << "TestReceiver: Declare exchange 'TestExchange' ..." << endl;
            amq_client_session_exchange_declare
                            ( psession
                            , 0
                            , "TestExchange"
                            , "direct"
                            , 0
                            , 0
                            , 0
                            , 0
                            , NULL
                            );

            cout << "TestReceiver: Creating queue ..." << endl;
            amq_client_session_queue_declare
                            ( psession
                            , 0
                            , NULL
                            , 0
                            , 0
                            , 0
                            , 0
                            , NULL
                            );
            char* queueName = psession->queue;

            cout << "TestReceiver: Binding queue, routing key is 'TestRouting' ..." << endl;
            amq_client_session_queue_bind
                        ( psession
                        , 0
                        , queueName
                        , "TestExchange"
                        , "TestRouting"
                        , NULL
                        );

            cout << "TestReceiver: Consume messages from the queue ..." << endl;
            amq_client_session_basic_consume
                        ( psession
                        , 0
                        , queueName
                        , ""
                        , 0
                        , 0
                        , 0
                        , NULL
                        );

            cout << "TestReceiver: Waiting for incoming messages ..." << endl;
            while (1)
            {
                if ( amq_client_session_wait(psession,0) == 0)
                {
                    int messageCount = amq_client_session_get_basic_arrived_count(psession);
                    if(messageCount > 0)
                    {
                        //  Get next message
                        amq_content_basic_t* pmessage;
                        pmessage = amq_client_session_basic_arrived(psession);

                        for(int i = 0;
                            i < messageCount &&
                            pmessage != NULL;
                            ++i)
                        {
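                            //  Get the message body and write it to stdout
                            size_t size = amq_content_basic_get_body_size(pmessage);
                            byte* textBuffer = new byte[size + 1];
                            amq_content_basic_get_body(pmessage,textBuffer,size);
                            textBuffer[size] = 0;

                            cout << "TestReceiver: received message '" << (char*)textBuffer << "' ..." << endl;

                            //  Free the text buffer and drop our reference to the message;
                            //  WireAPI contents are reference-counted, not allocated with new
                            delete[] textBuffer;
                            amq_content_basic_unlink(&pmessage);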

                            //  Get next message
                            pmessage = amq_client_session_basic_arrived(psession);
                        }
                    }
                }

                //  Exit the loop if Ctrl+C is encountered
                if(amq_client_connection_get_alive(pconn) != TRUE)
                {
                    break;
                }
            }
        }
    }
    amq_client_session_destroy(&psession);
    amq_client_connection_destroy(&pconn);
    icl_longstr_destroy(&auth_data);  //  Release the authentication data, as Application 2 does

    icl_system_terminate();

}

Application 2, ChannelOpenCloseTest, runs second

#include <iostream>
#include "wireapi.h"

using namespace std;

int main(int argc, char* argv[])
{

    icl_system_initialise(argc,argv);

    icl_longstr_t *auth_data;  //  Authentication data

    amq_client_connection_t* pconn;
    amq_client_session_t* psession;

    for(int i = 0; i < 30; ++i)
    {

        auth_data  = amq_client_connection_auth_plain( "guest", "guest");
        pconn = amq_client_connection_new( "localhost", "/", auth_data, "", 0, 30000);
        if(pconn != NULL)
        {
            psession = amq_client_session_new(pconn);

            cout << "Created connection " << i << endl;
            if (psession != NULL)
            {
                amq_client_session_destroy(&psession);
                psession = NULL;
            }
            amq_client_connection_destroy(&pconn);
            pconn = NULL;
        }

        icl_longstr_destroy (&auth_data);
    }

    icl_system_terminate();
}
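
To make "the number of open channel attempts that fail" easy to compare between server configurations, the loop above could be wrapped in a small counting harness. This is a minimal sketch under assumptions: `CycleReport` and `run_cycles` are hypothetical names, and the `cycle` callback stands in for one connection + session open/close as done in Application 2, returning true when both objects were created.

```cpp
#include <functional>
#include <vector>

// Outcome of repeatedly running an open/close cycle.
struct CycleReport {
    int attempts = 0;
    std::vector<int> failed;  // indices of the cycles that failed
};

// Runs `cycle` `count` times. `cycle(i)` is expected to return true when
// the connection and the session were both created (and destroyed again),
// and false otherwise, e.g. when the connection to the server was lost.
CycleReport run_cycles(int count, const std::function<bool(int)>& cycle)
{
    CycleReport report;
    for (int i = 0; i < count; ++i) {
        ++report.attempts;
        if (!cycle(i))
            report.failed.push_back(i);
    }
    return report;
}
```

With the body of the for loop in Application 2 moved into the callback, `report.failed` shows which of the 30 attempts broke, so runs with different numbers of polling and worker threads can be compared directly.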


