Processing messages in transactions with Spring JMS
This post shows how to process messages in transactions with Spring JMS. We will see how an error in the consumer during the asynchronous reception of messages can lead to lost messages. I will then explain how you can solve this problem using local transactions.
You will also see that this solution can, in some cases, cause message duplication (for example, when the listener saves the message to the database and then its execution fails). This happens because the JMS transaction is independent of other transactional resources such as the DB. If your processing is not idempotent or if your application does not support duplicate message detection, then you will have to use distributed transactions.
Distributed transactions are beyond the scope of this post. If you are interested in handling distributed transactions, you can read Distributed transactions in Spring.
1 Use cases
I’ve implemented a test application that reproduces the following cases:
1 Sending and reception of a message: The consumer will process the received message, storing it to a database.
- The producer sends the message to a queue.
- The consumer retrieves the message from the queue and processes it.
2 Error occurred before message processing: The consumer retrieves the message but the execution fails before storing it to the DB.
3 Error occurred after processing the message: The consumer retrieves the message, stores it to the DB, and then the execution fails.
The source code for this application can be found at my Github repository.
2 The test application
The test application executes two test classes, TestNotTransactedMessaging and TestTransactedMessaging. Both classes execute the three cases described above.
Let’s see the configuration of the application when it is executed without transactions.
app-config.xml
Application configuration. It scans the indicated packages to autodetect the application beans (producer and consumer). It also configures the in-memory database where processed notifications will be stored.
<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/>

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <constructor-arg ref="dataSource"/>
</bean>

<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:db/schema.sql" />
</jdbc:embedded-database>
notx-jms-config.xml
Configures the JMS infrastructure, which is:
- Broker connection
- The JmsTemplate
- Queue where notifications will be sent
- The listener container that will send notifications to the listener to process them
<!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
</bean>

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory"/>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <property name="defaultDestination" ref="incomingQueue"/>
</bean>

<!-- Destinations -->
<bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="incoming.queue"/>
</bean>

<!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory">
    <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>
The producer simply uses the jmsTemplate to send notifications:
@Component("producer")
public class Producer {
    private static Logger logger = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    public void convertAndSendMessage(String destination, Notification notification) {
        jmsTemplate.convertAndSend(destination, notification);
        logger.info("Sending notification | Id: "+notification.getId());
    }
}
The listener is responsible for retrieving notifications from the queue and storing them to the database:
@Component("notificationProcessor")
public class NotificationProcessor implements MessageListener {
    private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            Notification notification = (Notification) ((ObjectMessage) message).getObject();
            logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));

            checkPreprocessException(notification);
            saveToDB(notification);
            checkPostprocessException(message, notification);
        } catch (JMSException e) {
            throw JmsUtils.convertJmsAccessException(e);
        }
    }
    ...
}
The checkPreprocessException method will throw a runtime exception when a notification with id=1 arrives. In this way, we will cause an error before storing the message to the DB.
The checkPostprocessException method will throw an exception if a notification with id=2 arrives, thereby causing an error just after storing it to the DB.
The getDeliveryNumber method returns the number of times the message has been delivered. A value greater than one only appears within transactions, since the broker tries to resend the message only after a listener failure has led to a rollback.
Finally, the saveToDB method is pretty obvious. It stores a notification to the DB.
The complete source code of this application is available in the Github repository mentioned above.
3 Testing message reception without transactions
I will launch two test classes, one without transactions and the other within a local transaction. Both classes extend a base class that loads the common application context and contains some utility methods:
@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"})
@DirtiesContext
public class TestBaseMessaging {
    protected static final String QUEUE_INCOMING = "incoming.queue";
    protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";

    @Autowired
    protected JdbcTemplate jdbcTemplate;

    @Autowired
    protected JmsTemplate jmsTemplate;

    @Autowired
    protected Producer producer;

    @Before
    public void prepareTest() {
        jdbcTemplate.update("delete from Notifications");
    }

    protected int getSavedNotifications() {
        return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);
    }

    protected int getMessagesInQueue(String queueName) {
        return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {
            @Override
            public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {
                Enumeration<?> messages = browser.getEnumeration();
                int total = 0;
                while (messages.hasMoreElements()) {
                    messages.nextElement();
                    total++;
                }

                return total;
            }
        });
    }
}
The utility methods are explained below:
- getSavedNotifications: Returns the number of notifications stored to the DB. I’ve used the queryForObject method because it is the recommended approach since version 3.2.2, where the queryForInt method was deprecated.
- getMessagesInQueue: Allows you to check which messages are still pending in the specified queue. For this test we are interested in knowing how many notifications are still waiting to be processed.
Now, let me show you the code for the first test (TestNotTransactedMessaging). This test launches the 3 cases indicated at the beginning of the article:
@Test
public void testCorrectMessage() throws InterruptedException {
    Notification notification = new Notification(0, "notification to deliver correctly");
    producer.convertAndSendMessage(QUEUE_INCOMING, notification);

    Thread.sleep(6000);
    printResults();

    assertEquals(1, getSavedNotifications());
    assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}

@Test
public void testFailedAfterReceiveMessage() throws InterruptedException {
    Notification notification = new Notification(1, "notification to fail after receiving");
    producer.convertAndSendMessage(QUEUE_INCOMING, notification);

    Thread.sleep(6000);
    printResults();

    assertEquals(0, getSavedNotifications());
    assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}

@Test
public void testFailedAfterProcessingMessage() throws InterruptedException {
    Notification notification = new Notification(2, "notification to fail after processing");
    producer.convertAndSendMessage(QUEUE_INCOMING, notification);

    Thread.sleep(6000);
    printResults();

    assertEquals(1, getSavedNotifications());
    assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}

private void printResults() {
    logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));
    logger.info("Total items in DB: "+getSavedNotifications());
}
4 Executing the test
Ok, let’s execute the test and see what the results are:
testCorrectMessage output:
NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 1
No problem here, the queue is empty since the message has been correctly received and stored to the database.
testFailedAfterReceiveMessage output:
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 0
Since the listener executes outside a transaction, the acknowledge mode is used (auto by default). This means the message is considered successfully delivered as soon as it is handed to the onMessage method, and it is therefore deleted from the queue regardless of whether the listener completes. Because the listener failed before storing the message to the DB, the message has been lost.
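The loss can be pictured with a small in-memory model. This is only a sketch under assumptions, not Spring or ActiveMQ code: the class AutoAckSketch and its deliver method are hypothetical names, and a plain ArrayDeque stands in for the broker queue. The message is removed from the queue before the listener runs, which mirrors what the test observed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical in-memory model of auto-acknowledged delivery (not the JMS API).
class AutoAckSketch {
    final Deque<String> queue = new ArrayDeque<>();

    // Delivers the next message, if any. Returns true when the listener
    // completed and false when it threw or the queue was empty.
    boolean deliver(Consumer<String> listener) {
        String message = queue.poll();   // removing the message acts as the acknowledgement
        if (message == null) {
            return false;
        }
        try {
            listener.accept(message);
            return true;
        } catch (RuntimeException e) {
            // Nothing puts the message back on the queue: it is lost.
            return false;
        }
    }
}
```

If the listener passed to deliver throws, the queue is already empty when the exception is caught, so there is nothing left to retry.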
testFailedAfterProcessingMessage output:
2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1
In this case, the message has been deleted from the queue (AUTO_ACKNOWLEDGE) and stored to the DB before the execution failed.
5 Adding local transactions
Usually we can’t afford to lose messages as in the second case of the test, so we will invoke the listener within a local transaction. The change is pretty simple and does not require modifying a single line of code in our application. We only need to change the configuration file.
To test the 3 cases with transactions, I will replace the configuration file notx-jms-config.xml with the following:
tx-jms-config.xml
First, I’ve configured the maximum number of re-deliveries the broker will attempt after a rollback (caused by an error in the listener execution):
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
    <property name="redeliveryPolicy">
        <bean class="org.apache.activemq.RedeliveryPolicy">
            <property name="maximumRedeliveries" value="4"/>
        </bean>
    </property>
</bean>
Next, I indicate that the listener will be executed within a transaction. This can be done by modifying the listener container definition:
<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted">
    <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>
This will cause every invocation of the listener to be executed within a local JMS transaction. The transaction will start when the message is received. If the listener execution fails, message reception will be rolled back.
And that’s all we have to change. Let’s launch the tests with this configuration.
6 Testing message reception within transactions
The code from the TestTransactedMessaging class is practically the same as the previous test. The only difference is that it adds a query to the DLQ (dead letter queue). When executed within transactions, if the message reception is rolled back, the broker will send the message to this queue (after all re-deliveries failed).
I’m skipping the output of the successful case, as it does not show anything new.
testFailedAfterReceiveMessage output:
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
…
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 1
TestTransactedMessaging|Total items in DB: 0
As you can see, the first delivery failed and the broker tried to resend the message four more times (as indicated by the maximumRedeliveries property). Since the error persisted, the message was sent to the special dead letter queue (DLQ). In this way, we do not lose the message.
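The redelivery behaviour seen in this log can be approximated with a few lines of plain Java. This is a simplified sketch, not how ActiveMQ implements it: RedeliverySketch, deliverNext and the two in-memory queues are hypothetical names introduced for illustration, and redelivery delays are ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical in-memory model of transacted delivery with a redelivery
// limit and a dead letter queue (not the ActiveMQ implementation).
class RedeliverySketch {
    final Deque<String> incoming = new ArrayDeque<>();
    final Deque<String> deadLetters = new ArrayDeque<>();
    final int maximumRedeliveries;

    RedeliverySketch(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    // Delivers the head of the incoming queue until the listener succeeds or
    // the redelivery limit is exhausted. Returns the number of delivery attempts.
    int deliverNext(Consumer<String> listener) {
        String message = incoming.peek();   // the message stays queued until "commit"
        if (message == null) {
            return 0;
        }
        int attempts = 0;
        while (attempts < 1 + maximumRedeliveries) {
            attempts++;
            try {
                listener.accept(message);
                incoming.poll();            // commit: the message leaves the queue
                return attempts;
            } catch (RuntimeException e) {
                // rollback: the message stays queued and is delivered again
            }
        }
        // Every attempt failed: move the message to the dead letter queue.
        deadLetters.add(incoming.poll());
        return attempts;
    }
}
```

With a limit of 4 and a listener that always throws, deliverNext makes five attempts (the first delivery plus four redeliveries) and the message ends up in deadLetters, which matches the counts in the log.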
testFailedAfterProcessingMessage output:
NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 0
TestTransactedMessaging|Total items in DB: 2
In this case, this is what happened:
- The listener retrieved the message
- It stored the message to the DB
- Listener execution failed
- The broker resends the message. This time the error does not occur, so the listener stores the message to the DB again. The message has been duplicated.
7 Conclusion
Adding local transactions to message reception avoids losing messages. What we have to take into account is that duplicate messages can occur, so either our listener has to detect them or our processing has to be idempotent, so that a message can be processed again without any problem. If neither is possible, we will have to go for distributed transactions, since they support transactions that span different resources.
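As a rough illustration of duplicate detection, the sketch below stores a notification only the first time its id is seen. It is not part of the test application: IdempotentProcessorSketch and its methods are hypothetical, and the in-memory collections stand in for the Notifications table and for a persistent record of processed ids. In a real listener, the check and the insert would need to happen atomically in the database, for example through a unique constraint on the id column.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical duplicate-detecting processor; the collections stand in for
// the Notifications table and a persistent record of processed ids.
class IdempotentProcessorSketch {
    private final Set<Integer> processedIds = new HashSet<>();
    private final List<String> savedNotifications = new ArrayList<>();

    // Stores the notification only the first time its id is seen.
    // Returns true if it was stored, false if it was a duplicate.
    boolean process(int id, String text) {
        if (!processedIds.add(id)) {
            return false;   // redelivered message: already stored, skip it
        }
        savedNotifications.add(text);
        return true;
    }

    int savedCount() {
        return savedNotifications.size();
    }
}
```

Processing the notification with id=2 twice, as happens after the rollback in the third test case, would then leave a single stored entry instead of two.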