Spring Cloud Stream é um framework que auxilia desenvolvedores com problemas de integração de dados, especialmente em aplicações orientadas a eventos. O objetivo da estrutura é facilitar a aplicação de padrões modernos de arquitetura de microsserviços, dissociar as responsabilidades do aplicativo e portar a lógica de negócios para os agentes de mensagens.

Neste artigo, mostrarei como é fácil implementar o Spring Cloud Stream e como ele pode ser não invasivo no código do aplicativo. Esses benefícios permitem que os desenvolvedores se concentrem no que importa: o negócio.

Abordaremos os seguintes tópicos:

  1. Ligações
  2. Um modelo de programação funcional
  3. Um caso de negócios (uma API de voo)
  4. Implementando a API de voo com Spring Cloud Stream e Kafka
  5. Manipulação de erros

Ligações

Uma ligação (binding) é uma ponte entre o aplicativo e os sistemas de mensagens externos (Kafka, RabbitMQ, etc). A ligação envia e recebe mensagens.

Uma ligação é abstrata e o aplicativo não deve saber ou se importar se é uma implementação Kafka ou RabbitMQ.

Imagem cortesia de docs.spring.io.

Na imagem acima, você pode ver que o núcleo da aplicação depende apenas do binder (ligação) e que o middleware é responsável pelos detalhes específicos do broker (Kafka, RabbitMQ, etc).

Para saber mais sobre isso, leia a seção Bindings na documentação do projeto.

Modelo de Programação Funcional

Desde Spring Cloud Stream v2.1, é possível definir ligações criando beans dos tipos:

java.util.function.[Supplier/Function/Consumer].

Abaixo você pode ver uma ligação que recebe uma String de um canal (tópico, fila), a transforma em maiúscula e a envia para outro canal:

Há também a forma de definição de vínculos baseada em anotações, que é considerada uma forma herdada e não é recomendada para novos aplicativos. Para saber mais sobre isso, você pode ler esta seção na documentação.

Um caso de negócios: API de voo

Para ter uma implementação de exemplo neste post, usaremos o seguinte caso de negócios:

Há a necessidade de um aplicativo que possa gerenciar voos.

Este aplicativo armazenará todas as informações do voo, incluindo o avião, o ponto de partida do voo e o destino. O programa receberá uma mensagem toda vez que um avião chegar a um aeroporto. Assim que um avião chega ao seu destino, o aplicativo deve alterar o status do voo para "chegou".

Mais especificamente, o serviço receberá um evento de avião em um tópico do Kafka chamado plane-events-v1, então ele deverá converter o evento de avião em um evento de voo, enviando-o para o tópico flight-events-v1. Conforme mostrado abaixo, o microsserviço também deve consumir informações do flight-events-v1, alterando o status do voo para "chegou" caso o voo tenha chegado ao seu destino.

Implementando a API Flight com Spring Cloud Stream e Kafka

Para implementar a API de voo, usaremos o Spring Cloud Stream com Kafka e MongoDB.

Em um aplicativo Spring Boot, você pode adicionar o iniciador spring-cloud-starter-stream-kafka para trabalhar com Kafka e Spring Cloud Stream.

Se você usa o Gradle, pode utilizar a seguinte dependência:

Implementação

'org.springframework.cloud:spring-cloud-starter-stream-kafka'

Todo o código-fonte mostrado neste artigo está disponível no GitHub aqui.

Na seção de implementação abaixo, abordaremos cada uma das seguintes áreas:

  1. Processador de eventos de avião;
  2. Consumo de eventos de voo;
  3. Manipulação de erros.

Processador de eventos de avião

Para começar a nossa implementação, iniciaremos com a vinculação responsável por converter um evento de avião em um evento de voo. Os dois eventos são definidos em classes Java conforme mostrado abaixo:

(Veja o código completo do GitHub aqui.)

(Veja o código completo do GitHub aqui.)

Com ambos os eventos definidos, podemos criar a vinculação. Como queremos converter um evento em outro, usamos um tipo de bean java.util.function.Function. Abaixo você pode ver a implementação do processador de mensagens:

(Veja o código completo do GitHub aqui.)

O bean do processador delega a lógica de negócios ao FlightOperationsFacade. Se a lógica de negócios não encontrar o voo, uma exceção será gerada.

Para configurar este bean para funcionar, precisamos definir as seguintes propriedades prefixadas com spring.cloud.stream.bindings:

planeEventProcessor-in-0.destination: O tópico que o bean consumirá.

planeEventProcessor-in-0.group: O ID do grupo do consumidor Kafka.

planeEventProcessor-out-0.group: O tópico de destino para o qual os eventos de voo serão enviados.

Abaixo você pode ver a configuração da aplicação para este bean:

(Veja o código completo do GitHub aqui.)

Consumo de eventos de voo

Para consumir uma mensagem do tópico flight-events-v1, criaremos um bean java.util.function.Consumer conforme mostrado abaixo:

Esse consumidor receberá a mensagem e chamará o módulo de lógica de negócios para cuidar dela.

Quando há mais de um bean de ligação funcional em um projeto, precisamos definir a propriedade spring.cloud.stream.function.definition com uma lista contendo cada bean. Abaixo podemos ver esta configuração na API de voo:

Ao trabalhar com um consumidor, você deverá definir o tópico que ele ouvirá e o grupo ao qual ele pertence. Você pode vê-lo configurado abaixo:

Manipulação de erros

O Spring Cloud Stream facilita a configuração e implementação do processamento de mensagens mortas e novas tentativas.

Processamento de cartas mortas

Muitas vezes, é útil enviar uma mensagem para um tópico especial quando ocorre uma exceção. Depois disso, as mensagens podem ser reprocessadas ou auditadas, dependendo do cenário de negócios.

Com o Spring Cloud Stream, precisamos apenas adicionar duas propriedades prefixadas com spring.cloud.stream.kafka.bindings.<binding-name>.consumer. Estes estão listados abaixo:

enableDlq: Propriedade que habilita o processamento de mensagens mortas. Quando ocorrer uma exceção e não houver mais tentativas configuradas, a mensagem será enviada para o tópico de mensagens mortas dessa ligação.
dlqName: Nome do tópico de mensagens mortas.
Na API de voo, em consumidores de eventos de avião e voo, o processamento de mensagens mortas será ativado. O tópico de mensagens mortas do evento de voo será flight-events-dlq-v1, e o tópico de mensagens mortas do avião será plane-events-dlq-v1.

Abaixo você pode ver o arquivo de configuração do aplicativo para este comportamento:

Novas tentativas

Outro recurso interessante do Spring Cloud Stream é que você pode configurar o número de vezes que uma mensagem será reprocessada. Isso é muito útil para erros transitórios.

Por padrão, todas as mensagens são processadas três vezes, sendo eventualmente processadas com sucesso, enviadas para um tópico de mensagens mortas, se configurado, ou simplesmente descartadas.

Para permitir que você altere o número de tentativas, a propriedade spring.cloud.stream.bindings.<binding-name>.consumer.maxAttempts está disponível.

Você também terá a opção de configurar o tempo máximo de repetição e o tempo decorrido entre as tentativas de processamento. Para saber mais sobre isso, leia a seção de documentação intitulada Retry Template.

Na API de voo, manteremos o comportamento padrão de novas tentativas. No entanto, no caso do processador de eventos de avião, não queremos tentar novamente se a exceção for uma NoFlightFoundException. Como esse erro não é transitório, ele continuará a ocorrer, não importa quantas tentativas sejam feitas.

Para configurar isso, usamos a propriedade spring.cloud.stream.bindings.<binding-name>.consumer.retryable-exceptions. Abaixo você pode ver como ele está configurado na API de voo:

Conclusão

Este artigo mostra como pode ser fácil desenvolver microsserviços orientados a mensagens usando Spring Boot, e como podemos desenvolver código de negócios com interferência mínima em código de estrutura.

Há muito mais sobre o que falar com o Spring Cloud Stream, então recomendo que você leia a documentação e também confira o exemplo de caso de negócios completo no repositório do GitHub.


Autor

Henrique Schmidt