Saturday, January 31, 2009

MessageListener Exception Handling.

This article talks about exception handling inside a message listener's onMessage() method:

public void onMessage(Message message);

The method signature declares no checked exceptions, so a listener cannot throw one, and the JMS API JavaDoc says the following.

"It is a client programming error for a MessageListener to throw an exception."

In effect, this statement leaves it to each JMS messaging provider to handle a (runtime) exception thrown from a message listener in its own way; the behavior is undefined.

Below are some of the ways a messaging provider might react when it receives an exception thrown from the onMessage() method.

  • Set the message's redelivery flag to true and redeliver the message to the message listener. The number of times a provider will attempt to redeliver a message is undefined.
  • Send the message to the provider-defined dead-message queue and continue delivering the next message.
  • Stop message delivery to the listener.

While a messaging provider may define and document its own behavior, an application should avoid tying itself to a specific vendor whenever possible. It is usually a better choice for the application to handle its own business-logic errors explicitly than to rely on the messaging provider to guess what went wrong.

An application may consider defining and configuring an error queue. An error queue is a destination used to store messages that the application cannot process. When a message listener encounters an exception while processing a message, the application can send the "trouble message" to the error queue. The application can then acknowledge the message and continue processing the next message delivered by the messaging provider. This ensures that messages continue to flow with predictable behavior.

Messages in the error queue can be replayed at a later time to help identify the root cause of the exception.

A runtime exception thrown from a message listener could cause unpredictable behavior from a messaging provider and thus should be avoided. A good programming practice is to handle any possible exceptions that could be generated from within the message listener.
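
The error-queue approach described above can be sketched without a JMS provider. The following is only a minimal illustration under stated assumptions: messages are plain strings, the error queue is an in-memory list, and the names ErrorQueueSketch, SafeListener and businessLogic are hypothetical. In a real listener, the catch block would send the message to a configured error destination with a MessageProducer and then acknowledge it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ErrorQueueSketch {

    /** Hypothetical stand-in for a MessageListener; messages are plain strings. */
    public static final class SafeListener {
        private final Consumer<String> businessLogic;
        private final List<String> errorQueue; // stand-in for a real error destination
        private int processed;

        public SafeListener(Consumer<String> businessLogic, List<String> errorQueue) {
            this.businessLogic = businessLogic;
            this.errorQueue = errorQueue;
        }

        /** Never lets a runtime exception escape to the caller (the "provider"). */
        public void onMessage(String message) {
            try {
                businessLogic.accept(message);
                processed++;
            } catch (RuntimeException e) {
                // Park the trouble message for later replay and keep going.
                errorQueue.add(message);
            }
        }

        public int processedCount() {
            return processed;
        }
    }

    public static void main(String[] args) {
        List<String> errorQueue = new ArrayList<>();
        // Assumed business rule for the demo: an empty payload cannot be processed.
        SafeListener listener = new SafeListener(m -> {
            if (m.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        }, errorQueue);

        for (String m : new String[] {"order-1", "", "order-2"}) {
            listener.onMessage(m);
        }
        System.out.println("processed=" + listener.processedCount()
                + " parked=" + errorQueue.size());
    }
}
```

Because the listener catches the exception itself, the second message is parked and the third is still processed, regardless of how a particular provider would have reacted to a thrown exception.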

Friday, January 30, 2009

JMS Synchronous Message Consumption

A JMS message consumer can consume messages synchronously by calling one of the MessageConsumer's receive methods:
  • receive()
  • receive(long timeout)
  • receiveNoWait()

Calling receive() or receive(0) blocks until a message is available. This is the simplest way to receive a message synchronously. Its only disadvantage is that the application thread may be blocked indefinitely.

receive(long timeout) and receiveNoWait() are the two alternative APIs for consuming messages synchronously. Applications should be aware of the intended contract and behavior of these APIs, because they may not function reliably or consistently if the APIs are used improperly.

1. receive(long timeout)

The JMS API JavaDoc for this method reads as follows.

"Receives the next message that arrives within the specified timeout interval.

This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A timeout of zero never expires, and the call blocks indefinitely."

One important thing to note is the timeout value. It is the maximum time that the application should expect to wait. The call returns immediately if a message is available, and it may block for up to the specified timeout if no message is available.

Some books and articles provide misleading examples that would cause an application to behave inconsistently. For example, the following code uses a timeout value of one millisecond, which is likely to make the application's behavior non-deterministic.

//create a queue instance
dest = session.createQueue(destName);

//create a message consumer to receive messages on dest
MessageConsumer consumer = session.createConsumer(dest);

//start the connection so that message delivery can begin
conn.start();

//receive a message with a max timeout of 1 millisecond (too short to be reliable)
Message rm = consumer.receive(1);

if (rm == null) {
    System.out.println("\n No message is received:\n ");
} else {
    System.out.println("\n Message received: " + rm);
}

The above code is likely to return null from the receive(1) call even if there is a message on the destination. The reason is that the timer may start the moment receive() is called, so the timeout is reached almost immediately. If no message becomes available to the application within one millisecond, the call returns null. (The message may still be in transit over the network from the server to the message consumer.)

Even though messaging vendors may implement this differently internally, it is always good practice to code defensively.

An application should always assume that the timer starts when receive(timeout) is called and that null is returned if the timeout expires before a message becomes available.

When using a timed synchronous receive, consider the following suggestions.
  • Specify a "reasonable" maximum timeout value whenever possible.
  • If a small timeout value is specified, always call receive(timeout) again after it returns null.
  • A small timeout value in a loop can cause a tight CPU loop if there is no other wait or sleep in the loop.
  • Avoid (re)creating a new synchronous message consumer for each receive(timeout) call.

The following is an example of using the timed receive API.

//create a queue instance
dest = session.createQueue(destName);

//create the message consumer once and reuse it for every receive call
MessageConsumer consumer = session.createConsumer(dest);

//start the connection so that message delivery can begin
conn.start();

while (!done) {
    //receive a message with a max timeout of 3 seconds
    Message rm = consumer.receive(3000);
    if (rm == null) {
        //timed out with no message; loop and call receive again
        continue;
    }
    //process business logic
    ...
}


2. receiveNoWait()

This call returns the next message only if one is immediately available and returns null otherwise, so its behavior is similar to the timed receive API with a small timeout value. Follow the suggestions above to ensure reliable messaging behavior.
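
The suggestions about repeating the call and avoiding a tight CPU loop can be illustrated with a small polling sketch. This is only a model under stated assumptions, not JMS code: a java.util.concurrent.BlockingQueue stands in for the consumer, queue.poll() plays the role of receiveNoWait() (both return null when nothing is immediately available), and the names PollingReceiveSketch, receiveWithin, maxWaitMillis and pauseMillis are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PollingReceiveSketch {

    /**
     * Polls without blocking until a message shows up or maxWaitMillis elapses.
     * Returns null if the deadline passes (or the thread is interrupted) first.
     */
    public static String receiveWithin(BlockingQueue<String> queue,
                                       long maxWaitMillis,
                                       long pauseMillis) {
        long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        while (true) {
            String message = queue.poll(); // non-blocking, like receiveNoWait()
            if (message != null) {
                return message;
            }
            if (System.nanoTime() - deadline >= 0) {
                return null; // overall deadline reached
            }
            try {
                Thread.sleep(pauseMillis); // pause so the loop does not spin the CPU
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Simulate a message that is still "in transit" when polling starts.
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(200);
                queue.add("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();

        // A single poll() right now would most likely return null;
        // polling again until a deadline gives the message time to arrive.
        System.out.println("received: " + receiveWithin(queue, 2000, 20));
    }
}
```

The pause between polls trades a little latency for lower CPU use. A timed receive with a reasonable timeout achieves the same effect with less code.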

Thursday, January 29, 2009

JMS Request-Response Messaging

JMS is often used to build event-driven applications in which an application registers a message listener and then carries on with its own work. When a message arrives, the message listener's onMessage(Message m) method is called and the message is passed to the application as the method's parameter.

There are occasions when applications require using a request-response style messaging. A message producer sends a message containing a request and waits for the message consumer to respond. The producer determines its subsequent actions based on the response message.

The JMS API provides two simple classes, QueueRequestor and TopicRequestor, that are useful for implementing request-response style messaging. However, their request() methods block until the response message is received. An application could block and "hang" if the response message never arrives.

This article introduces a simple mechanism for implementing request-response style messaging with a timed wait on the response message. A TemporaryDestination (used here to mean either a TemporaryQueue or a TemporaryTopic) can be used to implement this requirement.

A TemporaryDestination has the following characteristics.

  • It is created from a Session (createTemporaryQueue() or createTemporaryTopic()), so it requires no administrative tool to set up.
  • Its lifetime is that of the connection in which it was created, unless it is deleted earlier.
  • In practice, the requester sets it on a message using the Message.setJMSReplyTo() API.
  • In practice, the responder obtains it using the Message.getJMSReplyTo() API.
  • Messages sent to it can only be consumed by the connection that created it.
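
The reply-to idea behind these characteristics can be modelled in plain Java before looking at the JMS version. The sketch below is an illustration under stated assumptions, not JMS code: BlockingQueue objects stand in for destinations, a private queue created per request plays the temporary destination, and the names RequestReplySketch, Request, request and startResponder are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequestReplySketch {

    /** Hypothetical request: a body plus the private queue the reply should go to. */
    public static final class Request {
        final String body;
        final BlockingQueue<String> replyTo;

        public Request(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Sends a request and waits at most timeoutMillis for the reply; null means timed out. */
    public static String request(BlockingQueue<Request> destination,
                                 String body,
                                 long timeoutMillis) {
        // The reply queue exists before the request is sent, so no reply can be missed.
        BlockingQueue<String> replyTo = new LinkedBlockingQueue<>();
        destination.add(new Request(body, replyTo));
        try {
            return replyTo.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Starts a daemon thread that answers every request on its reply-to queue. */
    public static Thread startResponder(BlockingQueue<Request> destination) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Request r = destination.take();
                    r.replyTo.add("reply to " + r.body);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        BlockingQueue<Request> served = new LinkedBlockingQueue<>();
        startResponder(served);
        System.out.println(request(served, "ping", 2000)); // a responder is listening

        BlockingQueue<Request> unserved = new LinkedBlockingQueue<>();
        System.out.println(request(unserved, "ping", 200)); // nobody answers, so the wait times out
    }
}
```

The timed wait is the design point: unlike a call that blocks until a reply arrives, the requester gets control back after the timeout and can treat a null result as "no response".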

The following example uses a TemporaryTopic to implement a Ping utility. The Ping utility can be used to check whether a queue message listener is active.

The example consists of three components.

1. The Ping message.

A simple ping message is defined as follows so that the message listener can tell that it received a ping message (not a business message).

A Ping Message is a Message type that contains a boolean property named "emessaging_ping_message" with its value set to true.


2. The message listener.

When a listener receives a ping message, it simply sends a ping reply to the temporary topic obtained from the ping message.

The pseudo code looks like this:

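public void onMessage(Message msg) {
    try {
        //a ping message carries the boolean property "emessaging_ping_message"
        if (msg.getBooleanProperty("emessaging_ping_message")) {

            //get the destination instance where the ping reply is to be sent
            Destination dest = msg.getJMSReplyTo();

            //create a producer instance
            MessageProducer producer = session.createProducer(dest);

            //create the ping reply message
            Message pingReply = session.createMessage();
            pingReply.setBooleanProperty(EMPing.PING_MESSAGE_REPLY, true);

            producer.send(pingReply);
            producer.close();
        } else {
            //not a ping: process the business message here
        }
    } catch (JMSException e) {
        //onMessage() must not throw; handle or log the failure here
    }
}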


3. The Ping program.

This is a simple JMS application that sends a message to a destination and expects a response from the consumer (message listener).

The pseudo code looks like this:

//create a message producer on the destination
MessageProducer producer = session.createProducer(dest);

//create a ping message
Message m = session.createMessage();
//set ping message property
m.setBooleanProperty(PING_MESSAGE, true);

//create a temporary destination
TemporaryTopic respDest = session.createTemporaryTopic();

//set destination for the ping_reply message
m.setJMSReplyTo(respDest);

//create a message consumer to receive the ping_reply message
//The consumer must exist before the ping is sent; a reply published to the
//topic before the consumer is created would not be delivered to it
MessageConsumer consumer = session.createConsumer(respDest);

//start the connection so that the reply can be delivered
conn.start();

//send the ping message
producer.send(m);

//receive the ping reply message with a max timeout of 10 seconds
Message rm = consumer.receive(10000);

if (rm == null) {
    System.out.println("\nlistener on destination is non-responsive:\n ");
} else {
    System.out.println("\nlistener on destination is alive.");
}


The complete code example is attached to this article.