Monday, 26 October 2009

JMS: Reliable and robust applications - Part 2



1. Introduction
This post explains how to use features of the JMS API to achieve the level of reliability and performance your application requires. All this content is taken from my summaries based on the specification.

2. Basic Reliability Mechanisms

2.1 Controlling Message Acknowledgment
Until a JMS message has been acknowledged, it is not considered to be successfully consumed. Successful consumption ordinarily takes place in three stages.

1. The client receives the message.
2. The client processes the message.
3. The message is acknowledged. Acknowledgment is initiated either by the JMS provider or by the client, depending on the session acknowledgment mode.

Consider the following method signatures:

QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
         throws JMSException
TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
         throws JMSException


If the first parameter is true, the session is transacted and the acknowledgment happens automatically when a transaction is committed. If a transaction is rolled back, all consumed messages are redelivered.

If the first parameter is false, the session is nontransacted, and the acknowledgeMode parameter determines whether the session or the client acknowledges the messages received. This parameter is ignored if the session is transacted. It takes one of the following values.

Session.AUTO_ACKNOWLEDGE. The session automatically acknowledges a client’s receipt of a message either when the client has successfully returned from a call to receive or when the MessageListener it has called to process the message returns successfully. A synchronous receive in an AUTO_ACKNOWLEDGE session is the exception to the rule that message consumption is a three-stage process. In this case, the receipt and acknowledgment take place in one step, followed by the processing of the message.

Session.CLIENT_ACKNOWLEDGE. A client acknowledges a message by calling the message’s acknowledge method. In this mode, acknowledgment takes place on the session level: Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been consumed by its session. For example, if a message consumer consumes ten messages and then acknowledges the fifth message delivered, all ten messages are acknowledged.

Session.DUPS_OK_ACKNOWLEDGE. This option instructs the session to lazily acknowledge the delivery of messages. This is likely to result in the delivery of some duplicate messages if the JMS provider fails, so it should be used only by consumers that can tolerate duplicate messages. This option can reduce session overhead by minimizing the work the session does to prevent duplicates.
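The session-level effect of CLIENT_ACKNOWLEDGE can be illustrated without a JMS provider. The sketch below is a toy model in plain Java, not the JMS API: ToySession and its methods are hypothetical names invented for this illustration. It only mirrors the rule that acknowledging one message acknowledges every message the session has consumed so far.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a CLIENT_ACKNOWLEDGE session (hypothetical names, not the JMS API). */
class ClientAckDemo {

    static class ToySession {
        // Messages handed to the client but not yet acknowledged.
        private final List<String> unacknowledged = new ArrayList<>();

        /** Simulates the client consuming one message. */
        String consume(String message) {
            unacknowledged.add(message);
            return message;
        }

        /** Simulates acknowledging one message: every consumed message is acknowledged. */
        void acknowledge(String message) {
            unacknowledged.clear();
        }

        int pendingCount() {
            return unacknowledged.size();
        }
    }

    /**
     * Consumes 'total' messages, acknowledges the one at 'ackIndex' (1-based),
     * and returns how many messages remain unacknowledged.
     */
    static int pendingAfterAck(int total, int ackIndex) {
        ToySession session = new ToySession();
        List<String> consumed = new ArrayList<>();
        for (int i = 1; i <= total; i++) {
            consumed.add(session.consume("message " + i));
        }
        session.acknowledge(consumed.get(ackIndex - 1));
        return session.pendingCount();
    }

    public static void main(String[] args) {
        // Ten messages consumed, the fifth acknowledged: nothing remains pending.
        System.out.println(pendingAfterAck(10, 5)); // prints 0
    }
}
```

In a real client the practical consequence is the same: if you need to acknowledge messages individually, this mode will not give you that granularity.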

If messages have been received but not acknowledged when a QueueSession terminates, the JMS provider retains them and redelivers them when a consumer next accesses the queue. The provider also retains unacknowledged messages for a terminated TopicSession with a durable TopicSubscriber. Unacknowledged messages for a nondurable TopicSubscriber are dropped when the session is closed.

2.2  Specifying Message Persistence
The JMS API supports two delivery modes for messages to specify whether messages are lost if the JMS provider fails. These delivery modes are fields of the javax.jms.DeliveryMode interface.

• The PERSISTENT delivery mode, which is the default, instructs the JMS provider to take extra care to ensure that a message is not lost in transit if the JMS provider fails. A message sent with this delivery mode is logged to stable storage when it is sent.

• The NON_PERSISTENT delivery mode does not require the JMS provider to store the message or otherwise guarantee that it is not lost if the provider fails. This mode may improve performance and reduce storage overhead, but you should use it only if your application can afford to miss messages.
      
2.3 Setting Message Priority Levels
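You can use message priority levels to instruct the JMS provider to deliver urgent messages first. The ten levels range from 0 (lowest) to 9 (highest); the default is 4. You can set the priority level in either of two ways: call the producer's setPriority method to set the level for all messages that producer sends, or use the long form of the send or publish method, whose third argument sets the priority for that one message.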

2.4 Allowing Messages to Expire
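By default, a message never expires. If a message will become obsolete after a certain period, however, you may want to set an expiration time. You can do this in either of two ways: call the producer's setTimeToLive method to set a default for all messages that producer sends, or use the long form of the send or publish method, whose fourth argument sets the time to live in milliseconds for that one message.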
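How an expiration time follows from a time-to-live value can be sketched in plain Java. The helper below is a hypothetical illustration, not part of the JMS API. It assumes the JMS convention that the expiration time is the send time plus the time to live in milliseconds, and that a time to live of 0 means the message never expires.

```java
/** Toy expiration arithmetic (hypothetical helper, not the JMS API). */
class ExpirationDemo {

    /**
     * Returns the expiration timestamp in milliseconds,
     * or 0 ("never expires") when the time to live is 0.
     */
    static long expirationTime(long sendTimeMillis, long timeToLiveMillis) {
        if (timeToLiveMillis < 0) {
            throw new IllegalArgumentException("time to live must not be negative");
        }
        return timeToLiveMillis == 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    /** A message with expiration 0 never expires; otherwise it expires once 'now' reaches it. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = 1_000L;
        long expiresAt = expirationTime(sentAt, 500L);
        System.out.println(expiresAt);                    // prints 1500
        System.out.println(isExpired(expiresAt, 1_200L)); // prints false
        System.out.println(isExpired(expiresAt, 1_500L)); // prints true
    }
}
```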

2.5 Creating Temporary Destinations
The JMS API also enables you to create temporary destinations, TemporaryQueue and TemporaryTopic objects, by calling these methods:

QueueSession.createTemporaryQueue

TopicSession.createTemporaryTopic

They last only for the duration of the connection in which they are created. The only message consumers that can consume from a temporary destination are those created by the same connection that created the destination. Any message producer can send to the temporary destination. If you close the connection that a temporary destination belongs to, the destination is closed and its contents lost.

You can use temporary destinations to implement a simple request/reply mechanism (see JMSReplyTo, JMSCorrelationID, JMSMessageID).
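The roles of the three headers in a request/reply exchange can be sketched with a toy model. Everything below is hypothetical plain Java, not the JMS API: ToyMessage stands in for a message, an in-memory Deque stands in for a temporary queue, and the IDs are made up. The point is only that the replier sends to the request's reply destination and copies the request's message ID into the reply's correlation ID.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy request/reply exchange (hypothetical classes, not the JMS API). */
class RequestReplyDemo {

    /** Minimal stand-in for a message with the headers used for request/reply. */
    static class ToyMessage {
        final String messageId;          // plays the role of JMSMessageID
        final String correlationId;      // plays the role of JMSCorrelationID
        final Deque<ToyMessage> replyTo; // plays the role of JMSReplyTo
        final String text;

        ToyMessage(String messageId, String correlationId,
                   Deque<ToyMessage> replyTo, String text) {
            this.messageId = messageId;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.text = text;
        }
    }

    /** The replier answers on the request's reply destination and echoes its ID. */
    static void reply(ToyMessage request) {
        ToyMessage response = new ToyMessage(
                "ID:reply-1", request.messageId, null, "Re: " + request.text);
        request.replyTo.add(response);
    }

    /** Sends one request and returns the reply found on the temporary queue. */
    static ToyMessage roundTrip(String text) {
        Deque<ToyMessage> requestQueue = new ArrayDeque<>();
        Deque<ToyMessage> temporaryQueue = new ArrayDeque<>(); // stand-in for a temporary queue
        requestQueue.add(new ToyMessage("ID:request-1", null, temporaryQueue, text));
        reply(requestQueue.remove());
        return temporaryQueue.remove();
    }

    public static void main(String[] args) {
        ToyMessage response = roundTrip("ping");
        System.out.println(response.correlationId + " -> " + response.text);
    }
}
```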

3. Advanced Reliability Mechanisms

3.1 Creating Durable Subscriptions
To make sure that a pub/sub application receives all published messages, use PERSISTENT delivery mode for the publishers. In addition, use durable subscriptions for the subscribers.

The TopicSession.createSubscriber method creates a nondurable subscriber, which can receive only messages that are published while it is active.

The TopicSession.createDurableSubscriber method creates a durable subscriber. A durable subscription can have only one active subscriber at a time.

A durable subscriber registers a durable subscription with a unique identity that is retained by the JMS provider. Subsequent subscriber objects with the same identity resume the subscription in the state in which it was left by the previous subscriber. If a durable subscription has no active subscriber, the JMS provider retains the subscription’s messages until they are received by the subscription or until they expire.
You establish the unique identity of a durable subscriber by setting the following:

• A client ID for the connection
• A topic and a subscription name for the subscriber

With a durable subscriber, the subscriber can be closed and recreated, but the subscription continues to exist and to hold messages until the application calls the unsubscribe method.
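The difference between the two kinds of subscriber can be sketched with a small in-memory model. This is a toy in plain Java, not the JMS API: ToyTopic, its method names, and the subscription names are all hypothetical. It assumes only the behaviour described above: a durable subscription keeps collecting messages while no subscriber is active, and a nondurable subscriber sees only what is published while it exists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of durable and nondurable subscriptions (hypothetical, not the JMS API). */
class DurableSubscriptionDemo {

    static class ToyTopic {
        private final Map<String, List<String>> durable = new HashMap<>();
        private final Map<String, List<String>> nondurable = new HashMap<>();

        /** Registers a durable subscription, or resumes it if it already exists. */
        void createDurableSubscriber(String subscriptionName) {
            durable.putIfAbsent(subscriptionName, new ArrayList<>());
        }

        void createSubscriber(String subscriberName) {
            nondurable.put(subscriberName, new ArrayList<>());
        }

        /** Closing a nondurable subscriber discards it together with anything pending. */
        void closeSubscriber(String subscriberName) {
            nondurable.remove(subscriberName);
        }

        /** Only an explicit unsubscribe removes a durable subscription. */
        void unsubscribe(String subscriptionName) {
            durable.remove(subscriptionName);
        }

        void publish(String message) {
            for (List<String> backlog : durable.values()) backlog.add(message);
            for (List<String> inbox : nondurable.values()) inbox.add(message);
        }

        List<String> receiveDurable(String subscriptionName) {
            return drain(durable.get(subscriptionName));
        }

        List<String> receive(String subscriberName) {
            return drain(nondurable.get(subscriberName));
        }

        private static List<String> drain(List<String> pending) {
            List<String> delivered = new ArrayList<>(pending);
            pending.clear();
            return delivered;
        }
    }

    /** Returns what each subscriber sees after M2 was published while both were away. */
    static List<List<String>> afterReconnect() {
        ToyTopic topic = new ToyTopic();
        topic.createDurableSubscriber("audit");
        topic.createSubscriber("ui");
        topic.publish("M1");
        topic.receiveDurable("audit");          // both consume M1 while active
        topic.receive("ui");
        topic.closeSubscriber("ui");            // the durable subscription stays registered
        topic.publish("M2");                    // published while nobody is active
        topic.createDurableSubscriber("audit"); // resumes the existing subscription
        topic.createSubscriber("ui");           // starts from scratch
        topic.publish("M3");
        return Arrays.asList(topic.receiveDurable("audit"), topic.receive("ui"));
    }

    public static void main(String[] args) {
        System.out.println(afterReconnect()); // prints [[M2, M3], [M3]]
    }
}
```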

3.2 Using JMS API Local Transactions
You can group a series of operations together into an atomic unit of work called a transaction. If any one of the operations fails, the transaction can be rolled back, and the operations can be attempted again from the beginning. If all the operations succeed, the transaction can be committed.

In a JMS client, you can use local transactions to group message sends and receives. The JMS API Session interface provides commit and rollback methods that you can use in a JMS client. A transaction commit means that all produced messages are sent and all consumed messages are acknowledged. A transaction rollback means that all produced messages are destroyed and all consumed messages are recovered and redelivered unless they have expired.

A transacted session is always involved in a transaction. As soon as the commit or the rollback method is called, one transaction ends and another transaction begins. Closing a transacted session rolls back its transaction in progress, including any pending sends and receives.






In an Enterprise JavaBeans component, you cannot use the Session.commit and Session.rollback methods. Instead, you use distributed transactions.

You can combine several sends and receives in a single JMS API local transaction. If you do so, you need to be careful about the order of the operations. You will have no problems if the transaction consists of all sends or all receives or if the receives come before the sends. But if you try to use a request-reply mechanism, whereby you send a message and then try to receive a reply to the sent message in the same transaction, the program will hang, because the send cannot take place until the transaction is committed. Because a message sent during a transaction is not actually sent until the transaction is committed, the transaction cannot contain any receives that depend on that message’s having been sent.
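Why a reply can never arrive inside the sending transaction becomes clearer with a toy model of a transacted session. The class below is hypothetical plain Java, not the JMS API, and models sends only. It assumes just what the text states: sent messages are held back until commit, and rollback destroys them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of sends in a local transaction (hypothetical, not the JMS API). */
class LocalTransactionDemo {

    static class ToyTransactedSession {
        private final List<String> destination;               // where committed sends land
        private final List<String> pendingSends = new ArrayList<>();

        ToyTransactedSession(List<String> destination) {
            this.destination = destination;
        }

        /** Buffers the message; nothing reaches the destination yet. */
        void send(String message) {
            pendingSends.add(message);
        }

        /** Releases all buffered messages and starts a new transaction. */
        void commit() {
            destination.addAll(pendingSends);
            pendingSends.clear();
        }

        /** Destroys all buffered messages and starts a new transaction. */
        void rollback() {
            pendingSends.clear();
        }
    }

    /** Returns the number of visible messages before commit, after commit, and after a rollback. */
    static int[] visibleCounts() {
        List<String> queue = new ArrayList<>();
        ToyTransactedSession session = new ToyTransactedSession(queue);

        session.send("request 1");
        session.send("request 2");
        int beforeCommit = queue.size();  // a replier polling the queue sees nothing yet

        session.commit();
        int afterCommit = queue.size();

        session.send("request 3");
        session.rollback();               // request 3 is never delivered
        int afterRollback = queue.size();

        return new int[] {beforeCommit, afterCommit, afterRollback};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(visibleCounts())); // prints [0, 2, 2]
    }
}
```

Because the count before commit is zero, a replier has nothing to answer, so a receive for the reply placed before the commit would wait indefinitely.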

It is also important to note that the production and the consumption of a message cannot both be part of the same transaction. The reason is that the transactions take place between the clients and the JMS provider, which intervenes between the production and the consumption of the message.

Another way of putting this is that the act of producing and/or consuming messages in a session can be transactional, but the act of producing and consuming a specific message across different sessions cannot be transactional.

Because the commit and the rollback methods for local transactions are associated with the session, you cannot combine queue and topic operations in a single transaction.
You can, however, receive from one queue and send to another queue in the same transaction, assuming that you use the same QueueSession to create the QueueReceiver and the QueueSender. You can pass a client program’s session to a message listener’s constructor and use it to create a message producer, so that you can use the same session for receives and sends in asynchronous message consumers.

Friday, 16 October 2009

JMS: Basic Concepts - Part 1



1. Introduction
My intention with this post is to summarize the most important ideas behind JMS. As always, I am not trying to reinvent the wheel, and all the notes are taken from the specification.


2. What Is the JMS API?

The Java Message Service is a Java API that allows applications to create, send, receive, and read messages.
The JMS API enables distributed communication that is not only loosely coupled but also:

Asynchronous. A JMS provider can deliver messages to a client as they arrive; a client does not have to request messages in order to receive them.

Reliable. The JMS API can ensure that a message is delivered once and only once.

3.  When Can You Use the JMS API?
An enterprise application provider is likely to choose a messaging API over a tightly coupled API, such as Remote Procedure Call (RPC), under the following circumstances.

• The provider wants the components not to depend on information about other components’ interfaces, so that components can be easily replaced.
• The provider wants the application to run whether or not all components are up and running simultaneously.
• The application business model allows a component to send information to another and to continue to operate without receiving an immediate response.


4. JMS API Architecture
A JMS application is composed of the following parts.

A JMS provider is a messaging system that implements the JMS interfaces and provides administrative and control features. An implementation of the J2EE platform at release 1.3 includes a JMS provider.

JMS clients are the programs or components, written in the Java programming language, that produce and consume messages.

Messages are the objects that communicate information between JMS clients.

Administered objects are preconfigured JMS objects created by an administrator for the use of clients.

Native clients are programs that use a messaging product’s native client API instead of the JMS API. An application first created before the JMS API became available and subsequently modified is likely to include both JMS and native clients.





5.  Messaging Domains
Point-to-Point Messaging Domain
Each message is addressed to a specific queue, and receiving clients extract messages from the queue(s) established to hold their messages.
Queues retain all messages sent to them until the messages are consumed or until the messages expire.




PTP messaging has the following characteristics:

• Each message has only one consumer.
• A sender and a receiver of a message have no timing dependencies. The receiver can fetch the message whether or not it was running when the client sent the message.
• The receiver acknowledges the successful processing of a message.

Publish/Subscribe Messaging Domain
Publishers and subscribers are generally anonymous and may dynamically publish or subscribe to the content hierarchy. The system takes care of distributing the messages arriving from a topic’s multiple publishers to its multiple subscribers. Topics retain messages only as long as it takes to distribute them to current subscribers.




Pub/sub messaging has the following characteristics.

• Each message may have multiple consumers.
• Publishers and subscribers have a timing dependency. A client that subscribes to a topic can consume only messages published after the client has created a subscription, and the subscriber must continue to be active in order for it to consume messages.

The JMS API relaxes this timing dependency to some extent by allowing clients to create durable subscriptions. Durable subscriptions can receive messages sent while the subscribers are not active. Durable subscriptions provide the flexibility and reliability of queues but still allow clients to send messages to many recipients.


6.  Message Consumption
Messaging products are inherently asynchronous in that no fundamental timing dependency exists between the production and the consumption of a message. However, the JMS Specification uses this term in a more precise sense. Messages can be consumed in either of two ways:

Synchronously. A subscriber or a receiver explicitly fetches the message from the destination by calling the receive method. The receive method can block until a message arrives or can time out if a message does not arrive within a specified time limit.

Asynchronously. A client can register a message listener with a consumer. A message listener is similar to an event listener. Whenever a message arrives at the destination, the JMS provider delivers the message by calling the listener’s onMessage method, which acts on the contents of the message.
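The two consumption styles can be contrasted with a small plain-Java sketch. It does not use the JMS API: a BlockingQueue stands in for a destination, a Consumer stands in for a message listener, and all method names are hypothetical. The synchronous style pulls a message and may time out; the asynchronous style has messages pushed to a callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy contrast of synchronous and asynchronous consumption (not the JMS API). */
class ConsumptionDemo {

    /** Synchronous style: wait up to timeoutMillis for a message; null means timeout. */
    static String receive(BlockingQueue<String> destination, long timeoutMillis) {
        try {
            return destination.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Asynchronous style: the "provider" pushes each arriving message to the listener. */
    static void deliver(List<String> arrivals, Consumer<String> listener) {
        for (String message : arrivals) {
            listener.accept(message);
        }
    }

    /** One message is available, so the second receive times out. */
    static List<String> receiveTwice() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello");
        String first = receive(queue, 50);
        String second = receive(queue, 50);
        return Arrays.asList(first, second);
    }

    /** Registers a listener that records every message it is handed. */
    static List<String> collectViaListener(List<String> arrivals) {
        List<String> handled = new ArrayList<>();
        deliver(arrivals, handled::add);
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(receiveTwice());                              // prints [hello, null]
        System.out.println(collectViaListener(Arrays.asList("a", "b"))); // prints [a, b]
    }
}
```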

7. Main Elements of a JMS Application

Administered Objects

Two parts of a JMS application—destinations and connection factories—are best maintained administratively rather than programmatically. The technology underlying these objects is likely to be very different from one implementation of the JMS API to another. Therefore, the management of these objects belongs with other administrative tasks that vary from provider to provider.

JMS clients access these objects through interfaces that are portable, so a client application can run with little or no change on more than one implementation of the JMS API.

Connection Factories
A connection factory is the object a client uses to create a connection with a provider.
A connection factory encapsulates a set of connection configuration parameters that has been defined by an administrator.


    Context ctx = new InitialContext();
    QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
    TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");


Destinations
A destination is the object a client uses to specify the target of messages it produces and the source of messages it consumes.


    Topic myTopic = (Topic) ctx.lookup("MyTopic");
    Queue myQueue = (Queue) ctx.lookup("MyQueue");


Connections
A connection encapsulates a virtual connection with a JMS provider. A connection could represent an open TCP/IP socket between a client and a provider service daemon. You use a connection to create one or more sessions.


    QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
    TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();


Sessions
A session is a single-threaded context for producing and consuming messages. You use sessions to create message producers, message consumers, and messages. Sessions serialize the execution of message listeners.
A session provides a transactional context with which to group a set of sends and receives into an atomic unit of work.


    TopicSession topicSession = topicConnection.createTopicSession(false,
            Session.AUTO_ACKNOWLEDGE);
    QueueSession queueSession = queueConnection.createQueueSession(true, 0);


Message Producers
A message producer is an object created by a session and is used for sending messages to a destination.


    QueueSender queueSender = queueSession.createSender(myQueue);
    TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);


Once you have created a message producer, you can use it to send messages.


    queueSender.send(message);
    topicPublisher.publish(message);


Message Consumers
A message consumer is an object created by a session and is used for receiving messages sent to a destination. A message consumer allows a JMS client to register interest in a destination with a JMS provider. The JMS provider manages the delivery of messages from a destination to the registered consumers of the destination.


    QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);
    TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);


You use the TopicSession.createDurableSubscriber method to create a durable topic subscriber.

Message Listeners
A message listener is an object that acts as an asynchronous event handler for messages.
This object implements the MessageListener interface, which contains one method, onMessage. In the onMessage method, you define the actions to be taken when a message arrives.
You register the message listener with a specific QueueReceiver or TopicSubscriber by using the setMessageListener method. For example, if you define a class named TopicListener that implements the MessageListener interface, you can register the message listener as follows:


    TopicListener topicListener = new TopicListener();
    topicSubscriber.setMessageListener(topicListener);


A message listener is not specific to a particular destination type. The same listener can obtain messages from either a queue or a topic, depending on whether the listener is set by a QueueReceiver or a TopicSubscriber object. A message listener does, however, usually expect a specific message type and format. Moreover, if it needs to reply to messages, a message listener must either assume a particular destination type or obtain the destination type of the message and create a producer for that destination type.

Your onMessage method should handle all exceptions. It must not throw checked exceptions, and throwing a RuntimeException, though possible, is considered a programming error.
The session used to create the message consumer serializes the execution of all message listeners registered with the session. At any time, only one of the session’s message listeners is running.
In the J2EE 1.3 platform, a message-driven bean is a special kind of message listener.
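The advice that onMessage should handle all exceptions can be sketched as follows. ToyListener is a hypothetical stand-in for the MessageListener interface that takes a String instead of a Message, so the example runs without a JMS provider. The listener parses integers and records bad input instead of letting a RuntimeException escape.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy listener that never lets an exception escape (hypothetical, not the JMS API). */
class SafeListenerDemo {

    /** Stand-in for a message listener, using String messages. */
    interface ToyListener {
        void onMessage(String message);
    }

    static class ParsingListener implements ToyListener {
        final List<Integer> parsed = new ArrayList<>();
        final List<String> rejected = new ArrayList<>();

        @Override
        public void onMessage(String message) {
            try {
                parsed.add(Integer.parseInt(message.trim()));
            } catch (RuntimeException e) {
                // Deal with the problem here rather than throwing it back to the caller.
                rejected.add(message);
            }
        }
    }

    /** Feeds the messages to a listener and summarizes what was parsed and rejected. */
    static String process(String... messages) {
        ParsingListener listener = new ParsingListener();
        for (String message : messages) {
            listener.onMessage(message);
        }
        return listener.parsed + " / " + listener.rejected;
    }

    public static void main(String[] args) {
        System.out.println(process("1", "x", " 7 ")); // prints [1, 7] / [x]
    }
}
```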

Exception Handling
The root class for exceptions thrown by JMS API methods is JMSException. Catching JMSException provides a generic way of handling all exceptions related to the JMS API.

8. Examples

Example 1 - Sending Messages to a Queue
/*
 * Look up connection factory and queue. If either does
 * not exist, exit.
 */
try {
    queueConnectionFactory = (QueueConnectionFactory)
        jndiContext.lookup("QueueConnectionFactory");
    queue = (Queue) jndiContext.lookup(queueName);
} catch (NamingException e) {
    ......................
}

/*
 * Create connection.
 * Create session from connection; false means session is not transacted.
 * Create sender and text message.
 * Send messages, varying text slightly.
 * Send end-of-messages message.
 * Finally, close connection.
 */
try {
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueSession = queueConnection.createQueueSession(false,
        Session.AUTO_ACKNOWLEDGE);
    queueSender = queueSession.createSender(queue);
    message = queueSession.createTextMessage();
    for (int i = 0; i < NUM_MSGS; i++) {
        message.setText("This is message " + (i + 1));
        queueSender.send(message);
    }

    /*
     * Send a non-text control message indicating end of
     * messages.
     */
    queueSender.send(queueSession.createMessage());
} catch (JMSException e) {
    .............
} finally {
    if (queueConnection != null) {
        try {
            queueConnection.close();
        } catch (JMSException e) {
            .............
        }
    }
}

     
Example 2 - Publishing Messages to a Topic

/*
 * Look up connection factory and topic. If either does
 * not exist, exit.
 */
try {
    topicConnectionFactory = (TopicConnectionFactory)
        jndiContext.lookup("TopicConnectionFactory");
    topic = (Topic) jndiContext.lookup(topicName);
} catch (NamingException e) {
    ............
}

/*
 * Create connection.
 * Create session from connection; false means session is
 * not transacted.
 * Create publisher and text message.
 * Send messages, varying text slightly.
 * Finally, close connection.
 */
try {
    topicConnection = topicConnectionFactory.createTopicConnection();
    topicSession = topicConnection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);
    topicPublisher = topicSession.createPublisher(topic);
    message = topicSession.createTextMessage();
    for (int i = 0; i < NUM_MSGS; i++) {
        message.setText("This is message " + (i + 1));
        topicPublisher.publish(message);
    }
} catch (JMSException e) {
    ............
} finally {
    if (topicConnection != null) {
        try {
            topicConnection.close();
        } catch (JMSException e) {
            ............
        }
    }
}