Processing messages in transactions with Spring JMS

This post shows how to process messages in transactions with Spring JMS. We will see how an error in the execution of the consumer during the asynchronous reception of messages with JMS, can lead to the loss of messages. I then will explain how you can solve this problem using local transactions.

You will also see that this solution can cause in some cases, message duplication (for example, when it saves the message to the database and then the listener execution fails). The reason why this happens is because the JMS transaction is independent to other transactional resources like the DB. If your processing is not idempotent or if your application does not support duplicate message detection, then you will have to use distributed transactions.

Distributed transactions are beyond the scope of this post. If you are interested in handling distributed transactions, you can read Distributed transactions in Spring.

 

1 Use cases

I’ve implemented a test application that reproduces the following cases:

1 Sending and reception of a message: The consumer will process the received message, storing it to a database.

    1. The producer sends the message to a queue:
  • send message diagram
    1. The consumer retrieves the message from the queue and processes it:
  • receive message diagram

    2 Error occurred before message processing: The consumer retrieves the message but the execution fails before storing it to the DB.

    error before processing diagram

    3 Error occurred after processing the message: The consumer retrieves the message, stores it to the DB, and then the execution fails.

    error after processing diagram

     

    The source code for this application can be found at my Github repository.

     

    2 The test application

    The test application executes two test classes, TestNotTransactedMessaging and TestTransactedMessaging. These classes will both execute the three cases above described.

    Let’s see the configuration of the application when it is executed without transactions.

     

    app-config.xml

    Application configuration. Basically it checks within the indicated packages to autodetect the application beans: producer and consumer. It also configures the in-memory database where processed notifications will be stored.

    <context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/>
    
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <constructor-arg ref="dataSource"/>
    </bean>
    
    <jdbc:embedded-database id="dataSource">
        <jdbc:script location="classpath:db/schema.sql" />
    </jdbc:embedded-database>

     

    notx-jms-config.xml

    Configures the JMS infrastructure, which is:

    <!-- Infrastructure -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
    </bean>
    
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory"/>
    </bean>
    
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="defaultDestination" ref="incomingQueue"/>
    </bean>
    
    <!-- Destinations -->
    <bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="incoming.queue"/>
    </bean>
    	
    <!-- Listeners -->
    <jms:listener-container connection-factory="connectionFactory">
        <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
    </jms:listener-container>

     

    The producer simply uses the jmsTemplate to send notifications:

    @Component("producer")
    public class Producer {
        private static Logger logger = LoggerFactory.getLogger(Producer.class);
        
        @Autowired
        private JmsTemplate jmsTemplate;
        
        public void convertAndSendMessage(String destination, Notification notification) {
            jmsTemplate.convertAndSend(destination, notification);
            logger.info("Sending notification | Id: "+notification.getId());
        }
    }

     

    The listener is responsible for retrieval of notifications from the queue and stores them to the database:

    @Component("notificationProcessor")
    public class NotificationProcessor implements MessageListener {
        private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
        
        @Autowired
        private JdbcTemplate jdbcTemplate;
        
        @Override
        public void onMessage(Message message) {
            try {
                Notification notification = (Notification) ((ObjectMessage) message).getObject();
                logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));
                
                checkPreprocessException(notification);
                saveToBD(notification);
                checkPostprocessException(message, notification);
            } catch (JMSException e) {
                throw JmsUtils.convertJmsAccessException(e);
            }
        }	
        ...
    }

    The checkPreprocessException method will throw a runtime exception when a notification with id=1 arrive. In this way, we will cause an error before storing the message to the DB.

    The checkPostprocessException method will throw an exception if a notification with id=2 arrive, thereby causing an error just after storing it to the DB.

    The getDeliveryNumber method returns the number of times the message has been sent. This only applies within transactions, since the broker will try to resend the message after listener processing failure led to a rollback.

    Finally, the saveToDB method is pretty obvious. It stores a notification to the DB.

    You can always check the source code of this application by clicking the link at the beginning of this article.

     

    3 Testing message reception without transactions

    I will launch two test classes, one without transactions and the other within a local transaction. Both classes extend a base class that loads the common application context and contains some utility methods:

    @ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"})
    @DirtiesContext
    public class TestBaseMessaging {
        protected static final String QUEUE_INCOMING = "incoming.queue";
        protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";
        
        @Autowired
        protected JdbcTemplate jdbcTemplate;
        
        @Autowired
        protected JmsTemplate jmsTemplate;
        
        @Autowired
        protected Producer producer;
        
        @Before
        public void prepareTest() {
            jdbcTemplate.update("delete from Notifications");
        }
        
        protected int getSavedNotifications() {
            return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);
        }
        
        protected int getMessagesInQueue(String queueName) {
            return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {
                @Override
                public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {
                    Enumeration<?> messages = browser.getEnumeration();
                    int total = 0;
                    while (messages.hasMoreElements()) {
                        messages.nextElement();
                        total++;
                    }
                    
                    return total;
                }
            });
        }
    }

     

    The utility methods are explained below:

    Now, let me show you the code for the first test (TestNotTransactedMessaging). This test launches the 3 cases indicated at the beginning of the article:

    @Test
    public void testCorrectMessage() throws InterruptedException {
        Notification notification = new Notification(0, "notification to deliver correctly");
        producer.convertAndSendMessage(QUEUE_INCOMING, notification);
        
        Thread.sleep(6000);
        printResults();
        
        assertEquals(1, getSavedNotifications());
        assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
    }
    
    @Test
    public void testFailedAfterReceiveMessage() throws InterruptedException {
        Notification notification = new Notification(1, "notification to fail after receiving");
        producer.convertAndSendMessage(QUEUE_INCOMING, notification);
        
        Thread.sleep(6000);
        printResults();
        
        assertEquals(0, getSavedNotifications());
        assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
    }
    
    @Test
    public void testFailedAfterProcessingMessage() throws InterruptedException {
        Notification notification = new Notification(2, "notification to fail after processing");
        producer.convertAndSendMessage(QUEUE_INCOMING, notification);
        
        Thread.sleep(6000);
        printResults();
        
        assertEquals(1, getSavedNotifications());
        assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
    }
    
    private void printResults() {
        logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));
        logger.info("Total items in DB: "+getSavedNotifications());
    }

     

    4 Executing the test

    Ok, let’s execute the test and see what the results are:

    testCorrectMessage output:

    Producer|Sending notification | Id: 0
    NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
    TestNotTransactedMessaging|Total items in “incoming” queue: 0
    TestNotTransactedMessaging|Total items in DB: 1

    No problem here, the queue is empty since the message has been correctly received and stored to the database.

    testFailedAfterReceiveMessage output:

    Producer|Sending notification | Id: 1
    NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
    AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
    java.lang.RuntimeException: error after receiving message
    TestNotTransactedMessaging|Total items in “incoming” queue: 0
    TestNotTransactedMessaging|Total items in DB: 0

    Since it is executing outside a transaction, the acknowledge mode (auto by default) is used. This implies that the message is considered successfully delivered once the onMessage method is invoked and therefore deleted from the queue. Because the listener failed before storing the message to the DB, we have lost the message!!

    testFailedAfterProcessingMessage output:

    2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2
    2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
    2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
    java.lang.RuntimeException: error after processing message
    2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in “incoming” queue: 0
    2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

    In this case, the message has been deleted from the queue (AUTO_ACKNOWLEDGE) and stored to the DB before the execution failed.

     

    5 Adding local transactions

    Usually we can’t allow losing messages like the second case of the test, so what we will do is to invoke the listener within a local transaction. The change is pretty simple and it does not imply modifying a single line of code from our application. We will only need to change the configuration file.

    To test the 3 cases with transactions, I will replace the configuration file notx-jms-config.xml for the following:

    tx-jms-config.xml

    First, I’ve added the number of re-deliveries made in case of a rollback (caused by an error in the listener execution):

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="maximumRedeliveries" value="4"/>
            </bean>
        </property>
    </bean>

     

    Next, I indicate that the listener will be executed within a transaction. This can be done by modifying the listener container definition:

    <jms:listener-container connection-factory="connectionFactory" acknowledge="transacted">
        <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
    </jms:listener-container>

     

    This will cause every invocation of the listener to be executed within a local JMS transaction. The transaction will start when the message is received. If the listener execution fails, message reception will be rolled back.

    And that’s all we have to change. Let’s launch the tests with this configuration.

     

    6 Testing message reception within transactions

    The code from the TestTransactedMessaging class is practically the same as the previous test. The only difference is that it adds a query to the DLQ (dead letter queue). When executed within transactions, if the message reception is rolled back, the broker will send the message to this queue (after all re-deliveries failed).

    I’m skipping the output of the successful receiving as it does not bring anything new.

    testFailedAfterReceiveMessage output:

    Producer|Sending notification | Id: 1
    NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
    AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
    java.lang.RuntimeException: error after receiving message
    NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
    AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.

    java.lang.RuntimeException: error after receiving message
    NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
    AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
    java.lang.RuntimeException: error after receiving message
    TestTransactedMessaging|Total items in “incoming” queue: 0
    TestTransactedMessaging|Total items in “dead letter” queue: 1
    TestTransactedMessaging|Total items in DB: 0

    As you can see, the first receiving has failed, and the broker has tried to resend it four more times (as indicated in the maximumRedeliveries property). Since the situation persisted, the message has been sent to the special DLQ queue. In this way, we do not lose the message.

    testFailedAfterProcessingMessage output:

    Producer|Sending notification | Id: 2
    NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
    AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
    java.lang.RuntimeException: error after processing message
    NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
    TestTransactedMessaging|Total items in “incoming” queue: 0
    TestTransactedMessaging|Total items in “dead letter” queue: 0
    TestTransactedMessaging|Total items in DB: 2

    In this case, this is what happened:

    1. The listener retrieved the message
    2. It stored the message to the DB
    3. Listener execution failed
    4. The broker resends the message. Since the situation has been solved, the listener stores the message to the DB (again). The message has been duplicated.

     

    7 Conclusion

    Adding local transactions to the message reception avoids losing messages. What we have to take into account is that duplicate messages can occur, so our listener will have to detect it, or our processing will have to be idempotent to process it again without any problem. If this is not possible, we will have to go for distributed transactions, since they support transactions that involve different resources.