Spring Integration MongoDB adapters with Java DSL

This post explains how to save and retrieve entities from a MongoDB database using Spring Integration. In order to accomplish that, we are going to configure inbound and outbound MongoDB channel adapters using the Java DSL configuration extension. As an example, we are going to build an application to allow you to write orders to a MongoDB store, and then retrieve them for processing.

The application flow can be split in two parts:

The source code can be found in my Spring Integration repository.

1 MessagingGateway – Entering the messaging system

Our application does not know anything about the messaging system. In fact, it will just create new orders and send them to an interface (OrderService):

@SpringBootApplication
@EnableIntegration
public class MongodbBasicApplication {
    
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args);
        new MongodbBasicApplication().start(context);
    }
    
    public void start(ConfigurableApplicationContext context) {
        resetDatabase(context);
        
        Order order1 = new Order("1", true);
        Order order2 = new Order("2", false);
        Order order3 = new Order("3", true);
        
        InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class);
        
        orderService.order(order1);
        orderService.order(order2);
        orderService.order(order3);
    }
    
    private void resetDatabase(ConfigurableApplicationContext context) {
        ProductRepository productRepository = context.getBean(ProductRepository.class);
        productRepository.deleteAll();
    }
}

 

Taking an initial look at the configuration, we can see that the OrderService is actually a messaging gateway.

@Configuration
@ComponentScan("xpadro.spring.integration.endpoint")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class InfrastructureConfiguration {

    @MessagingGateway
    public interface OrderService {

        @Gateway(requestChannel = "sendOrder.input")
        void order(Order order);
    }
    
    ...
}

 

Any order sent to the order method will be introduced to the messaging system as a Message through the ‘sendOrder.input’ direct channel.

2 First part – processing orders

The first part of the Spring Integration messaging flow is composed by the following components:

mongoDB channel adapters - processing order diagram

 

We use a lambda to create an IntegrationFlow definition, which registers a DirectChannel as its input channel. The name of the input channel is resolved as ‘beanName + .input’. Hence, the name is the one we specified in the gateway: ‘sendOrder.input’

@Bean
@Autowired
public IntegrationFlow sendOrder(MongoDbFactory mongo) {
    return f -> f
        .transform(Transformers.converter(orderToProductConverter()))
        .handle(mongoOutboundAdapter(mongo));
}

 

The first thing the flow does when receiving a new order is use a transformer to convert the order into a product. To register a transformer we can use the Transformers factory provided by the DSL API. Here, we have different possibilities. The one I chose is using a PayloadTypeConvertingTransformer, which delegates to a converter the transformation of the payload into an object.

public class OrderToProductConverter implements Converter<Order, Product> {

    @Override
    public Product convert(Order order) {
        return new Product(order.getId(), order.isPremium());
    }
}

 

The next step in the orders flow is to store the newly created product to the database. Here, we use a MongoDB outbound adapter:

@Bean
@Autowired
public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo);
    mongoHandler.setCollectionNameExpression(new LiteralExpression("product"));
    return mongoHandler;
}

 

If you wonder what the message handler is actually doing internally, it uses a mongoTemplate to save the entity:

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
    Object payload = message.getPayload();
    
    this.mongoTemplate.save(payload, collectionName);
}

 

3 Second part – processing products

In this second part we have another integration flow for processing products:

mongoDB channel adapters - processing product diagram

 

In order to retrieve previously created products, we have defined an inbound channel adapter which will continuously be polling the MongoDB database:

@Bean
@Autowired
public IntegrationFlow processProduct(MongoDbFactory mongo) {
    return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS)))
        .route(Product::isPremium, this::routeProducts)
        .handle(mongoOutboundAdapter(mongo))
        .get();
}

 

The MongoDB inbound channel adapter is the one responsible for polling products from the database. We specify the query in the constructor. In this case, we poll one non processed product each time:

@Bean
@Autowired
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
    MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}"));
    messageSource.setExpectSingleResult(true);
    messageSource.setEntityClass(Product.class);
    messageSource.setCollectionNameExpression(new LiteralExpression("product"));
    
    return messageSource;
}

 

The router definition shows how the product is sent to a different service activator method depending on the ‘premium’ field:

private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) {
    return mapping
        .subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess"))
        .subFlowMapping(false, sf -> sf.handle(productProcessor(), "process"));
}

 

As a service activator, we have a simple bean which logs a message and sets the product as processed. Then, it will return the product so it can be handled by the next endpoint in the flow.

public class ProductProcessor {

    public Product process(Product product) {
        return doProcess(product, String.format("Processing product %s", product.getId()));
    }

    public Product fastProcess(Product product) {
        return doProcess(product, String.format("Fast processing product %s", product.getId()));
    }

    private Product doProcess(Product product, String message) {
        System.out.println(message);
        product.setProcessed(true);
        return product;
    }
}

 

The reason for setting the product as processed is because the next step is to update its status in the database in order to not poll it again. We save it by redirecting the flow to the mongoDb outbound channel adapter again.

4 Conclusion

You have seen what endpoints you do have to use in order to interact with a MongoDB database using Spring Integration. The outbound channel adapter passively saves products to the database, while the inbound channel adapter actively polls the database to retrieve new products.

If you found this post useful, please share it or star my repository. I appreciate it 🙂

I’m publishing my new posts on Google plus and Twitter. Follow me if you want to be updated with new content.