Introduction to messaging with Spring JMS

In this post I will show you how to configure a standalone application in order to see different ways of sending and receiving messages using Spring JMS. Basically, I will divide the examples into the following sections:

The source code with all the examples shown in this article is available at my Github repository.

1 Configuring the provider

The first thing we need to do is to configure the ConnectionFactory. The connection factory is part of the JMS specification and allows the application to create connections with the JMS provider:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
</bean>

The factory is used to create a connection which is then used to create a session. In the following examples, we won’t need to care about this since the JmsTemplate class will do this for us.

Spring provides its own ConnectionFactory implementations, which are specified below:

The JmsTemplate aggressively opens and closes resources like sessions since it assumes that are cached by the connectionFactory. Using the CachingConnectionFactory will improve its performance. In our example, we will define a cachingConnectionFactory passing our previously defined AMQ connectionFactory to its targetConnectionFactory property:

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory"/>
</bean>

 

2 Point-to-point messaging (queue)

This Destination implementation consists in sending a message to a single consumer. The producer will send a message to the queue where it will be retrieved by the consumer.

point-to-point diagram

 

The consumer will actively retrieve the message from the queue (synchronous reception) or it will retrieve the message passively (asynchronous reception). Now we will see an example of each.

2.1 Synchronous reception

2.1.1 Configuration

Spring JMS uses JmsTemplate class for message production and synchronous message reception. This template is a central class of Spring JMS, and helps us by:

Let’s configure the jmsTemplate:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <property name="defaultDestination" ref="syncTestQueue"/>
</bean>

This template will handle our point-to-point messaging. To use topics it will need further configuration, which will be shown in the following sections.

The Queue destination is defined below:

<bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.sync.queue"/>
</bean>

In our example, a producer will send a message to this queue and a consumer will retrieve it.

2.1.2 The producer

@Component("producer")
public class Producer {
    @Autowired
    @Qualifier("jmsTemplate")
    private JmsTemplate jmsTemplate;
    
    public void convertAndSendMessage(Notification notification) {
        jmsTemplate.convertAndSend(notification);
    }

 

This producer will use a jmsTemplate to send a message. Note the @Component annotation, this class will be auto detected and registered as a bean.

It is also important to see that we are passing a Notification object to the jmsTemplate method. If we do not define a message converter, the template will register a SimpleMessageConverter by default (check JmsTemplate constructor). This converter will be able to convert the following types:

If the object being sent is not an instance of any of the previous list, it will throw a MessageConversionException. The common cause of this exception is that your object is not implementing Serializable interface.

In this case, it will convert our Notification object to an ObjectMessage and send it to its default destination, which we previously defined as “test.sync.queue“.

2.1.3 The consumer

@Component
public class SyncReceiver {
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public Notification receive() {
        return (Notification) jmsTemplate.receiveAndConvert("test.sync.queue");
    }
}

You should use this method carefully as it blocks the current thread until it receives the message. You should better define a timeout in case there’s a problem receiving the message. The jmsTemplate has no timeout setter method defined. You will need to define it when configuring the AMQ connection factory:

<property name=”sendTimeout” value=”5000″/>

2.1.4 The test

@ContextConfiguration(locations = {
    "/xpadro/spring/jms/config/jms-config.xml", 
    "/xpadro/spring/jms/config/app-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestSyncMessaging {
    
    @Autowired
    private Producer producer;
    
    @Autowired
    private SyncReceiver syncReceiver;
    
    @Test
    public void testSynchronizedReceiving() throws InterruptedException {
        Notification notification = new Notification("1", "this is a message");
        //Sends the message to the jmsTemplate's default destination
        producer.convertAndSendMessage(notification);
        Thread.sleep(2000);
        
        Notification receivedNotification = syncReceiver.receive();
        assertNotNull(receivedNotification);
        assertEquals("this is a message", receivedNotification.getMessage());
    }
}

 

2.2 Asynchronous reception

Spring lets you receive messages asynchronously in two different ways:

The following example will show the second approach.

2.2.1 Configuration

We can use the same jmsTemplate we configured in the previous example. In this case we will configure another queue where the producer will send its message and a consumer that will act as a listener:

<bean id="asyncTestQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.async.queue"/>
</bean>

<jms:listener-container connection-factory="connectionFactory">
    <jms:listener destination="test.async.queue" ref="asyncReceiver" method="receiveMessage"/>
</jms:listener-container>

 

We are configuring a consumer which will be the asyncReceiver bean, and the listener container will invoke its receiveMessage method when a message arrives to the test.async.queue.

2.2.2 The producer

The producer will be the same defined in the previous section, but it will send the message to a different queue:

public void convertAndSendMessage(String destination, Notification notification) {
    jmsTemplate.convertAndSend(destination, notification);
}

2.2.3 The consumer

@Component("asyncReceiver")
public class AsyncReceiver {
    @Autowired
    private NotificationRegistry registry;
    
    public void receiveMessage(Notification notification) {
        registry.registerNotification(notification);
    }
}

 

As you can see, it’s a simple Java class. It does not need to implement any interface. The consumer saves received notifications to a registry. This registry will be used by the test class to assert that notifications arrived correctly.

2.2.4 The test

@ContextConfiguration(locations = {
    "/xpadro/spring/jms/config/jms-config.xml", 
    "/xpadro/spring/jms/config/app-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestAsyncMessaging {
    
    @Autowired
    private Producer producer;
    
    @Autowired
    private NotificationRegistry registry;
    
    @Test
    public void testAsynchronizedReceiving() throws InterruptedException {
        Notification notification = new Notification("2", "this is another message");
        producer.convertAndSendMessage("test.async.queue", notification);
        Thread.sleep(2000);
        
        assertEquals(1, registry.getReceivedNotifications().size());
        assertEquals("this is another message", registry.getReceivedNotifications().get(0).getMessage());
    }
}

 

3 Publish-subscribe messaging (topic)

The message is sent to a topic, where it will be distributed to all consumers that are subscribed to this topic.

publish-subscribe diagram

 

3.1 Configuration

We will need another jmsTemplate since the template we configured before is set to work with queues.

<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <property name="pubSubDomain" value="true"/>
</bean>

We just need to configure its destination accessor by defining its pubSubDomain property and set its value to true. The default value is false (point-to-point).

Next, we configure a new destination, which will be the topic for this example:

<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="test.topic"/>
</bean>

Finally, we define the listeners. We define two listeners to make sure both consumers receive the message sent to the topic by the producer.

<jms:listener-container connection-factory="connectionFactory" destination-type="topic">
    <jms:listener destination="test.topic" ref="asyncTopicFooReceiver" method="receive"/>
    <jms:listener destination="test.topic" ref="asyncTopicBarReceiver" method="receive"/>
</jms:listener-container>

You may notice a difference in the listener configuration. We need to change the destination type of the listener container, which is set to queue by default. Just set its value to topic and we are done.

3.2 The producer

@Component("producer")
public class Producer {
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTopicTemplate;
    
    public void convertAndSendTopic(Notification notification) {
        jmsTopicTemplate.convertAndSend("test.topic", notification);
    }
}

3.3 The consumer

@Component("asyncTopicBarReceiver")
public class AsyncTopicBarReceiver {
    @Autowired
    private NotificationRegistry registry;
    
    public void receive(Notification notification) {
        registry.registerNotification(notification);
    }
}

The asyncTopicFooReceiver has the same method.

3.4 The test

@ContextConfiguration(locations = {
    "/xpadro/spring/jms/config/jms-config.xml", 
    "/xpadro/spring/jms/config/app-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestTopicMessaging {
    
    @Autowired
    private Producer producer;
    
    @Autowired
    private NotificationRegistry registry;
    
    @Test
    public void testTopicSending() throws InterruptedException {
        Notification notification = new Notification("3", "this is a topic");
        producer.convertAndSendTopic(notification);
        Thread.sleep(2000);
        
        assertEquals(2, registry.getReceivedNotifications().size());
        assertEquals("this is a topic", registry.getReceivedNotifications().get(0).getMessage());
        assertEquals("this is a topic", registry.getReceivedNotifications().get(1).getMessage());
    }
}