13 Replies Latest reply: Aug 31, 2010 3:25 AM by 807581

    ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ broker

    807581
      I am trying to connect an ActiveMQ-CPP client to an Oracle OpenMQ broker via STOMP. Both vendors claim this will work. I have been able to connect an ActiveMQ-CPP client to an ActiveMQ broker, an OpenMQ client to an OpenMQ broker, and an OpenMQ client to an ActiveMQ broker, all via STOMP and without problems. The only combination that fails is the one I need: an ActiveMQ-CPP client connecting to an OpenMQ broker.

      I am on Fedora Linux, using the "example" program provided with ActiveMQ-CPP with the brokerURL changed to "tcp://localhost:61613?wireFormat=stomp", where localhost:61613 is the OpenMQ STOMP bridge.

      On the OpenMQ end, the broker receives the client's connection request and starts a connection:

      INFO: Create JMS connection for user admin with client id ID:csa-nexus-57767-1281630228652-1:0
      Aug 12, 2010 8:23:48 AM
      INFO: Started JMS connection 8950669406784000768[ID:csa-nexus-57767-1281630228652-1:0] for user admin

      This is where the ActiveMQ-CPP client hangs at "connection->start", or if this is removed, "connection->createSession".

      Any help would be appreciated. Thanks!
        • 1. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
          807581
          OpenMQ supports STOMP protocol as described in the OpenMQ Admin Guide
          http://docs.sun.com/app/docs/doc/821-1794/gjdkc?l=en&q=message+queue&a=view

          However, OpenMQ does not claim to support the various third-party STOMP client libraries. Having said that, it seems unlikely that the example can't get past the CONNECT frame, considering that others in this forum seem to have had success connecting to OpenMQ via STOMP using a different ActiveMQ library. Out of curiosity, could you please provide a pointer to the ActiveMQ-CPP binary and the example you used? Also, why do you need to use ActiveMQ-CPP to connect to the MQ broker via STOMP?
          • 2. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
            807581
            I need to use ActiveMQ-CPP because I need a C++ messaging client that supports failover, which OpenMQ's C client does not. I also thought it unlikely that it would be unable to connect. I can't find a good way to debug the ActiveMQ-CPP client well enough to know whether the problem is on the ActiveMQ-CPP client's end or the OpenMQ broker's end.

            Here is the ActiveMQ-CPP source release that I installed from: http://activemq.apache.org/cms/activemq-cpp-322-release.html

            The example comes bundled with the ActiveMQ-CPP installation, but I'll show you the parts of the main.cpp file that do all of the work:
             
            ....

            class HelloWorldConsumer : public ExceptionListener,
                                       public MessageListener,
                                       public Runnable {

            private:

                .....
                    this->brokerURI = brokerURI;
                }

                virtual ~HelloWorldConsumer() {
                    cleanup();
                }

                void close() {
                    this->cleanup();
                }

                void waitUntilReady() {
                    latch.await();
                }
            
            
                virtual void run() {

                    try {

                        auto_ptr<ConnectionFactory> connectionFactory( ConnectionFactory::createCMSConnectionFactory( brokerURI ) );

                        // Create a Connection
                        connection = connectionFactory->createConnection("admin", "admin");

                        connection->start();

                        connection->setExceptionListener(this);

                        // Create a Session
                        if( this->sessionTransacted == true ) {
                            session = connection->createSession( Session::SESSION_TRANSACTED );
                        } else {
                            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
                        }

                        // Create the destination (Topic or Queue)
                        if( useTopic ) {
                            destination = session->createTopic( "TEST.FOO" );
                        } else {
                            destination = session->createQueue( "TEST.FOO" );
                        }

                        // Create a MessageConsumer from the Session to the Topic or Queue
                        consumer = session->createConsumer( destination );

                        consumer->setMessageListener( this );

                        std::cout.flush();
                        std::cerr.flush();

                        // Indicate we are ready for messages.
                        latch.countDown();

                        // Wait while asynchronous messages come in.
                        doneLatch.await( waitMillis );

                    } catch( CMSException& e ) {

                        // Indicate we are ready for messages.
                        latch.countDown();

                        e.printStackTrace();
                    }
                }
            
            
                // Called from the consumer since this class is a registered MessageListener.
                virtual void onMessage( const Message* message ) {

                    static int count = 0;

                    try {
                        count++;
                        const TextMessage* textMessage =
                            dynamic_cast< const TextMessage* >( message );
                        string text = "";

                        if( textMessage != NULL ) {
                            text = textMessage->getText();
                        } else {
                            text = "NOT A TEXTMESSAGE!";
                        }

                        printf( "Message #%d Received: %s\n", count, text.c_str() );

                    } catch (CMSException& e) {
                        e.printStackTrace();
                    }

                    // Commit all messages.
                    if( this->sessionTransacted ) {
                        session->commit();
                    }

                    // No matter what, tag the count down latch until done.
                    doneLatch.countDown();
                }

                // If something bad happens you see it here as this class is also been
                // registered as an ExceptionListener with the connection.
                virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
                    printf("CMS Exception occurred. Shutting down client.\n");
                    ex.printStackTrace();
                    exit(1);
                }
            
            
            
            int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

                activemq::library::ActiveMQCPP::initializeLibrary();

                std::cout << "=====================================================\n";
                std::cout << "Starting the example:" << std::endl;
                std::cout << "-----------------------------------------------------\n";

                std::string brokerURI =
                    "tcp://localhost:61613"
                    "?wireFormat=stomp"
                    // "&soConnectTimeout=5"
                    // "&connection.sendTimeout=5"
                    // "&connection.useAsyncSend=true"
                    // "&transport.useInactivityMonitor=false"
                    // "&connection.alwaysSyncSend=true"
                    // "&connection.useAsyncSend=true"
                    // "&transport.commandTracingEnabled=true"
                    // "&transport.tcpTracingEnabled=true"
                    // "&wireFormat.tightEncodingEnabled=true"
                    ;

                //============================================================
                // set to true to use topics instead of queues
                // Note in the code above that this causes createTopic or
                // createQueue to be used in both consumer an producer.
                //============================================================
                bool useTopics = true;
                bool sessionTransacted = false;
                int numMessages = 2000;

                long long startTime = System::currentTimeMillis();

                HelloWorldProducer producer( brokerURI, numMessages, useTopics );
                HelloWorldConsumer consumer( brokerURI, numMessages, useTopics, sessionTransacted );

                // Start the consumer thread.
                Thread consumerThread( &consumer );
                consumerThread.start();

                // Wait for the consumer to indicate that its ready to go.
                consumer.waitUntilReady();

                // Start the producer thread.
                Thread producerThread( &producer );
                producerThread.start();

                // Wait for the threads to complete.
                producerThread.join();
                consumerThread.join();

                long long endTime = System::currentTimeMillis();
                double totalTime = (double)(endTime - startTime) / 1000.0;

                consumer.close();
                producer.close();

                std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;

                std::cout << "-----------------------------------------------------\n";
                std::cout << "Finished with the example." << std::endl;
                std::cout << "=====================================================\n";

                activemq::library::ActiveMQCPP::shutdownLibrary();
            }

            // END SNIPPET: demo
            The places where the code hangs during connection are connection->start() and connection->createSession(). Again, this happens with any ActiveMQ client, regardless of language, even though both sides should support STOMP. Using an OpenMQ client with an ActiveMQ broker works perfectly, and so does this same code against an ActiveMQ broker, still via STOMP.

            Thanks!
            • 3. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
              807581
              Although the MQ C-API currently does not support automatic connection failover, it should be possible to implement it at the application layer using the current MQ C-API.

              For the ActiveMQ-CPP example, it's unclear whether the client is simply waiting for messages. If you'd like to debug it, I'd suggest modifying the example to first just create and then close a connection, to see whether that works as expected.
              • 4. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                807581
                Can you explain in a little more detail how to configure the OpenMQ C client for failover? If failover worked with that method, I may be able to use that instead. However, I haven't seen any documentation that covers that setup.

                In the example, the connection->start() never returns, so if I try to start() then close() it still hangs in start(). If I try to close() before start(), I receive the error as expected:

                ActiveMQConnection::enforceConnected - Connection has already been closed!
                FILE: activemq/core/ActiveMQConnection.cpp, LINE: 785
                FILE: activemq/core/ActiveMQConnection.cpp, LINE: 413

                Then it enters start() again and hangs. Again, I know it is sending some sort of CONNECT frame, since the OpenMQ broker creates and starts a JMS connection for the right user and then waits. The example is simply waiting for something that never happens, and I can't figure out what that is. It only happens with the OpenMQ broker, not with any other.
                • 5. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                  807581
                  I don't know what ActiveMQ-CPP's connection->start() does underneath. I'd suggest you post a query to the ActiveMQ-CPP project. The CONNECT frame handling in OpenMQ is implemented in conformance with the STOMP protocol.

                  As mentioned previously, MQ C-API does not currently support auto connection failover. Here is the C-API reference doc
                  http://docs.sun.com/app/docs/doc/821-1795/aeldi?l=en&q=message+queue&a=view

                  An application can use MQCreateConnection, MQConnectionExceptionListenerFunc, and MQCloseConnection to re-create a connection (possibly to a different broker) in a separate thread upon receiving a connection exception.
                  • 6. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                    807581
                    Thanks.

                    The connection->start() call is the start method of the CMS Connection object, the C++ counterpart of the standard JMS Connection.start(), nothing custom. I did post on their forum at the same time but you are much quicker to reply. :)

                    I have an existing OpenMQ broker cluster which I would be using for failover. Would this approach still work with it, and if so, would I need to change much in the code for the broker cluster?

                    Thanks. Sorry, I'm a newbie to JMS messaging in general, so all of this is helping me out immensely!
                    • 7. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                      807581
                      Apart from the broker host/port information specified for MQCreateConnection, C client code that works with a single broker works with a cluster as well.
                      • 8. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                        807581
                        One more question: how can I see the messages the broker/client are sending to each other, like the CONNECT request, etc.?

                        I want to see whether the OpenMQ broker sends back a CONNECT confirmation after it creates the connection to the ActiveMQ-CPP client. On the client itself, I can do this by appending "&transport.commandTracingEnabled=true" to the URI it uses to connect to the broker, but I imagine it is different for the broker. I set the debugging level to DEBUGHIGH but it still doesn't show these commands.
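A minimal sketch of appending the client-side tracing options to the broker URI. The two option names are the ones that appear (commented out) in the example's brokerURI above; the helper names addOption and withTracing are hypothetical and not part of ActiveMQ-CPP.

```cpp
#include <cassert>
#include <string>

// Append one "key=value" option to a broker URI, using '?' for the first
// option and '&' for every later one.
std::string addOption(const std::string& uri, const std::string& option) {
    const char sep = (uri.find('?') == std::string::npos) ? '?' : '&';
    return uri + sep + option;
}

// Hypothetical helper: switch on the two tracing options listed in the
// example's broker URI (command tracing and TCP tracing).
std::string withTracing(const std::string& uri) {
    return addOption(addOption(uri, "transport.commandTracingEnabled=true"),
                     "transport.tcpTracingEnabled=true");
}
```

Passing withTracing("tcp://localhost:61613?wireFormat=stomp") to the connection factory would then be equivalent to uncommenting those two lines in the example.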

                        Thanks.
                        • 9. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                          807581
                          The OpenMQ STOMP service uses Java logging. You can configure the STOMP service's logging level in the Java logging properties file; please see
                          http://docs.sun.com/app/docs/doc/821-1794/gjzqa?l=en&q=message+queue&a=view

                          e.g. imq.bridge.stomp.level = FINE

                          At the transport level, you would need to do a TCP snoop.

                          With the MQ C-API, if you are going to do application-layer connection failover, please make sure you call MQCloseConnection and then MQCreateConnection in a thread separate from the one that invoked the connection exception listener callback. After re-creating the connection, re-create the other MQ handles your application uses, e.g. MQSessionHandle, MQProducerHandle, MQConsumerHandle.
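The threading pattern described above could be sketched roughly as follows. This is an illustration only: the Reconnector type and all of its members are hypothetical, and the MQ C-API calls are shown as comments rather than real calls, because their exact signatures are not given in this thread. The point is that the exception callback only signals, while a separate thread does the close, the create, and the handle re-creation.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for application-layer failover. The numbered
// comments in recoveryLoop mark where a real client would call the MQ C-API.
struct Reconnector {
    Reconnector() : worker([this] { recoveryLoop(); }) {}

    ~Reconnector() {
        {
            std::lock_guard<std::mutex> lock(m);
            stopping = true;
        }
        cv.notify_all();
        worker.join();
    }

    // To be called from the connection exception listener callback. It only
    // records the failure and returns; it never touches the connection.
    void onConnectionException() {
        {
            std::lock_guard<std::mutex> lock(m);
            broken = true;
        }
        cv.notify_all();
    }

    // Block until at least n reconnects have completed.
    void waitForReconnects(int n) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return reconnects >= n; });
    }

    int reconnectCount() {
        std::lock_guard<std::mutex> lock(m);
        return reconnects;
    }

private:
    // Runs on its own thread, separate from the callback thread.
    void recoveryLoop() {
        std::unique_lock<std::mutex> lock(m);
        for (;;) {
            cv.wait(lock, [this] { return broken || stopping; });
            if (stopping) return;
            broken = false;
            lock.unlock();
            // 1. close the failed connection (MQCloseConnection)
            // 2. create a new one, possibly to another broker (MQCreateConnection)
            // 3. re-create the session, producer and consumer handles
            lock.lock();
            ++reconnects;
            cv.notify_all();
        }
    }

    std::mutex m;
    std::condition_variable cv;
    bool broken = false;    // set by the exception callback
    bool stopping = false;  // set by the destructor
    int reconnects = 0;     // completed recovery passes
    std::thread worker;     // declared last so it starts after the state above
};
```

A real implementation would also need retry and back-off when the new connection attempt itself fails; that is left out here.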
                          • 10. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                            807581
                            Through debugging, I have found that after the broker receives the CONNECT frame and creates a connection for the ActiveMQ-CPP client via STOMP, it sends back a frame that the client cannot unmarshal, most likely because of the frame's format. I believe this is causing most of the problem. Is there anything that can be done to make the broker and client work together? Both the OpenMQ broker and the ActiveMQ-CPP client claim to follow the STOMP format standard, but one or both do not in this case. Please let me know what can be done so that they can communicate.
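When inspecting a frame captured from the wire, it can help to print the bytes with control characters made visible, so that the presence or absence of the trailing null byte is obvious. A minimal sketch follows; showBytes is a hypothetical helper, not part of either product.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Render raw frame bytes so control characters are visible: NUL becomes
// "\0", newline becomes "\n" followed by a real line break, carriage return
// becomes "\r", and any other non-printable byte becomes "\xHH".
std::string showBytes(const std::string& raw) {
    std::string out;
    for (unsigned char c : raw) {
        if (c == '\0') {
            out += "\\0";
        } else if (c == '\n') {
            out += "\\n\n";
        } else if (c == '\r') {
            out += "\\r";
        } else if (c >= 0x20 && c < 0x7f) {
            out += static_cast<char>(c);
        } else {
            char buf[5];  // backslash, 'x', two hex digits, terminator
            std::snprintf(buf, sizeof buf, "\\x%02X", static_cast<unsigned>(c));
            out += buf;
        }
    }
    return out;
}
```

The input must be built with an explicit length (for example std::string(data, n)), since a frame contains an embedded NUL.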

                            Thanks.
                            • 11. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                              807581
                              IIRC, STOMP frames in OpenMQ always use a content-length header, while ActiveMQ uses this header for binary messages only and uses a null byte terminator for text messages.
                              So I think the ActiveMQ STOMP library times out because it waits for the null byte, which is never sent by the OpenMQ broker.
                              Hope this helps,
                              Michael
                              • 12. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                                807581
                                Any ideas on how to either fake its arrival on the ActiveMQ side or add it on the OpenMQ side?

                                Thanks.
                                • 13. Re: ActiveMQ-CPP client hangs when connect ActiveMQ-CPP client & OpenMQ bro
                                  807581
                                  The STOMP protocol at
                                  http://stomp.codehaus.org/Protocol

                                  indicates "It is recommended that MESSAGE frames include a content-length header which is a byte count for the length of the message body ... The frame still needs to be terminated with a null byte"

                                  MQ always includes a content-length header in MESSAGE frames, as described in the MQ Administration Guide, and terminates each frame with a null byte that is not part of the message body. This is implemented consistently for both JMS TextMessage and JMS BytesMessage when sending to a STOMP client:
                                  http://docs.sun.com/app/docs/doc/821-1794/gjdkc?l=en&q=message+queue&a=view

                                  I'd suggest contacting the ActiveMQ-CPP side with this information, along with your debugging findings.
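For reference, the framing rule quoted above can be sketched as a small parser: if a content-length header is present, read exactly that many body bytes and then expect the null byte; otherwise read up to the first null byte. This is an illustration written from the quoted protocol text, not the code used by ActiveMQ-CPP or OpenMQ, and the names StompFrame and parseFrame are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <map>
#include <string>

struct StompFrame {
    std::string command;
    std::map<std::string, std::string> headers;
    std::string body;
};

// Try to parse one frame from the start of buf. On success, fill frame, set
// consumed to the number of bytes used (including the null byte) and return
// true. Return false if buf does not hold a complete, well-formed frame.
bool parseFrame(const std::string& buf, StompFrame& frame, std::size_t& consumed) {
    std::size_t pos = buf.find('\n');
    if (pos == std::string::npos) return false;
    StompFrame f;
    f.command = buf.substr(0, pos);
    ++pos;

    // Header lines of the form "key:value", ended by an empty line.
    for (;;) {
        std::size_t eol = buf.find('\n', pos);
        if (eol == std::string::npos) return false;
        if (eol == pos) { pos = eol + 1; break; }
        std::string line = buf.substr(pos, eol - pos);
        std::size_t colon = line.find(':');
        if (colon == std::string::npos) return false;
        f.headers[line.substr(0, colon)] = line.substr(colon + 1);
        pos = eol + 1;
    }

    std::size_t end;
    auto it = f.headers.find("content-length");
    if (it != f.headers.end()) {
        // Counted body: it may itself contain null bytes.
        std::size_t len = 0;
        try {
            len = std::stoul(it->second);
        } catch (const std::exception&) {
            return false;  // header value is not a number
        }
        if (buf.size() - pos <= len) return false;  // body or terminator missing
        if (buf[pos + len] != '\0') return false;   // terminator is still required
        end = pos + len;
    } else {
        // No length given: the body runs up to the first null byte.
        end = buf.find('\0', pos);
        if (end == std::string::npos) return false;
    }
    f.body = buf.substr(pos, end - pos);
    frame = f;
    consumed = end + 1;
    return true;
}
```

A parser written this way accepts frames in both styles, which is the behavior a client would need in order to interoperate with a broker that always sends content-length.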