In this article, I am not going to explain the basics of Spring Cloud Stream or the process of creating publishers and subscribers. Those have been clearly described in Part 1 and Part 2 of this article series.
It is possible to send messages with headers. On the receiving end (the consumer application), there can be multiple message handlers (@StreamListener annotated methods) that accept messages based on the message headers.
A copy of the message will be sent to every handler method, and a handler will accept the message only if it matches the given condition. The condition is a SpEL (Spring Expression Language) expression that performs checks on header values. A sample condition is given as follows.
@StreamListener(target = OrderSink.INPUT, condition = "headers['payment_mode']=='credit'")
(Please refer to the source code for the complete code.)
In that way, you can use headers to route messages among multiple message handlers (message routing). Here we will look at how to deliver messages to the correct recipient based on the headers.
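As an illustration, routing by payment mode could look like the following sketch. The OrderSink binding interface and the "cash" condition are assumptions for illustration (only the "credit" condition appears above); refer to the source code for the actual implementation.

```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

// Two handlers bound to the same input channel; each accepts only the
// messages whose 'payment_mode' header matches its SpEL condition.
// OrderSink is assumed to be the input binding interface.
@EnableBinding(OrderSink.class)
public class OrderListener {

    @StreamListener(target = OrderSink.INPUT,
            condition = "headers['payment_mode']=='credit'")
    public void handleCreditOrder(String order) {
        // only messages with header payment_mode=credit arrive here
        System.out.println("Credit order: " + order);
    }

    @StreamListener(target = OrderSink.INPUT,
            condition = "headers['payment_mode']=='cash'")
    public void handleCashOrder(String order) {
        // only messages with header payment_mode=cash arrive here
        System.out.println("Cash order: " + order);
    }
}
```

A copy of each incoming message is offered to both handlers, but each condition filters out the messages that do not match its header value.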
Continue reading “Message Driven Microservices with Spring Cloud Stream and RabbitMQ (Publish and Subscribe messages) – using @StreamListener for header based routing – Part 3”
In the previous part, we tried Spring Cloud Stream's pre-built components, such as Sink, Source and Processor, for building message driven microservices.
In this part, we will look at how to create custom binding classes with custom channels for publishing and retrieving messages with RabbitMQ.
Setting up the publisher application
The publisher application is almost the same as in the previous article, except for the bindings and related configurations.
The previous article used the Source class (a Spring Cloud Stream built-in component) for configuring the output message channel (@Output) for publishing messages. Here we are not going to use the built-in component; instead, we will develop a custom output binding class to build and configure the output message channel.
We have declared a custom Source class with “orderPublishChannel” as the output message channel.
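A minimal sketch of such a custom binding interface might look as follows (the interface name OrderSource and the method name create() follow the usage described in this article; check the source code for the exact declaration):

```java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Custom Source binding: one @Output method exposing the
// "orderPublishChannel" output message channel.
public interface OrderSource {

    String OUTPUT = "orderPublishChannel";

    @Output(OUTPUT)
    MessageChannel create();
}
```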
Now we need to bind this OrderSource class in the OrderController.
source.create() will return the output message channel whose name is “orderPublishChannel”. The published messages will be delegated to the RabbitMQ exchange through the “orderPublishChannel”.
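For illustration, publishing through the custom binding could look like this sketch. The REST endpoint and the String payload are assumptions for illustration; refer to the source code for the actual implementation.

```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(OrderSource.class)
public class OrderController {

    private final OrderSource source;

    public OrderController(OrderSource source) {
        this.source = source;
    }

    @PostMapping("/orders")
    public String publishOrder(@RequestBody String order) {
        // source.create() returns the "orderPublishChannel" channel;
        // the message is delegated to the RabbitMQ exchange through it
        source.create().send(MessageBuilder.withPayload(order).build());
        return "Order published";
    }
}
```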
We need to change the application.properties based on the channel name as follows.
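For example, the binding could be configured as follows (the destination/exchange name orders-exchange is an assumption for illustration):

```properties
# bind the custom output channel "orderPublishChannel" to a RabbitMQ exchange
spring.cloud.stream.bindings.orderPublishChannel.destination=orders-exchange
```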
Now we have completed the development of the publisher application with custom source bindings for publishing messages. Let's move forward with developing the consumer application.
Setting up the consumer application
The consumer application is almost the same as in the previous article, except for the bindings and related configurations.
The previous article used the Sink class (a Spring Cloud Stream built-in component) for configuring the input message channel (@Input) for retrieving messages. Here we are not going to use the built-in component; instead, we will develop a custom input binding class to build and configure the input message channel.
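As a sketch, the custom input binding might look like this (the interface name OrderSink and channel name "orderConsumeChannel" are assumptions for illustration; refer to the source code for the actual names):

```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

// Custom Sink binding: one @Input method exposing the input channel.
public interface OrderSink {

    String INPUT = "orderConsumeChannel";

    @Input(INPUT)
    SubscribableChannel consume();
}

// Consumer bound to the custom input channel.
@EnableBinding(OrderSink.class)
class OrderConsumer {

    @StreamListener(OrderSink.INPUT)
    public void handleOrder(String order) {
        System.out.println("Received order: " + order);
    }
}
```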
Continue reading “Message Driven Microservices with Spring Cloud Stream and RabbitMQ (Publish and Subscribe messages) with custom bindings – Part 2”
What is Spring Cloud Stream?
Spring Cloud Stream is a framework that helps in developing message driven or event driven microservices. Spring Cloud Stream uses an underlying message broker (such as RabbitMQ or Kafka) that is used to send and receive messages between services.
At the time of writing this article, there are two implementations of Spring Cloud Stream:
- Spring Cloud Stream implementation that uses RabbitMQ as the underlying message broker.
- Spring Cloud Stream implementation that uses Apache Kafka as the underlying message broker.
High Level Overview of Spring Cloud Stream
An application defines Input and Output channels which are injected by Spring Cloud Stream at runtime. Through the use of so-called Binder implementations, the system connects these channels to external brokers.
The difficult parts are abstracted away by Spring, leaving it up to the developer to simply define the inputs and outputs of the application. How messages are transformed, routed, transported, received and ingested is all up to the binder implementation (e.g. RabbitMQ or Kafka).
Continue reading “Message Driven Microservices with Spring Cloud Stream and RabbitMQ (Publish and Subscribe messages) with Sink, Source and Processor bindings – Part 1”
You can refer to the previous parts of this article as follows.
Click here for Part 1
Click here for Part 2
In the previous article (Part 2 of this series), we discussed how to use Spring Cloud Bus to broadcast the refresh event (/actuator/bus-refresh) across all the connected services. There, the refresh event has to be manually triggered on any service that is connected to the Spring Cloud Bus. (You can select any service you wish; the only requirement is that it is connected to the Spring Cloud Bus.)
The main problem here is that whenever the properties are changed, the refresh event must be triggered manually. Even if it is for just one service, it is still a manual process. What will happen if the developer forgets to trigger the refresh event after updating the properties in the remote repository?
Wouldn’t it be nicer if there were a way to automate triggering this refresh event whenever the remote repository is changed? To achieve this, the config server needs to listen for events from the remote repository. This can be done with the webhook feature provided by remote repository providers.
Here is the architecture of the proposed solution.
Continue reading “Spring Cloud Config : Using Git Webhook to Auto Refresh the config changes with Spring Cloud Stream, Spring Cloud Bus and RabbitMQ (Part 3)”
You can refer to Part 1 of this article as follows.
Click here for Part 1
The previous article (click here to visit it) described how to use Spring Cloud Config Server as a centralized location for keeping the configuration properties of the application services (microservices). The application services act as Config Clients that communicate with the Config Server to retrieve the properties related to them.
If any property is changed, the related service needs to be notified by triggering a refresh event with Spring Boot Actuator (/actuator/refresh). The user has to trigger this refresh event manually. Once the event is triggered, all the beans annotated with @RefreshScope will be reloaded (the configurations will be re-fetched from the Config Server).
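For illustration, a bean that picks up refreshed properties might look like this hedged sketch (the property key app.greeting and the endpoint path are assumptions):

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Beans annotated with @RefreshScope are rebuilt after a refresh
// event, so the injected @Value fields are re-fetched from the
// Config Server.
@RefreshScope
@RestController
public class GreetingController {

    @Value("${app.greeting:Hello}")
    private String greeting;

    @GetMapping("/greeting")
    public String greeting() {
        return greeting; // reflects the latest refreshed value
    }
}
```

After changing the property in the remote repository, the refresh is triggered with a POST to /actuator/refresh on the service.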
In a real microservice environment, there will be a large number of independent application services. Therefore it is not practical for the user to manually trigger the refresh event for all the related services whenever a property is changed.
Continue reading “Spring Cloud Config : Refreshing the config changes with Spring Cloud Bus (Part 2)”
In the previous article, we discussed how to use Spring Cloud Bus to broadcast the configuration property changes (that occur in the Spring Cloud Config Server) across distributed services.
Spring Cloud Bus links the distributed services through a lightweight message broker such as Kafka or RabbitMQ. Whenever the refresh event is triggered in one service, Spring Cloud Bus will broadcast the refresh event across multiple services (known as Config Clients).
Therefore every Config Client should connect to the underlying message broker (either RabbitMQ or Kafka) of the Spring Cloud Bus to listen for the broadcast refresh events. As a result, every Config Client has to maintain a connection with the message broker used by the Spring Cloud Bus.
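For example, without centralization, every Config Client would need broker connection properties such as the following (the values shown are RabbitMQ defaults; adjust for your environment):

```properties
# RabbitMQ connection used by Spring Cloud Bus (defaults shown)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```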
Continue reading “Spring Cloud Bus: Centralizing Message Broker (RabbitMQ or Kafka) connection properties with Spring Cloud Config Server”
In this article, we will be developing a RabbitMQ Consumer application for the Publisher application developed in the previous article (Part 1).
Click here to view the Publisher Application related to this article.
We have already developed a consumer application to consume the messages published by the above publisher application. In that consumer application, we focused on retrieving the published messages manually; in other words, pulling the messages from the RabbitMQ queue whenever the client application needs them. If you need to look at that article, please click here.
In this article, we will be focusing on receiving the messages from queues with a listener. The consumer application will be listening on one or more queues for incoming messages. Whenever an incoming message is delivered to one of those queues, it will be handed to the consumer application through the listener (available in the consumer application).
This application is very similar to the consumer application we developed in Part 2 of this article series. Therefore all the configuration related classes will be the same, and I am not going to explain them again here. If you want to know further details, please refer to the explanations in those articles.
The full source code of this application can be found at GitHub.
Click here to download the source code.
Open the source code directory called consumer-listener. In there you can see the source codes related to this article.
I will explain the most critical component here: the ConsumerService.
ConsumerService will be listening to two queues known as “all_cars_queue” and “nissan_cars_queue”. Whenever a message is delivered to either of those queues, it will be delivered to this consumer service class.
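A minimal sketch of such a listener is shown below (the method signature is an assumption for illustration; the queue names are the ones mentioned above):

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    // invoked whenever a message arrives in either of the two queues
    @RabbitListener(queues = {"all_cars_queue", "nissan_cars_queue"})
    public void receiveMessage(String message) {
        System.out.println("Received: " + message);
    }
}
```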