En el dinámico mundo del desarrollo de software, la capacidad de manejar datos en tiempo real y de manera eficiente se ha convertido en una necesidad crítica. Spring Cloud Stream emerge como una solución poderosa para la construcción de aplicaciones basadas en eventos, facilitando la integración con múltiples sistemas de mensajería. En este post analizaremos sus características y aprenderemos a cómo utilizarlo para integrarnos con Apache Kafka.
¿Qué es Spring Cloud Stream?
Es un framework que facilita la creación de microservicios bajo el patrón de Event-Driven, mediante la abstracción del broker de mensajería. Dicho framework está construido con Spring Boot y utiliza Spring Integration para conectarse al broker de mensajería.
Funcionamiento
Spring Cloud Stream proporciona una abstracción del broker de mensajería a través del uso de binders para la creación y recepción de mensajes, basándose en el modelo de publicador/suscriptor.
Los binders básicamente son los responsables de la integración de la aplicación con el broker de mensajería como puede ser Apache Kafka o RabbitMQ entre otros. Para que se puedan integrar con el broker tenemos que definir los bindings, que es donde se especifican cada uno de los destinos (tópico o cola) tanto de entrada como de salida.
Ventajas
La capacidad de abstracción de Spring Cloud Stream para abstraerse del broker permite:
- Integrar nuestra aplicación con distintos brokers de manera sencilla: Esto permite que la aplicación que estemos desarrollando pueda integrarse por ejemplo con Apache Kafka y RabbitMQ al mismo tiempo, simplemente configurando sus binders.
- Cambiar de un broker a otro sin necesidad de modificar el código: Esto es posible gracias a los binders, de manera que cuando queramos que nuestra aplicación se integre con otro broker tan solo tenemos que cambiar el binder.
Configuración
- Para poder la última versión de Spring Cloud Stream en nuestro proyecto tenemos que tener Spring Boot 3 y por tanto Java 17 o superior
- Tener las dependencias del framework y del binder correspondiente al broker con el que nos vamos a integrar, en este caso Apache Kafka.
Podemos añadirlas con Spring Initalizr
O directamente a nuestro proyecto:
-
Spring Cloud Stream
Maven:
org.springframework.cloud
spring-cloud-stream
Gradle:
implementation ‘org.springframework.cloud:spring-cloud-stream’
-
Kafka Binder:
Maven:
org.springframework.cloud
spring-cloud-stream-binder-kafka
Gradle:
implementation ‘org.springframework.cloud:spring-cloud-stream-binder-kafka’
En caso de trabajar con Kafka Streams tenemos que importar el binder spring-cloud-stream-binder-kafka-streams.
Propiedades
Para poder integrarnos con Apache Kafka tenemos que configurar el binder en nuestro fichero de configuración. Veamos un ejemplo de una configuración para un Kafka local:
Establecemos un binder por defecto en la propiedad spring.cloud.stream.defaultBinder
Creamos la configuración para ese binder en la propiedad spring.cloud.stream.binders
Como podemos observar las propiedades se definen en el spring.cloud.stream.kafka.binder, en el ejemplo vemos que se han configurado las siguientes:
- brokers: Lista de brokers. Por defecto: localhost
- requiredAcks: El número de acks requeridos. Por defecto 1
- minPartitionCount: Solo funciona si la propiedad autoCreateTopics o autoAddPartitions está definida. Es el mínimo número de particiones que el binder configura en los tópicos. Por defecto 1
- replicationFactor: El factor de realización de los tópicos autocreados está activo si la propiedad autoCreateTopics está activa. Por defecto 1
- autoCreateTopics: Crea automáticamente los tópicos. Por defecto true
- autoAddPartitions: Crea automática nuevas particiones si es necesario. Por defecto false
- security.protocol: Protocolo de seguridad
- serializer: El tipo de serialización para el valor del mensaje
Producción y consumo de mensajes
Spring Cloud Stream puede implementarse mediante anotaciones o a través de la definición de funciones, tal como veremos a continuación:
Productor
Tan solo tenemos que crear un bean de tipo Supplier
Consumidor
En este caso tenemos que crear un bean de tipo Consumer
Al arrancar la aplicación vemos que el productor envía los mensajes al tópico nuevos-pedidos y el consumidor los consume correctamente tal como podemos ver en las trazas y en la consola :
Así de sencillo es crear un consumidor y productor con Spring Cloud Stream, sin embargo también es posible crear un bean que procese los mensajes, en otras palabras que sea capaz tanto de recibirlos como de producirlos.
Procesador
Para ello tenemos que hacer uso de Function
Tal como vemos en el ejemplo, recibimos los eventos de los pedidos en el tópico nuevos-pedidos pero ahora actualizamos el estado a PROCESADO y enviamos los mensajes al tópico pedidos-procesados.
Definición de bindings y declaración de funciones
Para hacer que el framework sea consciente de las funciones, tenemos que declararlas en nuestro fichero de configuración mediante la propiedad spring.cloud.function.definition, así como definir los bindings para asociar cada función con su destino (tópico):
Uso de StreamBridge para la producción de mensajes
Como podemos observar en el ejemplo, el productor envía los mensajes cada segundo por defecto pero puede que necesitamos que los mensajes se envíen cuando sucede un evento en concreto, para ello el framework nos proporciona la clase StreamBridge:
A continuación solo crearemos el evento del pedido cada vez que llamemos al método POST del API:
Inyectamos la clase StreamBridge en el controlador y posteriormente hacemos uso del método send enviar el pedido, indicando el binding de salida.
Como podemos observar, cada vez que realizamos la llamada se crea el evento, que envía los mensajes al tópico de nuevos-pedidos
Asignación de key a los mensajes
Como podemos observar en los ejemplos anteriores, podemos enviar y producir los mensajes solo indicando el tipo de objeto, sin embargo, solo se envía el valor del mensaje y no su clave ya que es nulo:
Para solventarlo disponemos de la interfaz Message y de la clase MessageBuilder con la que podemos construir los mensajes de manera personalizada.
Volviendo al ejemplo anterior vamos a asignar una key de tipo String, por tanto, lo primero que tenemos que hacer es configurar el tipo de serialización tanto para la key como para el valor:
Posteriormente hacemos uso la clase MessageBuilder para construir el mensaje, utilizando los métodos withPayload para el valor del mensaje y setHeader para la clave, en este caso como estamos trabajando con Kafka tendremos que usar la constante KEY de la clase KafkaHeaders para especificar que el header es de tipo Kafka key.
Ahora ya cuando producimos los mensajes ya cuentan con una clave:
Testing
Spring Cloud Stream cuenta con sus propias anotaciones y clases para poder crear tests de integración, a continuación, veremos cómo crearlos usando un Kafka embebido.
Configuración
Lo primero que tenemos que hacer es tener las siguientes dependencias:
- Test Binder:
Maven:
org.springframework.cloud
spring-cloud-stream-test-binder
test
Gradle:
testImplementation(“org.springframework.cloud:spring-cloud-stream-test-binder”)
- Spring Cloud Stream Test Support:
Maven:
org.springframework.cloud
spring-cloud-stream-test-support
test
Gradle:
testImplementation(“org.springframework.cloud:spring-cloud-stream-test-support”)
- Spring Kafka Test:
En este caso como estamos trabajando con Apache Kafka tenemos que agregar también la dependencia, dado que la necesitaremos para poder crear un Kafka embebido con el que el test binder se va a integrar.
Maven:
org.springframework.kafka
spring-kafka-test
test
Gradle:
testImplementation(“org.springframework.kafka:spring-kafka-test”)
Creación del test
En primer lugar, tenemos que añadir la anotación @EnableTestBinder a nuestro test y también la de @EmbeddedKafka para poder configurar nuestro Kafka embebido, aquí podemos indicar propiedades como por ejemplo el puerto donde se levanta…etc
Asimismo, inyectamos en el test las clases InputDestination y OutputDestination que también nos proporciona el framework.
Probamos la función process que hemos definimos antes, la cual recibe un pedido y lo devuelve cambiando el estado a “procesado”.
En primer lugar, creamos un pedido y lo enviamos al tópico de entrada; “nuevos-pedidos”, de manera que preparamos el mensaje de entrada que espera la función. Para ello creamos un message de tipo pedido y con el inputDestination lo enviamos.
Posteriormente simulamos la respuesta, haciendo uso del outputDestination y con el método receive indicamos el timeout y el tópico de salida; “pedidos-procesados”.
Por último, utilizamos el ObjectMapper para deserializar la respuesta ya que se trata de un array de bytes y finalmente hacemos los asserts para comprobar que el estado es procesado y que el resto de campos viajan correctamente.
Conclusión
En definitiva, Spring Cloud Stream simplifica mucho la creación de microservicios bajo el patrón Event-Driven debido a su capacidad para abstraerse del broker de mensajería haciendo que nuestras aplicaciones sean más mantenibles y escalables, dado que el código está totalmente desacoplado del broker.
Os animo a implementarlo en vuestros proyectos por las ventajas que ofrece y por su facilidad.