Making queries on demand: MongoDB outbound gateway

In order to read data from MongoDb, Spring Integration comes with the MongoDb inbound channel adapter. This adapter uses a poller to continuously retrieve documents from the database. However, sometimes we may need to query the database on demand, based on the result of another endpoint.

Taking advantage of Spring’s extensibility, I implemented a MongoDb outbound gateway. The purpose of this gateway is to react to some request, make a query to the database and return the result.

In order to show you how the gateway works, I will use a simple example and modify it to implement the gateway with different configurations.

This example consists in a messaging gateway as the entry point to the integration flow. Once a message enters the flow, the mongoDb outbound gateway makes a query to the database and the result is then sent to another channel where a service activator will process it.

The source code for these examples and the gateway implementation can be found in my repository.

1 Java DSL example

I implemented the MongoDb static class to ease the definition of the gateway. I took the idea from the Spring Integration Jpa class.

In the following configuration you can see the flow requestPerson. An invocation to PersonService’s send method will send a message to the flow, where the mongoDb outbound gateway will then query the database with a pre-defined query ({id : 1}):

@ComponentScan("xpadro.spring.integration.mongodb")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class JavaDSLQueryConfiguration {
    
    @MessagingGateway
    public interface PersonService {
        
        @Gateway(requestChannel = "requestPerson.input")
        void send(RequestMessage requestMessage);
    }

    @Bean
    @Autowired
    public IntegrationFlow requestPerson(MongoDbFactory mongo) {
        return f -> f
                .handle(outboundGateway(mongo))
                .handle(resultHandler(), "handle");
    }

    @Bean
    public ResultHandler resultHandler() {
        return new ResultHandler();
    }

    private MongoDbOutboundGatewaySpec outboundGateway(MongoDbFactory mongo) {
        return MongoDb.outboundGateway(mongo)
                .query("{id: 1}")
                .collectionNameExpression(new LiteralExpression("person"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }
}

 

The result handler is a “very useful” component which will log the retrieved person:

public class ResultHandler {

    public void handle(Person person) {
        System.out.println(String.format("Person retrieved: %s", person));
    }
}

 

In order to start the flow, the following application sends a message to the PersonService gateway:

@SpringBootApplication
@EnableIntegration
@Import(JavaDSLQueryConfiguration.class)
public class JavaDSLQueryApplication extends AbstractApplication {

	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(JavaDSLQueryApplication.class, args);
		new JavaDSLQueryApplication().start(context);
	}

	public void start(ConfigurableApplicationContext context) {
		resetDatabase(context);

		JavaDSLQueryConfiguration.PersonService personService = context.getBean(JavaDSLQueryConfiguration.PersonService.class);
		personService.send(new RequestMessage());
	}
}

 

As a note, the abstract class just contains the logic to set up the database, which is used along all the other examples.

2 Java DSL example with dynamic query expression

The previous example was useful to see how to define the gateway, but having a hardcoded query may not be the most used case.

In this example, the query is defined in the message sent to the integration flow:

personService.send(new RequestMessage("{id : 2}"));

 

In the configuration file, the gateway’s queryExpression property resolves the query dynamically by retrieving the data property of the message’s payload:

private MongoDbOutboundGatewaySpec outboundGateway(MongoDbFactory mongo) {
        return MongoDb.outboundGateway(mongo)
                .queryExpression("payload.data")
                .collectionNameExpression(new LiteralExpression("person"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

 

3 Java DSL example returning multiple results

The two previous examples retrieved a single document from the database. In this next example, the query returns a list with all documents matching the query:

In the request message we specify the query to find all documents in the persons collection:

personService.send(new RequestMessage("{}"));

 

In the configuration, we have to remove the expectSingleResult property from the gateway (or set it to false). Additionally, we can specify a limit:

private MongoDbOutboundGatewaySpec outboundGateway(MongoDbFactory mongo) {
        return MongoDb.outboundGateway(mongo)
                .queryExpression("payload.data")
                .collectionNameExpression(new LiteralExpression("person"))
                .entityClass(Person.class)
                .maxResults(2);
    }

 

Finally, we have to define another method in the ResultHandler class to handle multiple results:

public void handle(List<Person> persons) {
        String names = persons.stream().map(Person::getName).collect(Collectors.joining(", "));
        System.out.println(String.format("Persons retrieved: %s", names));
    }

 

4 Java Config example

In this last example, Java Config is used instead of Java DSL to configure the whole flow. On the application’s side everything is the same. We just query the person service for a specific document:

personService.send(new RequestMessage("{id : 3}"));

 

When using Java Config, we have to build the MongoDbExecutor, which is used by the gateway to do the queries.

@ComponentScan("xpadro.spring.integration.mongodb")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class JavaConfigQueryConfiguration {

    @MessagingGateway
    public interface PersonService {

        @Gateway(requestChannel = "personInput")
        void send(RequestMessage requestMessage);
    }

    @Bean
    public ResultServiceActivator resultHandler() {
        return new ResultServiceActivator();
    }

    @Bean
    @ServiceActivator(inputChannel = "personInput")
    public MessageHandler mongodbOutbound(MongoDbFactory mongo) {
        MongoDbExecutor mongoDbExecutor = new MongoDbExecutor(mongo);
        mongoDbExecutor.setCollectionNameExpression(new LiteralExpression("person"));
        mongoDbExecutor.setMongoDbQueryExpression("payload.data");
        mongoDbExecutor.setExpectSingleResult(true);
        mongoDbExecutor.setEntityClass(Person.class);

        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(mongoDbExecutor);
        gateway.setOutputChannelName("personOutput");

        return gateway;
    }
}

 

Listening to the gateway’s output channel, we define a service activator to handle the retrieved person:

public class ResultServiceActivator {

    @ServiceActivator(inputChannel = "personOutput")
    public void handle(Person person) {
        System.out.println(String.format("Person retrieved: %s", person));
    }
}

 

5 Conclusion

An outbound gateway is suitable when you need to do queries on demand instead of actively polling the database. Currently, this implementation supports setup with Java Config and Java DSL. For now, I haven’t implemented the parsers needed to support XML configuration since I think these two ways of configuration cover the main necessity.

If you found this post useful, please share it or star my repository 🙂

I’m publishing my new posts on Google plus and Twitter. Follow me if you want to be updated with new content.