
Processing Messages with Spring Cloud Stream and Kafka

Written by Henrique Schmidt | 6/9/21 5:00 PM

Spring Cloud Stream is a framework that helps developers solve data integration problems, especially in event-driven applications. The goal of the framework is to make it easy to apply modern microservices architecture patterns, decouple application responsibilities, and port business logic onto message brokers.

In this article, I will show how easy it is to implement Spring Cloud Stream and how non-invasive it can be to the application code. These benefits allow developers to focus on what matters: the business.

We will cover the following topics:

  1. Bindings
  2. A functional programming model
  3. A business case (a flight API)
  4. Implementing the flight API with Spring Cloud Stream and Kafka
  5. Error handling

Bindings

A binding is a bridge between the application and external messaging systems (Kafka, RabbitMQ, etc.). The binding sends and receives messages.

A binding is abstract: the application should not know or care whether it is backed by a Kafka or a RabbitMQ implementation.

[Image: a Spring Cloud Stream application, showing the application core connected through a binder to the middleware. Courtesy of docs.spring.io.]

In the image above, you can see that the application core depends only on the binder, and that the binder takes care of the broker-specific (Kafka, RabbitMQ, etc.) details of talking to the middleware.
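Because of this abstraction, switching brokers is, in principle, a matter of changing the binder dependency rather than the application code. For example, an application using the Kafka starter from later in this article could move to RabbitMQ by swapping in the RabbitMQ binder starter (shown here with Gradle):

implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'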

To learn more about this, read the Bindings section in the project documentation.

Functional Programming Model

Since Spring Cloud Stream 2.1, it has been possible to define bindings by creating beans of the following types:

java.util.function.[Supplier/Function/Consumer].

Below you can see a binding that receives a String from a channel (topic, queue), transforms it to uppercase, and sends it to another channel:

@Bean
public Function<String, String> uppercase() {
    return value -> {
        System.out.println("Received: " + value);
        return value.toUpperCase();
    };
}
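With the functional model, binding names are derived from the bean name: the uppercase function above exposes an input binding named uppercase-in-0 and an output binding named uppercase-out-0. A minimal configuration mapping those bindings to channels could look like the following sketch (the topic names here are illustrative, not from the flight API):

spring:
  cloud:
    stream:
      bindings:
        uppercase-in-0:
          destination: strings-v1
        uppercase-out-0:
          destination: uppercase-strings-v1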

There is also an annotation-based way of defining bindings, but it is considered legacy and is not recommended for new applications. To learn more about this, you can read this section in the docs.

A Business Case: Flight API

To have a sample implementation in this post, we will use the following business case:

There is a need for an application that can manage flights. 

This application will store all flight information, including the plane, the flight's starting point, and the destination. The program will receive a message every time a plane arrives at an airport. Once a plane arrives at its destination, the application should change the flight status to "arrived."

More specifically, the service will receive plane events on a Kafka topic called plane-events-v1, convert each plane event into a flight event, and send it to the topic flight-events-v1. As shown below, the microservice should also consume from flight-events-v1, changing the status of the flight to "arrived" once the flight reaches its destination.
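In outline, the flow looks like this (the component names are the bean names introduced in the implementation section):

plane-events-v1 -> planeEventProcessor -> flight-events-v1 -> flightEventConsumer -> flight status set to "arrived"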

Implementing the Flight API with Spring Cloud Stream and Kafka

In order to implement the flight API, we will use Spring Cloud Stream with the Kafka binder and MongoDB.

In a Spring Boot Application, you can add the spring-cloud-starter-stream-kafka starter to work with Kafka and Spring Cloud Stream.

If you use Gradle, you can use the following dependency:

implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'

All of the source code shown in this article is available on GitHub here.

In the implementation section below, we will address each of the following areas:

  1. Plane event processor
  2. Flight event consumption
  3. Error handling

Plane Event Processor

To begin our implementation, we will start with the binding responsible for converting a plane event into a flight event. The two events are defined in Java classes as shown below:

@AllArgsConstructor
@NoArgsConstructor
@Getter
class PlaneEvent {
    private String planeId;
    private String currentAirport;
}

(See full GitHub code here.)

@AllArgsConstructor
@Getter
class FlightEvent {
    private final String flightId;
    private final String currentAirport;
}

(See full GitHub code here.)

With both events defined, we can create the binding. Because we want to convert one event into another, we use a java.util.function.Function type of bean. Below you can see the implementation of the message processor:

@Component("planeEventProcessor")
@AllArgsConstructor
class PlaneEventProcessor implements Function<PlaneEvent, FlightEvent> {

    private final FlightOperations flightOperations;

    @Override
    public FlightEvent apply(PlaneEvent planeEvent) {
        String planeId = planeEvent.getPlaneId();
        Flight flight = flightOperations.findConfirmedFlightByPlaneId(planeId)
                .orElseThrow(() -> new NoFlightFoundException(String.format("No flight found for plane id %s", planeId)));
        return new FlightEvent(flight.getId(), planeEvent.getCurrentAirport());
    }
}

(See full GitHub code here.)

The processor bean delegates the business logic to its FlightOperations dependency (a FlightOperationsFacade in the full code). If no flight is found for the plane, an exception is raised.
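For reference, here is a minimal sketch of what that business-logic interface might look like; the two method signatures are taken from the snippets in this article, and everything else lives in the GitHub repository:

import java.util.Optional;

interface FlightOperations {

    // Used by the plane event processor: finds the confirmed flight
    // for a plane, if one exists.
    Optional<Flight> findConfirmedFlightByPlaneId(String planeId);

    // Used by the flight event consumer (shown later): records that
    // the flight has arrived at the given airport.
    void flightArrivedIn(String flightId, Airport airport);
}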

In order to configure this bean to work, we need to set the following properties prefixed with spring.cloud.stream.bindings:

planeEventProcessor-in-0.destination: The topic that the bean will consume from.

planeEventProcessor-in-0.group: The group id of the Kafka consumer.

planeEventProcessor-out-0.destination: The topic that the flight events will be sent to.

Below you can see the application configuration for this bean:

      bindings:
        planeEventProcessor-in-0:
          destination: plane-events-v1
          group: flight-api
        planeEventProcessor-out-0:
          destination: flight-events-v1

(See full GitHub code here.)

Flight Event Consumption

To consume a message from the topic flight-events-v1, we will create a java.util.function.Consumer bean as shown below:

@Component("flightEventConsumer")
@AllArgsConstructor
class FlightEventConsumer implements Consumer<FlightEvent> {

    private final FlightOperations flightOperations;

    @Override
    public void accept(FlightEvent flightEvent) {
        Airport airport = new Airport(flightEvent.getCurrentAirport());
        flightOperations.flightArrivedIn(flightEvent.getFlightId(), airport);
    }
}

(See full GitHub code here.)

This consumer receives the message and calls the business logic module to handle it.

When there is more than one functional binding bean in a project, we need to set the property spring.cloud.stream.function.definition to a semicolon-separated list of the bean names. Below we can see this configuration in the flight API:

spring:
  cloud:
    stream:
      function:
        definition: planeEventProcessor;flightEventConsumer

(See full GitHub code here.)

When working with a consumer, you'll be required to define the topic that it will listen to and the group that it belongs to. You can see it configured below:

        flightEventConsumer-in-0:
          destination: flight-events-v1
          group: flight-api

(See full GitHub code here.)

Error Handling

Spring Cloud Stream makes it easy to configure and implement dead letter processing and retries.

Dead Letter Processing

It is often useful to send a message to a special topic when an exception happens. From there, messages can be reprocessed or audited, depending on the business scenario.

With Spring Cloud Stream, we only need to add two properties, prefixed with spring.cloud.stream.kafka.bindings.<binding-name>.consumer. These are listed below:

  1. enableDlq: Enables dead letter processing. When an exception happens and all configured retries have been exhausted, the message is sent to the dead letter topic of this binding.
  2. dlqName: The name of the dead letter topic.

In the flight API, dead letter processing will be enabled for both the plane and the flight event consumers. The flight events' dead letter topic will be flight-events-dlq-v1, and the plane events' dead letter topic will be plane-events-dlq-v1.

Below you can see the application configuration file for this behaviour:

      kafka:
        bindings:
          planeEventProcessor-in-0:
            consumer:
              enableDlq: true
              dlqName: plane-events-dlq-v1
          flightEventConsumer-in-0:
            consumer:
              enableDlq: true
              dlqName: flight-events-dlq-v1

(See full GitHub code here.)

Retries

Another nice feature of Spring Cloud Stream is that you can configure the number of times a message will be reprocessed. This is very useful for transient errors.

By default, a message is attempted up to three times; after that, it has either been processed successfully, or it is sent to a dead letter topic if one is configured, or it is simply dropped.

To change the number of attempts, set the property spring.cloud.stream.bindings.<binding-name>.consumer.maxAttempts.

You will also have the option to configure the maximum retry time and the time that elapses between processing attempts. To learn more about this, read the documentation section titled Retry Template and retryBackoff.
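As an illustration, a consumer binding could be tuned like this; the values below are arbitrary examples, not the flight API's actual configuration:

spring:
  cloud:
    stream:
      bindings:
        flightEventConsumer-in-0:
          consumer:
            maxAttempts: 5                # retry up to 5 times instead of the default 3
            backOffInitialInterval: 1000  # wait 1 second before the first retry
            backOffMultiplier: 2.0        # double the wait on each subsequent retry
            backOffMaxInterval: 10000     # never wait more than 10 seconds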

In the flight API, we will keep the default retry behaviour. However, in the plane event processor, we do not want to retry when the exception is a NoFlightFoundException: this error is not transient, so it will continue to happen no matter how many attempts are made.

In order to configure this, we use the spring.cloud.stream.bindings.<binding-name>.consumer.retryable-exceptions property. Below you can see how it is configured in the Flight API:

      bindings:
        planeEventProcessor-in-0:
          destination: plane-events-v1
          group: flight-api
          consumer:
            retryable-exceptions:
              io.henriquels25.cloudstream.demo.flightapi.plane.infra.stream.NoFlightFoundException: false

(See full GitHub code here.)

Conclusion

This article showcases how easy it can be to develop message-driven microservices using Spring Boot and Spring Cloud Stream, and how we can write business code with minimal interference from framework code.

There's a lot more to talk about with Spring Cloud Stream, so I recommend that you read the documentation and also check out the complete business case example in the GitHub repository.