In the previous part, we tried the pre-built Spring Cloud Stream components such as Sink, Source and Processor for building message-driven microservices.
In this part, we will look at how to create custom binding classes with custom channels for publishing and retrieving messages with RabbitMQ.
Setting up the publisher application
The publisher application is almost the same as the one in the previous article, except for the bindings and the related configuration.
The previous article used the Source interface (a built-in Spring Cloud Stream component) to configure the output message channel (@Output) for publishing messages. Here we are not going to use the built-in component. Instead, we will develop a custom output binding interface to declare and configure the output message channel.
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OrderSource
{
    String OUTPUT = "orderPublishChannel";

    @Output(OUTPUT)
    MessageChannel create();
}
We have declared a custom source interface with “orderPublishChannel” as the output message channel.
Now we need to bind this OrderSource interface in the OrderController.
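import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@EnableBinding(OrderSource.class)
@RestController
public class OrderController
{
    @Autowired
    private OrderSource source;

    @PostMapping("/orders/publish")
    public String publishOrder(@RequestBody Order order)
    {
        // Wrap the order in a message and send it on the custom output channel
        source.create().send(MessageBuilder.withPayload(order).build());
        log.info(order.toString());
        return "order_published";
    }
}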
source.create() returns the output message channel named “orderPublishChannel”. Messages sent on this channel are delivered to the RabbitMQ exchange that the channel is bound to.
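The controller accepts an Order payload and logs order.toString(), but the Order class itself comes from the previous part and is not repeated here. As a rough stand-in, it could look like the sketch below. The field names (id, product, quantity) are assumptions made for illustration only, so adjust them to match your own class.

```java
// Hypothetical stand-in for the Order class from the previous part.
// The fields id, product and quantity are assumptions, not the original definition.
// In a real project, declare this as a public class in its own Order.java file.
class Order {
    private String id;
    private String product;
    private int quantity;

    // No-argument constructor so that a JSON mapper can instantiate the class
    public Order() {
    }

    public Order(String id, String product, int quantity) {
        this.id = id;
        this.product = product;
        this.quantity = quantity;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getProduct() {
        return product;
    }

    public void setProduct(String product) {
        this.product = product;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }

    // Readable representation used by the log statements in both applications
    @Override
    public String toString() {
        return "Order{id='" + id + "', product='" + product + "', quantity=" + quantity + "}";
    }
}
```

The getters and setters are written out by hand here to keep the sketch free of dependencies. If the project already uses Lombok, the same result can be achieved with its annotations.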
We need to change the application.properties based on the channel name as follows.
spring.cloud.stream.bindings.orderPublishChannel.destination=orders-exchange
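The binding property above only names the exchange. The connection to the broker itself is configured through Spring Boot's spring.rabbitmq.* properties. The fragment below is a sketch that assumes a RabbitMQ broker running locally with its default port and credentials. Replace the values if your setup differs.

```properties
# Assumption: a local RabbitMQ broker with default settings.
# These values match Spring Boot's defaults, so they can be omitted in that case.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```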
Now we have completed the development of the publisher application with custom source bindings for publishing messages. Let's move forward with developing the consumer application.
Setting up the consumer application
The consumer application is almost the same as the one in the previous article, except for the bindings and the related configuration.
The previous article used the Sink interface (a built-in Spring Cloud Stream component) to configure the input message channel (@Input) for retrieving messages. Here we are not going to use the built-in component. Instead, we will develop a custom input binding interface to declare and configure the input message channel.
We will create an OrderSink interface that declares an input message channel for retrieving messages. The logical name of the channel is “orderReceiveChannel”.
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface OrderSink
{
    String INPUT = "orderReceiveChannel";

    @Input(INPUT)
    SubscribableChannel receive();
}
Now we need to bind this custom OrderSink interface to the OrderListener as follows. If you look at the class closely, you will notice that @StreamListener targets the “orderReceiveChannel” to receive new messages.
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@Slf4j
@EnableBinding(OrderSink.class)
public class OrderListener
{
    @StreamListener(target = OrderSink.INPUT)
    public void listenForOrder(Order order)
    {
        log.info(" received new order ["+order.toString()+"] ");
    }
}
The @StreamListener method is invoked for every message that arrives from the subscribed queue through the “orderReceiveChannel”.
The related properties in the application.properties should be changed based on the newly created input channel name.
spring.cloud.stream.bindings.orderReceiveChannel.destination=orders-exchange
spring.cloud.stream.bindings.orderReceiveChannel.group=orders-queue
Now we have completed the development of the consumer application with custom sink bindings for retrieving messages.
Testing and running the applications
Here I am assuming that both the publisher and consumer applications have been built and are up and running, and that the RabbitMQ server is running as well. Let's publish a message and check whether our custom bindings work as expected. The testing steps are exactly the same as in the previous article. I will repeat them here for your convenience.
Publishing the message(s)
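Let's publish a message by sending a POST request with an order in the request body to the /orders/publish endpoint of the publisher application.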
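Any REST client can be used for this step. As one possible way of sending the request from code, here is a minimal sketch using the HTTP client built into Java 11 and later. It assumes that the publisher application listens on localhost:8080 (Spring Boot's default port) and that the JSON field names match your Order class. Both are assumptions, as is the class name PublishOrderClient.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for testing; not part of the original applications.
class PublishOrderClient {

    // Assumption: the publisher application runs locally on Spring Boot's default port.
    static final String PUBLISH_URL = "http://localhost:8080/orders/publish";

    // Builds a POST request that carries the order as a JSON body.
    static HttpRequest buildRequest(String orderJson) {
        return HttpRequest.newBuilder(URI.create(PUBLISH_URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(orderJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // The field names are assumptions; use the fields of your own Order class.
        String orderJson = "{\"id\":\"1001\",\"product\":\"book\",\"quantity\":2}";

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(orderJson), HttpResponse.BodyHandlers.ofString());

        // Print the HTTP status code followed by the body returned by the controller.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running the main method requires the publisher application to be up. On success, the response body should be the string returned by publishOrder in the OrderController.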
If you check the log of the publisher application, you can see that the message has been successfully published.
Receiving the message(s)
Now check the log of the consumer application. You can see that the message was correctly delivered to it.
The Source Code
The complete source code is available on GitHub.
If you have any questions, please feel free to drop a message.