Spring Cloud Stream is a framework that helps developers solve data integration problems, especially in event-driven applications. Its goal is to make it easy to apply modern microservice architecture patterns, decouple application responsibilities, and port business logic onto message brokers.
In this article, I will show how easy it is to adopt Spring Cloud Stream and how non-invasive it can be to the application code. These benefits allow developers to focus on what matters: the business.
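We will cover bindings, the functional way of defining them, a sample business case, and its implementation, including dead letter processing and retries.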
A binding is a bridge between the application and an external messaging system (Kafka, RabbitMQ, etc.). The application sends and receives messages through its bindings.
A binding is abstract, and the application should not know or care if it's a Kafka or a RabbitMQ implementation.
Image courtesy of docs.spring.io.
In the image above, you can see that the application core depends only on its bindings (inputs and outputs) and that the binder is responsible for the broker-specific (Kafka, RabbitMQ, etc.) details.
To learn more about this, read the Bindings section in the project documentation.
Since Spring Cloud Stream v2.1, it is possible to define bindings by creating beans of type java.util.function.Supplier, java.util.function.Function, or java.util.function.Consumer.
Below you can see a binding that receives a String from a channel (topic, queue), transforms it to uppercase, and sends it to another channel:
| @Bean | 
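A minimal sketch of such a binding follows, written as plain Java so that it runs without a broker. The class name UppercaseBinding and the method name uppercase are assumptions; in a Spring application the method would carry the @Bean annotation and live in a configuration class.

```java
import java.util.function.Function;

// Hypothetical holder class. In a Spring application this would be a
// @Configuration class and uppercase() would be annotated with @Bean.
class UppercaseBinding {

    // Receives a String payload and returns it in uppercase.
    static Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

    public static void main(String[] args) {
        // Spring Cloud Stream would call the function once per incoming message;
        // here it is called directly to show the transformation.
        System.out.println(uppercase().apply("hello"));
    }
}
```

Because the function has no dependency on the broker, it can be unit tested like any other Java code.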
There is also an annotation-based way of defining bindings, which is considered legacy and is not recommended for new applications. To learn more about this, you can read this section in the docs.
To have a sample implementation in this post, we will use the following business case:
There is a need for an application that can manage flights.
This application will store all flight information, including the plane, the flight starting point, and the destination. The program will receive a message every time a plane arrives in an airport. Once a plane arrives at its destination, the application should change the flight status to "arrived."
More specifically, the service will receive a plane event from a Kafka topic called plane-events-v1, convert it to a flight event, and send that event to the topic flight-events-v1. As shown below, the microservice should also consume events from flight-events-v1 and change the status of the flight to "arrived" if the flight has reached its destination.
In order to implement the flight API, we will use Spring Cloud Stream with the Kafka binder and MongoDB.
In a Spring Boot application, you can add the spring-cloud-starter-stream-kafka starter to work with Kafka and Spring Cloud Stream.
If you use Gradle, you can use the following dependency:
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
All of the source code shown in this article is available on GitHub here.
In the implementation below, we will address the message processor, the message consumer, dead letter processing, and retries.
To begin our implementation, we will start with the binding responsible for converting a plane event into a flight event. The two events are defined in Java classes as shown below:
| @AllArgsConstructor | 
(See full GitHub code here.)
| @AllArgsConstructor | 
(See full GitHub code here.)
With both events defined, we can create the binding. Because we want to convert one event into another, we use a java.util.function.Function type of bean. Below you can see the implementation of the message processor:
| @Component("planeEventProcessor") | 
(See full GitHub code here.)
The processor bean delegates the business logic to the FlightOperationsFacade. If the business logic does not find the flight, an exception is raised.
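The delegation described above can be sketched in plain Java. The field names of PlaneEvent and FlightEvent, the facade method registerPlaneEvent, and the exception constructor are assumptions made for illustration; only the names planeEventProcessor, FlightOperationsFacade, and NoFlightFoundException come from the article.

```java
import java.util.function.Function;

// Hypothetical event shapes; the real classes may carry different fields.
class PlaneEvent {
    final String planeId;
    final String currentAirport;

    PlaneEvent(String planeId, String currentAirport) {
        this.planeId = planeId;
        this.currentAirport = currentAirport;
    }
}

class FlightEvent {
    final String flightId;
    final String currentAirport;

    FlightEvent(String flightId, String currentAirport) {
        this.flightId = flightId;
        this.currentAirport = currentAirport;
    }
}

// Raised by the business logic when no flight matches the plane.
class NoFlightFoundException extends RuntimeException {
    NoFlightFoundException(String planeId) {
        super("No flight found for plane " + planeId);
    }
}

// Assumed shape of the facade; the method name is hypothetical.
interface FlightOperationsFacade {
    FlightEvent registerPlaneEvent(PlaneEvent event);
}

// In the Spring application this class would be annotated with
// @Component("planeEventProcessor") so the binding can refer to it by name.
class PlaneEventProcessor implements Function<PlaneEvent, FlightEvent> {
    private final FlightOperationsFacade facade;

    PlaneEventProcessor(FlightOperationsFacade facade) {
        this.facade = facade;
    }

    @Override
    public FlightEvent apply(PlaneEvent event) {
        // No messaging types appear here: the bean only delegates, and any
        // exception from the facade propagates to the caller.
        return facade.registerPlaneEvent(event);
    }
}
```

Keeping the processor this thin means the business logic can be tested with a stubbed facade and no Kafka at all.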
In order to configure this bean to work, we need to set the following properties prefixed with spring.cloud.stream.bindings:
planeEventProcessor-in-0.destination: The topic that the bean will consume from.
planeEventProcessor-in-0.group: The group id of the Kafka consumer.
planeEventProcessor-out-0.destination: The destination topic that the flight events will be sent to.
Below you can see the application configuration for this bean:
|       bindings: | 
(See full GitHub code here.)
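The binding names used in these properties follow the pattern <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs. The helper below is a hypothetical illustration of that naming rule (the class BindingNames is not part of Spring Cloud Stream); it only builds the property keys as strings.

```java
// Hypothetical helper that spells out how the property keys are composed.
class BindingNames {
    static final String PREFIX = "spring.cloud.stream.bindings.";

    // Input binding of a functional bean, e.g. planeEventProcessor-in-0.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding of a functional bean, e.g. planeEventProcessor-out-0.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Full property key for a binding, e.g. ...planeEventProcessor-in-0.group.
    static String property(String bindingName, String key) {
        return PREFIX + bindingName + "." + key;
    }

    public static void main(String[] args) {
        String in = input("planeEventProcessor", 0);
        String out = output("planeEventProcessor", 0);
        System.out.println(property(in, "destination"));
        System.out.println(property(in, "group"));
        System.out.println(property(out, "destination"));
    }
}
```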
To consume a message from the topic flight-events-v1, we will create a java.util.function.Consumer bean as shown below:
| @Component("flightEventConsumer") | 
(See full GitHub code here.)
This consumer will receive the message and call the business logic module to take care of it.
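A plain-Java sketch of such a consumer might look like the following. The FlightEvent fields and the facade method handleFlightEvent are assumptions; the article only states that the consumer hands the message to the business logic.

```java
import java.util.function.Consumer;

// Hypothetical event shape; the real class may carry different fields.
class FlightEvent {
    final String flightId;
    final String currentAirport;
    final String destination;

    FlightEvent(String flightId, String currentAirport, String destination) {
        this.flightId = flightId;
        this.currentAirport = currentAirport;
        this.destination = destination;
    }
}

// Assumed shape of the business logic entry point; the method name is hypothetical.
interface FlightOperationsFacade {
    void handleFlightEvent(FlightEvent event);
}

// In the Spring application this class would be annotated with
// @Component("flightEventConsumer").
class FlightEventConsumer implements Consumer<FlightEvent> {
    private final FlightOperationsFacade facade;

    FlightEventConsumer(FlightOperationsFacade facade) {
        this.facade = facade;
    }

    @Override
    public void accept(FlightEvent event) {
        // The consumer returns nothing, so it has an input binding only.
        facade.handleFlightEvent(event);
    }
}
```

In this sketch, the decision to mark a flight as "arrived" (for example, when currentAirport equals destination) would live inside the facade, not in the consumer.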
When there is more than one functional binding bean in a project, we need to define the property spring.cloud.stream.function.definition with a semicolon-separated list of the bean names. Below we can see this configuration in the Flight API:
| spring: | 
(See full GitHub code here.)
When working with a consumer, you'll be required to define the topic that it will listen to and the group that it belongs to. You can see it configured below:
|         flightEventConsumer-in-0: | 
(See full GitHub code here.)
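For readers who prefer to see the full property keys, here is a sketch that collects the function definition and the consumer binding settings in a map. The class name is hypothetical, and the group id flight-api is an assumed value, since the article does not show the one it uses. Such a map could be passed to SpringApplication.setDefaultProperties, or the same keys could be written in the application configuration file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for the stream settings of the Flight API.
class FlightApiStreamProperties {

    static Map<String, Object> defaults() {
        Map<String, Object> props = new LinkedHashMap<>();

        // Bean names of the functional bindings, separated by ';'.
        props.put("spring.cloud.stream.function.definition",
                String.join(";", "planeEventProcessor", "flightEventConsumer"));

        // Topic the consumer listens to.
        props.put("spring.cloud.stream.bindings.flightEventConsumer-in-0.destination",
                "flight-events-v1");

        // Consumer group; "flight-api" is an assumed value for illustration.
        props.put("spring.cloud.stream.bindings.flightEventConsumer-in-0.group",
                "flight-api");

        return props;
    }

    public static void main(String[] args) {
        defaults().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```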
Spring Cloud Stream makes it easy to configure and implement dead letter processing and retries.
A lot of times, it is useful to send a message to a special topic when an exception happens. After this, messages can be reprocessed or audited, depending on the business scenario.
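With Spring Cloud Stream, we only need to add two properties prefixed with spring.cloud.stream.kafka.bindings.<binding-name>.consumer. These are listed below:
enableDlq: Enables dead letter processing for the binding.
dlqName: The name of the topic that will receive the failed messages.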
In the flight API, dead letter processing will be enabled for both the plane event and the flight event consumers. The flight event's dead letter topic will be flight-events-dlq-v1, and the plane event's dead letter topic will be plane-events-dlq-v1.
Below you can see the application configuration file for this behaviour:
|       kafka: | 
(See full GitHub code here.)
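As a sketch of the resulting keys, the helper below builds the Kafka binder's enableDlq and dlqName consumer properties for each input binding. The class and method names are hypothetical; the binding names and topic names are the ones used in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that assembles the dead letter settings as property keys.
class DeadLetterProperties {
    private static final String PREFIX = "spring.cloud.stream.kafka.bindings.";

    // Dead letter settings for a single input binding.
    static Map<String, Object> forBinding(String bindingName, String dlqTopic) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put(PREFIX + bindingName + ".consumer.enableDlq", true);
        props.put(PREFIX + bindingName + ".consumer.dlqName", dlqTopic);
        return props;
    }

    // Settings for both consumers of the flight API.
    static Map<String, Object> flightApi() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.putAll(forBinding("planeEventProcessor-in-0", "plane-events-dlq-v1"));
        props.putAll(forBinding("flightEventConsumer-in-0", "flight-events-dlq-v1"));
        return props;
    }

    public static void main(String[] args) {
        flightApi().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```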
Another nice feature of Spring Cloud Stream is that you can configure the number of times a message will be reprocessed. This is very useful for transient errors.
By default, the framework attempts to process each message up to three times. If every attempt fails, the message is sent to a dead letter topic if one is configured, or otherwise dropped.
To allow you to change the number of retries, the property spring.cloud.stream.bindings.<binding-name>.consumer.maxAttempts is available.
You will also have the option to configure the maximum retry time and the time that elapses between processing attempts. To learn more about this, read the documentation section titled Retry Template and retryBackoff.
In the flight API, we will keep the default behaviour of retries. However, in the case of the plane event processor, we do not want to retry if the exception is a NoFlightFoundException. Because this error is not transient, it will continue to happen no matter how many tries are attempted.
In order to configure this, we use the spring.cloud.stream.bindings.<binding-name>.consumer.retryable-exceptions property. Below you can see how it is configured in the Flight API:
|       bindings: | 
(See full GitHub code here.)
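The behaviour described in this section can be illustrated with a small plain-Java simulation. This is not how Spring Cloud Stream implements retries (the framework relies on a retry template); the class RetrySketch, its process method, and the exception constructor are assumptions made only to show the semantics: up to maxAttempts tries, no retry for exceptions marked as non-retryable, and a dead letter hand-off when processing gives up.

```java
import java.util.Set;
import java.util.function.Consumer;

// Same name as the article's exception; the constructor is an assumption.
class NoFlightFoundException extends RuntimeException {
    NoFlightFoundException(String message) {
        super(message);
    }
}

// Hypothetical simulation of retry and dead letter semantics.
class RetrySketch {

    // Tries the handler up to maxAttempts times (the first try counts as an
    // attempt) and returns the number of attempts that were made.
    static <T> int process(T message,
                           Consumer<T> handler,
                           int maxAttempts,
                           Set<Class<? extends Throwable>> nonRetryable,
                           Consumer<T> deadLetter) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // processed successfully
            } catch (RuntimeException ex) {
                boolean exhausted = attempt == maxAttempts;
                // Give up early when retrying cannot help.
                if (exhausted || nonRetryable.contains(ex.getClass())) {
                    deadLetter.accept(message);
                    return attempt;
                }
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
```

With maxAttempts set to 3, a handler that always fails with a transient error is tried three times before the message is dead-lettered, while a handler that throws NoFlightFoundException is tried once.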
This article showcases how easy it can be to develop message-driven microservices using Spring Boot and Spring Cloud Stream, and how we can write business code with minimal interference from framework code.
There's a lot more to talk about with Spring Cloud Stream, so I recommend that you read the documentation and also check out the complete business case example in the GitHub repository.