Push-Messaging with JMS

Posted by bryan on January 21, 2010 in AJAX, E-Learning, Java, JavaScript, Web Services with 1 Comment


Hiram Chirino, Logo for Apache ActiveMQ, http://activemq.apache.org/ Released under the Apache Software License 2.0 (Photo credit: Wikipedia)

While the finer details of newer web standards like ActivityStrea.ms Realtime, PubSubHubbub and HTML5 Web Sockets are still being ironed out, the tried-and-true method of push messaging and real-time data integration in the Java community (not to mention the larger developer community as a whole) has been the Java Message Service (JMS).

JMS is a mainstay of the Java enterprise platform (Java EE) and has been used for Web Service interoperability, as well as for more innovative purposes such as real-time and/or event-driven architecture (EDA) Web Applications.

In this article, I’d like to share some of my experiences (i.e. joys and pains) with JMS in my current project, along with some best practices and lessons learned.

First, I’ll start with some important findings. JMS is:

  1. best used as a supplement to other Web Service technologies (such as SOAP and REST)
  2. dangerous to use in transactional systems if you aren’t careful to synchronize messages
  3. not very useful if you want to send very large datasets at once, or do expensive I/O operations or calculations between messages
  4. an excellent solution for real-time, short, “bursty” data exchange between disparate applications
  5. saddled with a far too steep learning curve, largely due to the immense number of providers with JMS Server offerings, each with their own variations of the standard, API syntax, threading model, command-line controls, and/or custom features

Next, I’ll walk through a brief tutorial on using one particularly well-known JMS provider, Apache ActiveMQ.

Installation

  1. Download the latest stable ActiveMQ release: http://activemq.apache.org/download.html
  2. Unzip to an easily accessible location. For convenience, I use the recommended ACTIVEMQ_HOME environment variable, pointed to the unzipped directory, which I keep in C:/Apps/apache-activemq-5.x on Windows or /home//Apps/apache-activemq-5.x on Unix-based systems (e.g. Mac, Linux).

Configuration
To configure, you should:

    1. Uncomment the JMS endpoints (transport connectors) for the protocols you want to use in “apache-activemq-5.x/conf/activemq.xml” (e.g. VM, TCP, HTTP, STOMP)
    2. Create a JNDI configuration file named jndi.properties and put it on your application’s classpath (how you do this depends on your setup; in an IDE, you can zip it, rename the archive to “jndi-properties.jar” and include it in the library build path).
      An example of my JNDI configuration file is as follows:
      java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
      
      # use the following property to configure the default connector
      java.naming.provider.url = tcp://localhost:61616
      
      # use the following property to specify the JNDI name the connection factory should appear as.  (i.e. queueConnectionFactory, topicConnectionFactory, ConnectionFactory)
      connectionFactoryNames = jms/ConnectionFactory
      
      # register some queues in JNDI using the form  EX: queue.[jndiName] = [physicalName]
      queue.MyQueue = s5.MyQueue
      
      # register some topics in JNDI using the form  EX: topic.[jndiName] = [physicalName]
      topic.MyTopic = s5.MyTopic
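To make the `queue.[jndiName] = [physicalName]` convention concrete, here is a minimal sketch that reads the same kind of properties text and lists which JNDI names map to which physical destinations. It only imitates the naming convention with `java.util.Properties`; it is not how ActiveMQ's context factory is implemented, and the `JndiNames` class and its method names are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Illustrative only: imitates the "queue.[jndiName] = [physicalName]" convention. */
class JndiNames {

    /** Parses properties-file text held in a String. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties", e);
        }
        return props;
    }

    /** Returns jndiName -> physicalName for every key that starts with the prefix. */
    static Map<String, String> destinations(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String config =
            "java.naming.provider.url = tcp://localhost:61616\n" +
            "queue.MyQueue = s5.MyQueue\n" +
            "topic.MyTopic = s5.MyTopic\n";
        Properties props = parse(config);
        System.out.println("queues: " + destinations(props, "queue."));
        System.out.println("topics: " + destinations(props, "topic."));
    }
}
```

With the configuration above, a JNDI lookup of "MyQueue" is therefore expected to hand back the physical queue s5.MyQueue, and "MyTopic" the topic s5.MyTopic.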
      

Running

          1. open a command line or terminal in the ACTIVEMQ_HOME/bin directory and type:
            activemq.bat
            -OR-
            ./activemq
            Your JMS server should now be running
          2. Receiving Messages
            open your favourite IDE or text editor, then copy & paste the following starter code:

            package jms;
            
            import javax.jms.*;
            import javax.naming.*;
            import java.io.*;
            
            /**
             * The Receive class consists only of a main method, which
             * receives one or more messages from a queue or topic using
             * asynchronous message delivery.  It uses the message listener
             * TextListener, a separate class implementing
             * javax.jms.MessageListener that must also be on the classpath.
             * Run this program in conjunction with Send.
             *
             * Specify the JNDI name of a queue or topic on the command line
             * when you run the program. To end the program, type Q or q on
             * the command line.
             */
            public class Receive {
                /**
                 * Main method.
                 *
                 * @param args     the destination name used by the example
                 */
                public static void main(String[] args) {
                    String destName = null;
                    Context jndiContext = null;
                    ConnectionFactory connectionFactory = null;
                    Connection connection = null;
                    Session session = null;
                    Destination dest = null;
                    MessageConsumer consumer = null;
                    TextListener listener = null;
                    TextMessage message = null;
                    InputStreamReader inputStreamReader = null;
                    char answer = 'n';
            
                    if (args.length != 1) {
                        System.out.println("Program takes one argument: <dest_name>");
                        System.exit(1);
                    }
            
                    destName = args[0];
                    System.out.println("Destination name is " + destName);
            
                    /**
                     * Create a JNDI API InitialContext object if none exists
                     * yet.
                     */
                    try {
                        jndiContext = new InitialContext();
                    } catch (NamingException e) {
                        System.out.println("Could not create JNDI API context: " +
                            e.toString());
                        System.exit(1);
                    }
            
                    /**
                     * Look up connection factory and destination.  If either
                     * does not exist, exit.  If you look up a
                     * TopicConnectionFactory or a QueueConnectionFactory,
                     * program behavior is the same.
                     */
                    try {
                        connectionFactory = (ConnectionFactory) jndiContext.lookup(
                                "jms/ConnectionFactory");
                        dest = (Destination) jndiContext.lookup(destName);
                    } catch (Exception e) {
                        System.out.println("JNDI API lookup failed: " + e.toString());
                        System.exit(1);
                    }
            
                    /**
                     * Create connection.
                     * Create session from connection; false means session is
                     * not transacted.
                     * Create consumer.
                     * Register message listener (TextListener).
                     * Receive text messages from destination.
                     * When all messages have been received, type Q to quit.
                     * Close connection.
                     */
                    try {
                        connection = connectionFactory.createConnection();
                        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        consumer = session.createConsumer(dest);
                        listener = new TextListener();
                        consumer.setMessageListener(listener);
                        connection.start();
                        System.out.println("To end program, type Q or q, " +
                            "then press Enter");
                        inputStreamReader = new InputStreamReader(System.in);
            
                        while (!((answer == 'q') || (answer == 'Q'))) {
                            try {
                                answer = (char) inputStreamReader.read();
                            } catch (IOException e) {
                                System.out.println("I/O exception: " + e.toString());
                            }
                        }
                    } catch (JMSException e) {
                        System.out.println("Exception occurred: " + e.toString());
                    } finally {
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                // Ignored: nothing useful can be done if close() fails
                            }
                        }
                    }
                }
            }
          3. save the project and compile it in the IDE or use the following commands to manually compile and run it (run them from the directory that contains the “jms” package folder; on Windows, use “;” instead of “:” as the classpath separator). “MyQueue” is the JNDI name registered in the configuration file above:

            javac -cp activemq-all-5.x.jar:. jms/Receive.java
            java -cp activemq-all-5.x.jar:. jms.Receive "MyQueue"

            Your JMS Listener is now listening and waiting patiently to receive inbound JMS messages. Please leave the IDE output or console window open/active (i.e. do not stop the running process for now). We’ll get into how to stop the process and server later.
            (NOTE: in ActiveMQ, you can check the JMS Admin console to confirm that the queue or topic got created, depending on which destination name you passed above)
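The Receive class registers a TextListener, which this post does not include. Below is a minimal sketch of what such a listener might look like: it prints the body of each text message and reports anything else, such as the empty control message the sender emits at the end. So that the sketch compiles without the JMS jar, the three nested interfaces are simplified stand-ins for javax.jms.Message, TextMessage and MessageListener (an assumption for illustration, not the real API). In a real project, drop the stand-ins, import javax.jms.* and catch JMSException instead.

```java
/**
 * Sketch of a TextListener-style message listener. The nested interfaces
 * are stand-ins for the javax.jms types of the same name; they exist only
 * to keep this example self-contained.
 */
class TextListenerSketch {

    interface Message { }

    interface TextMessage extends Message {
        String getText() throws Exception; // the real method throws JMSException
    }

    interface MessageListener {
        void onMessage(Message message);
    }

    /** Prints text messages; reports the type of any other message. */
    static class TextListener implements MessageListener {
        String lastText; // kept only so the sketch can be checked

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    lastText = ((TextMessage) message).getText();
                    System.out.println("Reading message: " + lastText);
                } else {
                    System.out.println("Message is not a TextMessage: "
                        + message.getClass().getName());
                }
            } catch (Exception e) {
                System.out.println("Exception in onMessage(): " + e);
            }
        }
    }

    public static void main(String[] args) {
        TextListener listener = new TextListener();
        TextMessage hello = () -> "This is message 1";
        listener.onMessage(hello);             // a text message
        listener.onMessage(new Message() { }); // a control message with no body
    }
}
```

The provider calls onMessage() on its own delivery thread, which is why Receive's main method only has to wait for Q to be typed.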

Sending Messages

          1. in your favourite IDE or text editor, copy & paste the following starter code:
            package jms;
            
            import javax.jms.*;
            import javax.naming.*;
            
            /**
             * The Send class consists only of a main method, which sends
             * several messages to a queue or topic.
             *
             * Run this program in conjunction with Receive. Specify the JNDI
             * name of a queue or topic on the command line when you run the
             * program.  By default, the program sends one message.  Specify a
             * number after the destination name to send that number of
             * messages.
             */
            public class Send {
                /**
                 * Main method.
                 *
                 * @param args     the destination used by the example
                 *                 and, optionally, the number of
                 *                 messages to send
                 */
                public static void main(String[] args) {
                    final int NUM_MSGS;
            
                    if ((args.length < 1) || (args.length > 2)) {
                        System.out.println("Program takes one or two arguments: " +
                            "<dest_name> [<number-of-messages>]");
                        System.exit(1);
                    }
            
                    String destName = args[0];
                    System.out.println("Destination name is " + destName);
            
                    if (args.length == 2) {
                        NUM_MSGS = Integer.parseInt(args[1]);
                    } else {
                        NUM_MSGS = 1;
                    }
            
                    /**
                     * Create a JNDI API InitialContext object if none exists
                     * yet.
                     */
                    Context jndiContext = null;
            
                    try {
                        jndiContext = new InitialContext();
                    } catch (NamingException e) {
                        System.out.println("Could not create JNDI API context: " +
                            e.toString());
                        System.exit(1);
                    }
            
                    /**
                     * Look up connection factory and destination.  If either
                     * does not exist, exit.  If you look up a
                     * TopicConnectionFactory or a QueueConnectionFactory,
                     * program behavior is the same.
                     */
                    ConnectionFactory connectionFactory = null;
                    Destination dest = null;
            
                    try {
                        connectionFactory = (ConnectionFactory) jndiContext.lookup(
                                "jms/ConnectionFactory");
                        dest = (Destination) jndiContext.lookup(destName);
                    } catch (Exception e) {
                        System.out.println("JNDI API lookup failed: " + e.toString());
                        e.printStackTrace();
                        System.exit(1);
                    }
            
                    /**
                     * Create connection.
                     * Create session from connection; false means session is
                     * not transacted.
                     * Create producer and text message.
                     * Send messages, varying text slightly.
                     * Send end-of-messages message.
                     * Finally, close connection.
                     */
                    Connection connection = null;
                    MessageProducer producer = null;
            
                    try {
                        connection = connectionFactory.createConnection();
            
                        Session session =
                            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        producer = session.createProducer(dest);
            
                        TextMessage message = session.createTextMessage();
            
                        for (int i = 0; i < NUM_MSGS; i++) {
                            message.setText("This is message " + (i + 1));
                            System.out.println("Sending message: " + message.getText());
                            producer.send(message);
                        }
            
                        /**
                         * Send a non-text control message indicating end of
                         * messages.
                         */
                        producer.send(session.createMessage());
                    } catch (JMSException e) {
                        System.out.println("Exception occurred: " + e.toString());
                    } finally {
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                // Ignored: nothing useful can be done if close() fails
                            }
                        }
                    }
                }
            }
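          2. save the project and compile it in the IDE or use the following commands to manually compile and run it (run them from the directory that contains the “jms” package folder; on Windows, use “;” instead of “:” as the classpath separator). The optional second argument is the number of messages to send:

            javac -cp activemq-all-5.x.jar:. jms/Send.java
            java -cp activemq-all-5.x.jar:. jms.Send "MyQueue" 3

Check the logged output of your “Receive” JMS Listener’s console or IDE window. You should see the messages arrive; if not, please make sure you compiled the code properly and encountered no error messages.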

You should be all set for robust event-driven messaging now. Of course, the content of our basic test message was simply a “Hello, World!” type message, but it could easily be a more complex XML document, or even a JSON snippet, which would be more useful for WebApps in particular.
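As a sketch of that last idea, a small JSON payload can be assembled as a plain String and passed to message.setText(...) in place of the “This is message N” text. The event, slide and note field names, and the SlideEventJson class itself, are hypothetical; a real application would more likely use a JSON library than hand-rolled escaping.

```java
/** Illustrative only: builds a tiny JSON object suitable for a TextMessage body. */
class SlideEventJson {

    /** Escapes characters that must not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Other control characters become a four-digit hex escape
                        sb.append("\\").append("u").append(String.format("%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Field names here are assumptions chosen for the example. */
    static String toJson(String event, int slide, String note) {
        return "{\"event\":\"" + escape(event) + "\",\"slide\":" + slide
            + ",\"note\":\"" + escape(note) + "\"}";
    }

    public static void main(String[] args) {
        // In Send, this string would be passed to message.setText(...)
        System.out.println(toJson("next", 4, "say \"hi\""));
    }
}
```

On the receiving side, a browser client can parse the message body as JSON, which is what makes this format convenient for WebApps.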

If you have trouble with any of these steps, you should consult the official ActiveMQ Getting Started guide and/or the Message forums.

Push Messaging to Web Apps
When using ActiveMQ, Push Messaging is accomplished via the AJAX client. It has fairly good browser support and degrades well. There are options for page refresh-based polling in the absolute worst case, but this practice is now shunned in favor of a more COMET-like Reverse AJAX approach. The basic requirement is to use a script tag as follows:





These 7 simple lines of JavaScript allow us to dynamically inject trusted JavaScript behaviour from the same-origin server on supporting web browsers, and use a self-updating iframe as the fallback where such behaviour is not fully supported. The contents of amq.js are dynamically created by the server-side AjaxServlet.

Important notes for WebApp:

        1. Ensure web.xml looks something like this:
          <?xml version="1.0" encoding="UTF-8"?>
          <!--
              Licensed to the Apache Software Foundation (ASF) under one or more
              contributor license agreements.  See the NOTICE file distributed with
              this work for additional information regarding copyright ownership.
              The ASF licenses this file to You under the Apache License, Version 2.0
              (the "License"); you may not use this file except in compliance with
              the License.  You may obtain a copy of the License at
             
              http://www.apache.org/licenses/LICENSE-2.0
             
              Unless required by applicable law or agreed to in writing, software
              distributed under the License is distributed on an "AS IS" BASIS,
              WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              See the License for the specific language governing permissions and
              limitations under the License.
          -->
          <!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd">
          <web-app>
              <display-name>InteractiveSlides</display-name>
              <description>Interactive Slideshows based on S5, with real-time controls powered by JMS (via Apache ActiveMQ)</description>    
              <!-- context config -->
              <context-param>
                  <param-name>org.apache.activemq.brokerURL</param-name>
                  <param-value>tcp://localhost:61616</param-value>
                  <description>The URL of the Message Broker to connect to</description>
              </context-param>
              <context-param>
                  <param-name>org.apache.activemq.embeddedBroker</param-name>
                  <param-value>false</param-value>
                  <description>Whether we should include an embedded broker or not</description>
              </context-param>
          
              <resource-ref>
                  <description>Connection Factory</description>
                  <res-ref-name>jms/connectionFactory</res-ref-name>
                  <res-type>javax.jms.TopicConnectionFactory</res-type>
                  <res-auth>Container</res-auth>
              </resource-ref>
              <!-- Queue ref -->
              <resource-env-ref>
                  <resource-env-ref-name>jms/s5.MyQueue</resource-env-ref-name>
                  <resource-env-ref-type>javax.jms.Queue</resource-env-ref-type>
              </resource-env-ref>
              <!-- Topic ref -->
              <resource-env-ref>
                  <resource-env-ref-name>jms/s5.MyTopic</resource-env-ref-name>
                  <resource-env-ref-type>javax.jms.Topic</resource-env-ref-type>
              </resource-env-ref>    
          
          <!-- servlet mappings -->    
              <!-- the subscription REST servlet -->
              <servlet>
                  <servlet-name>AjaxServlet</servlet-name>
                  <servlet-class>org.apache.activemq.web.AjaxServlet</servlet-class>
                  <load-on-startup>1</load-on-startup>
                  <async-supported>true</async-supported>
              </servlet>
              <servlet>
                  <servlet-name>MessageServlet</servlet-name>
                  <servlet-class>org.apache.activemq.web.MessageServlet</servlet-class>
                  <load-on-startup>1</load-on-startup>
                  <async-supported>true</async-supported>
                  <!-- Uncomment this parameter if you plan to use multiple consumers over REST -->
                  <init-param>
                      <param-name>destinationOptions</param-name>
                      <param-value>consumer.prefetchSize=1</param-value>
                  </init-param> 
              </servlet>
              
              <servlet-mapping>
                  <servlet-name>AjaxServlet</servlet-name>
                  <url-pattern>/amq/*</url-pattern>
              </servlet-mapping>
              <servlet-mapping>
                  <servlet-name>MessageServlet</servlet-name>
                  <url-pattern>/message/*</url-pattern>
              </servlet-mapping>    
          
              <filter>
                <filter-name>session</filter-name>
                <filter-class>org.apache.activemq.web.SessionFilter</filter-class>
                <async-supported>true</async-supported>
              </filter>    
              <filter-mapping>
                <filter-name>session</filter-name>
                <url-pattern>/*</url-pattern>
              </filter-mapping>
          
              <listener>
                  <listener-class>org.apache.activemq.web.SessionListener</listener-class>
              </listener>
          </web-app>
          
        2. Ensure your JMS server’s configuration file (apache-activemq-5.4.2/conf/) looks something like this:
          <!--
              Licensed to the Apache Software Foundation (ASF) under one or more
              contributor license agreements.  See the NOTICE file distributed with
              this work for additional information regarding copyright ownership.
              The ASF licenses this file to You under the Apache License, Version 2.0
              (the "License"); you may not use this file except in compliance with
              the License.  You may obtain a copy of the License at
             
              http://www.apache.org/licenses/LICENSE-2.0
             
              Unless required by applicable law or agreed to in writing, software
              distributed under the License is distributed on an "AS IS" BASIS,
              WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              See the License for the specific language governing permissions and
              limitations under the License.
          -->
          <beans
            xmlns="http://www.springframework.org/schema/beans"
            xmlns:amq="http://activemq.apache.org/schema/core"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
            http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
              <!-- Allows us to use system properties as variables in this configuration file -->
              <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
                  <property name="locations">
                      <value>file:${activemq.base}/conf/credentials.properties</value>
                  </property>      
              </bean>
              <!-- 
                  The <broker> element is used to configure the ActiveMQ broker. 
              -->
              <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true"> 
                  <!--
                      For better performance use VM cursor and small memory limit.
                      For more information, see:
                      
                      http://activemq.apache.org/message-cursors.html
                      
                      Also, if your producer is "hanging", it's probably due to producer flow control.
                      For more information, see:
                      http://activemq.apache.org/producer-flow-control.html
                  -->              
                  <destinationPolicy>
                      <policyMap>
                        <policyEntries>
                          <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
                            <pendingSubscriberPolicy>
                              <vmCursor />
                            </pendingSubscriberPolicy>
                          </policyEntry>
                          <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                            <!-- Use VM cursor for better latency
                                 For more information, see:
                                 
                                 http://activemq.apache.org/message-cursors.html
                                 
                            <pendingQueuePolicy>
                              <vmQueueCursor/>
                            </pendingQueuePolicy>
                            -->
                          </policyEntry>
                        </policyEntries>
                      </policyMap>
                  </destinationPolicy> 
                  <!-- 
                      The managementContext is used to configure how ActiveMQ is exposed in 
                      JMX. By default, ActiveMQ uses the MBean server that is started by 
                      the JVM. For more information, see: 
                      
                      http://activemq.apache.org/jmx.html 
                  -->
                  <managementContext>
                      <managementContext createConnector="false"/>
                  </managementContext>
                  <!-- 
                      Configure message persistence for the broker. The default persistence
                      mechanism is the KahaDB store (identified by the kahaDB tag). 
                      For more information, see: 
                      
                      http://activemq.apache.org/persistence.html 
                  -->
                  <persistenceAdapter>
                      <kahaDB directory="${activemq.base}/data/kahadb"/>
                  </persistenceAdapter>               
                  <!-- 
                      The transport connectors expose ActiveMQ over a given protocol to
                      clients and other brokers. For more information, see: 
                      
                      http://activemq.apache.org/configuring-transports.html 
                  -->
                  <transportConnectors>            
                      <!-- Create a TCP transport that is advertised via an IP multicast group named default. -->
                      <transportConnector name="openwire" uri="tcp://localhost:61616" />
                      <!-- Non-blocking Input/Output TCP connector for large-scale apps
                        <transportConnector name="nio" uri="nio://localhost:61616"/>			
                      -->			
                      <!-- Create an SSL transport. To use properly, make sure to configure the SSL options via the system properties or the sslContext element. -->
                      <transportConnector name="ssl" uri="ssl://localhost:61617"/>
                      <!-- Create an HTTP transport. For non-secure communications -->
                      <transportConnector name="http" uri="http://localhost:61618" />
                      <!-- Create a STOMP transport for cross-platform STOMP clients (AJAX, Flash, PHP, Python, Perl, Ruby, C#, etc). -->
                      <transportConnector name="stomp" uri="stomp://localhost:61613"/>
                      <!-- Create a XMPP transport. Useful for XMPP chat clients (i.e. within a department intranet). -->
                  <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
                </transportConnectors>
              </broker>
              <!-- 
                  Enable web consoles, REST and Ajax APIs and demos
                  
                  Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details 
              -->    
              <import resource="jetty.xml"/>   
          </beans>
          
        3. Compile project as a .war file
        4. Deploy project .war file to a J2EE Web Server with Servlet container and JSP support, for instance Jetty or Tomcat.
        5. Open JMS Admin console in a browser (if using a JMS provider other than ActiveMQ, please consult that provider’s documentation for the correct Admin URL, if any exists)
        6. Open the JMS Test Suite in a browser. I suggest using localhost at first to test connectivity within your own system/network; then try moving the web app to your own remote server and reaching your local JMS broker, or vice versa (moving the JMS server to a remote host and reaching it from your local Web App and Server deployment). Lastly, try using two separate remote servers, one for JMS and one for the webapp, as this will be the best architecture to take advantage of distributed computing and the separation of concerns.
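Before moving the broker or the web app to a remote host, it can help to confirm that the transport ports from the broker configuration above are reachable at all. The following is a rough sketch that simply tries to open a TCP connection to each connector URI; it says nothing about whether the protocol behind the port works. The BrokerPortCheck class and the one-second timeout are assumptions for illustration, and localhost should be replaced with your own broker host.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/** Illustrative only: checks whether broker transport ports accept TCP connections. */
class BrokerPortCheck {

    /** Extracts host and port from a connector URI such as tcp://localhost:61616. */
    static InetSocketAddress endpoint(String connectorUri) {
        URI uri = URI.create(connectorUri);
        return new InetSocketAddress(uri.getHost(), uri.getPort());
    }

    /** True if a TCP connection to the endpoint opens within timeoutMs. */
    static boolean reachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // URIs copied from the transportConnectors section of the broker config
        String[] connectors = {
            "tcp://localhost:61616",
            "http://localhost:61618",
            "stomp://localhost:61613"
        };
        for (String connector : connectors) {
            boolean open = reachable(endpoint(connector), 1000);
            System.out.println(connector + " -> " + (open ? "open" : "not reachable"));
        }
    }
}
```

If a port shows as not reachable from the web app's host, check firewalls and note that connectors bound to localhost in the broker configuration only accept local connections.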

 

You can’t really demo any of this online without a dedicated Java server (and possibly a separate message server for a live production application that must scale to high numbers of users, although ActiveMQ can perform quite well even when running on the same server for small-to-mid range apps). Since Java server hosting is notoriously expensive, I figured the best way to demo this was to get it running on AppEngine with a nice implementation of S5 to show the power of messaging services such as JMS (although I had to settle for the limitations of the much more basic HTML5 WebMessaging API rather than ActiveMQ). I encourage you to try this all out; the best way is really to just download it and run it locally on your own laptop/desktop before thinking about alternative uses:


-OR-

Last but not least, feel free to fork the entire project on GIT and create something cool! The code is Public Domain but I encourage you to send pull requests if you make your own updates, so that the app can remain as killer (all-killer and no filler) a demonstration of real-time messaging for E-Learning webapps as possible.