Spring Integration MongoDB adapters with Java DSL
This post explains how to save and retrieve entities from a MongoDB database using Spring Integration. To accomplish that, we are going to configure inbound and outbound MongoDB channel adapters using the Java DSL configuration extension. As an example, we are going to build an application that writes orders to a MongoDB store and then retrieves them for processing.
The application flow can be split into two parts:
- New orders are sent to the messaging system, where they are converted to actual products and then stored in MongoDB.
- Another component continuously polls the database and processes any new product it finds.
The source code can be found in my Spring Integration repository.
1 MessagingGateway – Entering the messaging system
Our application does not know anything about the messaging system. In fact, it will just create new orders and send them to an interface (OrderService):
@SpringBootApplication
@EnableIntegration
public class MongodbBasicApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args);
        new MongodbBasicApplication().start(context);
    }

    public void start(ConfigurableApplicationContext context) {
        resetDatabase(context);

        Order order1 = new Order("1", true);
        Order order2 = new Order("2", false);
        Order order3 = new Order("3", true);

        // The gateway interface is the application's only contact with the messaging system
        InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class);

        orderService.order(order1);
        orderService.order(order2);
        orderService.order(order3);
    }

    // Start from an empty collection on every run
    private void resetDatabase(ConfigurableApplicationContext context) {
        ProductRepository productRepository = context.getBean(ProductRepository.class);
        productRepository.deleteAll();
    }
}
Taking an initial look at the configuration, we can see that the OrderService is actually a messaging gateway.
@Configuration
@ComponentScan("xpadro.spring.integration.endpoint")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class InfrastructureConfiguration {

    @MessagingGateway
    public interface OrderService {

        // Calls to order() are sent to the input channel of the sendOrder flow
        @Gateway(requestChannel = "sendOrder.input")
        void order(Order order);
    }

    ...
}
Any order sent to the order method will be introduced to the messaging system as a Message whose payload is the order, through the ‘sendOrder.input’ channel declared on the gateway.
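To make the gateway idea more concrete, here is a minimal sketch in plain Java of what such a proxy could look like. It is not Spring Integration's implementation: SketchMessage, SketchChannel, SketchOrderService and GatewaySketch are hypothetical names made up for this illustration, and the order is reduced to a String id. The point is only that the caller sees an ordinary interface, while the proxy wraps the argument in a message and hands it to a channel.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Spring Integration types.
final class SketchMessage<T> {
    private final T payload;

    SketchMessage(T payload) {
        this.payload = payload;
    }

    T getPayload() {
        return payload;
    }
}

interface SketchChannel {
    void send(SketchMessage<?> message);
}

interface SketchOrderService {
    void order(String orderId);
}

final class GatewaySketch {

    // Builds a proxy that wraps the first method argument in a message
    // and sends that message to the given channel.
    static SketchOrderService gatewayFor(SketchChannel channel) {
        return (SketchOrderService) Proxy.newProxyInstance(
                SketchOrderService.class.getClassLoader(),
                new Class<?>[] { SketchOrderService.class },
                (proxy, method, args) -> {
                    channel.send(new SketchMessage<>(args[0]));
                    return null; // order() is void, so there is nothing to return
                });
    }

    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        // The "channel" here simply records every payload it receives
        SketchOrderService service = gatewayFor(message -> received.add(message.getPayload()));
        service.order("1");
        service.order("2");
        System.out.println(received);
    }
}
```

The calling code only depends on SketchOrderService, which mirrors how the application above only depends on OrderService.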
2 First part – processing orders
The first part of the Spring Integration messaging flow is made up of two components: a transformer that converts each order into a product, and a MongoDB outbound channel adapter that stores it.
We use a lambda to create an IntegrationFlow definition, which registers a DirectChannel as its input channel. The name of the input channel is resolved as ‘beanName + .input’. Hence, the name is the one we specified in the gateway: ‘sendOrder.input’.
@Bean
@Autowired
public IntegrationFlow sendOrder(MongoDbFactory mongo) {
    return f -> f
        // Order -> Product
        .transform(Transformers.converter(orderToProductConverter()))
        // Store the product in MongoDB
        .handle(mongoOutboundAdapter(mongo));
}
The first thing the flow does when receiving a new order is use a transformer to convert the order into a product. To register a transformer we can use the Transformers factory provided by the DSL API. There are several options here. The one I chose is a PayloadTypeConvertingTransformer, which delegates the transformation of the payload to a converter.
public class OrderToProductConverter implements Converter<Order, Product> {

    @Override
    public Product convert(Order order) {
        return new Product(order.getId(), order.isPremium());
    }
}
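The post does not list the Order and Product classes, so the following is only a sketch of what they might look like. The fields are assumptions inferred from the calls used in the snippets (getId, isPremium, setProcessed), and java.util.function.Function stands in for Spring's Converter so that the example runs without any dependency. ConverterSketch is a hypothetical name.

```java
import java.util.function.Function;

// Hypothetical domain classes: fields are inferred from the method calls in the post.
class Order {
    private final String id;
    private final boolean premium;

    Order(String id, boolean premium) {
        this.id = id;
        this.premium = premium;
    }

    String getId() {
        return id;
    }

    boolean isPremium() {
        return premium;
    }
}

class Product {
    private final String id;
    private final boolean premium;
    private boolean processed; // stays false until a processor marks the product

    Product(String id, boolean premium) {
        this.id = id;
        this.premium = premium;
    }

    String getId() {
        return id;
    }

    boolean isPremium() {
        return premium;
    }

    boolean isProcessed() {
        return processed;
    }

    void setProcessed(boolean processed) {
        this.processed = processed;
    }
}

final class ConverterSketch {

    // Same mapping as OrderToProductConverter, with Function standing in for Spring's Converter
    static final Function<Order, Product> ORDER_TO_PRODUCT =
            order -> new Product(order.getId(), order.isPremium());

    public static void main(String[] args) {
        Product product = ORDER_TO_PRODUCT.apply(new Order("1", true));
        System.out.println(product.getId() + " premium=" + product.isPremium()
                + " processed=" + product.isProcessed());
    }
}
```

A freshly converted product starts out unprocessed, which is what lets the second flow pick it up later.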
The next step in the orders flow is to store the newly created product to the database. Here, we use a MongoDB outbound adapter:
@Bean
@Autowired
public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo);
    // Every payload is stored in the "product" collection
    mongoHandler.setCollectionNameExpression(new LiteralExpression("product"));
    return mongoHandler;
}
If you wonder what the message handler is actually doing internally, it uses a mongoTemplate to save the entity:
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
    Object payload = message.getPayload();
    this.mongoTemplate.save(payload, collectionName);
}
3 Second part – processing products
In this second part we have another integration flow for processing products. It polls the database, routes each product to a processor method, and saves the result.
In order to retrieve previously created products, we have defined an inbound channel adapter which will continuously be polling the MongoDB database:
@Bean
@Autowired
public IntegrationFlow processProduct(MongoDbFactory mongo) {
    // Poll MongoDB every 3 seconds, route by the premium flag, then save the processed product
    return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS)))
        .route(Product::isPremium, this::routeProducts)
        .handle(mongoOutboundAdapter(mongo))
        .get();
}
The MongoDB inbound channel adapter is responsible for polling products from the database. We specify the query in the constructor. In this case, each poll retrieves one unprocessed product:
@Bean
@Autowired
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
    // Query: products that have not been processed yet
    MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}"));
    messageSource.setExpectSingleResult(true);
    messageSource.setEntityClass(Product.class);
    messageSource.setCollectionNameExpression(new LiteralExpression("product"));
    return messageSource;
}
The router definition shows how the product is sent to a different service activator method depending on the ‘premium’ field:
private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) {
    return mapping
        // premium == true
        .subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess"))
        // premium == false
        .subFlowMapping(false, sf -> sf.handle(productProcessor(), "process"));
}
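The routing idea above can be sketched in plain Java as a key extractor plus a map from key to handler. This is an illustration under stated assumptions and not the DSL's RouterSpec: RouterSketch is a hypothetical class, a product is reduced to a Map.Entry of (id, premium), and each sub-flow just returns the message the processor would log.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical router: extracts a key from the payload and dispatches to the mapped sub-flow.
final class RouterSketch<T, K> {
    private final Function<T, K> keyExtractor;
    private final Map<K, Function<T, String>> subFlows = new HashMap<>();

    RouterSketch(Function<T, K> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    RouterSketch<T, K> subFlowMapping(K key, Function<T, String> subFlow) {
        subFlows.put(key, subFlow);
        return this;
    }

    String route(T payload) {
        Function<T, String> subFlow = subFlows.get(keyExtractor.apply(payload));
        if (subFlow == null) {
            throw new IllegalStateException("No sub-flow mapped for " + payload);
        }
        return subFlow.apply(payload);
    }

    // A product is modelled as (id, premium); the premium flag is the routing key.
    static RouterSketch<Map.Entry<String, Boolean>, Boolean> productRouter() {
        return new RouterSketch<Map.Entry<String, Boolean>, Boolean>(Map.Entry::getValue)
                .subFlowMapping(true, p -> "Fast processing product " + p.getKey())
                .subFlowMapping(false, p -> "Processing product " + p.getKey());
    }

    public static void main(String[] args) {
        RouterSketch<Map.Entry<String, Boolean>, Boolean> router = productRouter();
        System.out.println(router.route(new SimpleEntry<>("1", true)));
        System.out.println(router.route(new SimpleEntry<>("2", false)));
    }
}
```

With a boolean key there are only two possible destinations, so both are mapped explicitly, just as in routeProducts.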
As a service activator, we have a simple bean which logs a message and sets the product as processed. Then, it will return the product so it can be handled by the next endpoint in the flow.
public class ProductProcessor {

    public Product process(Product product) {
        return doProcess(product, String.format("Processing product %s", product.getId()));
    }

    public Product fastProcess(Product product) {
        return doProcess(product, String.format("Fast processing product %s", product.getId()));
    }

    // Logs the message and flags the product so it is not polled again once saved
    private Product doProcess(Product product, String message) {
        System.out.println(message);
        product.setProcessed(true);
        return product;
    }
}
We mark the product as processed because the next step updates its status in the database, so that it is not polled again. We save it by sending it to the MongoDB outbound channel adapter once more.
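The interplay between the poll query and the save can be simulated without a database. The sketch below is an assumption-laden stand-in: PollingSketch and StoredProduct are hypothetical names, an in-memory map keyed by id plays the role of the "product" collection, and a simple loop replaces the 3-second poller. It relies on the fact that saving an entity whose id already exists replaces the stored entry instead of adding a new one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical immutable stand-in for a stored document; not the post's Product class.
final class StoredProduct {
    final String id;
    final boolean processed;

    StoredProduct(String id, boolean processed) {
        this.id = id;
        this.processed = processed;
    }
}

final class PollingSketch {
    // In-memory stand-in for the "product" collection, keyed by id
    private final Map<String, StoredProduct> collection = new LinkedHashMap<>();

    // Inserts a new entry, or replaces the existing one with the same id
    void save(StoredProduct product) {
        collection.put(product.id, product);
    }

    // Mirrors the {'processed' : false} query with a single expected result
    StoredProduct pollOneUnprocessed() {
        for (StoredProduct product : collection.values()) {
            if (!product.processed) {
                return product;
            }
        }
        return null;
    }

    // Polls until nothing is left and returns the ids in the order they were handled.
    // Without the save() call the same product would be returned by every poll.
    List<String> drain() {
        List<String> handled = new ArrayList<>();
        StoredProduct next;
        while ((next = pollOneUnprocessed()) != null) {
            handled.add(next.id);
            save(new StoredProduct(next.id, true));
        }
        return handled;
    }

    int size() {
        return collection.size();
    }

    public static void main(String[] args) {
        PollingSketch store = new PollingSketch();
        store.save(new StoredProduct("1", false));
        store.save(new StoredProduct("2", false));
        store.save(new StoredProduct("3", false));
        System.out.println("Handled: " + store.drain());
        System.out.println("Documents stored: " + store.size());
    }
}
```

Each product is handled once, and the number of stored entries does not grow, because the second save overwrites the first.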
4 Conclusion
You have seen which endpoints to use in order to interact with a MongoDB database using Spring Integration. The outbound channel adapter passively saves products to the database, while the inbound channel adapter actively polls the database to retrieve new products.
If you found this post useful, please share it or star my repository. I appreciate it 🙂