Category: Spring AMQP

Spring AMQP / RabbitMQ Topic Exchange Example Part 3 – Consumer Application (Automatically receiving messages with Listener)

In this article, we will develop a RabbitMQ Consumer application for the Publisher application developed in the previous article (Part 1).

Click here to view the Publisher Application related to this article.

We have already developed a consumer application to consume the messages published by that publisher application. That consumer application focused on retrieving the published messages manually; in other words, pulling the messages from the RabbitMQ queue whenever the client application needs them. If you need to look at that article, please click here.

In this article, we will focus on receiving the messages from queues with a listener. The consumer application will listen on one or more queues for incoming messages. Whenever a message arrives in one of those queues, it is delivered to the consumer application through the listener (available in the consumer application).

This application is very similar to the Consumer application we developed for part 2 of this article series. Therefore all the configuration-related classes are the same and I am not going to explain them again here. If you want to know further details, please refer to the explanations in those articles.

The full source code of this application can be found at GitHub.

Click here to download the source code. 

Open the source code directory called consumer-listener. There you can see the source code related to this article.

I will explain the most critical component here. That is the ConsumerService.

ConsumerService will be listening to two queues known as “all_cars_queue” and “nissan_cars_queue”. Whenever a message is delivered to either of those queues, that message will be delivered to this consumer service class.
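A listener of this kind is commonly written with Spring AMQP's @RabbitListener annotation. The following is only a minimal sketch, not the code from the repository: the class name CarListenerSketch and the method name are hypothetical, Car is assumed to be the consumer's own POJO in the same package, and a JSON message converter is assumed to be configured for the listener container.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

// Hypothetical sketch of a listener-based consumer service.
@Service
public class CarListenerSketch {

    // Spring AMQP calls this method for every message that arrives
    // in either of the two queues; no polling code is needed.
    @RabbitListener(queues = {"all_cars_queue", "nissan_cars_queue"})
    public void receiveMessage(Car car) {
        // Car is assumed to be the consumer-side POJO (same package assumed).
        System.out.println("Received: " + car);
    }
}
```

With this approach the container pushes each message to the method as soon as it is delivered, which is the main difference from the manual pull used in part 2.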

 

 

 

Spring AMQP / RabbitMQ Topic Exchange Example Part 2 – Consumer Application (Manually retrieving/pulling messages from queues)

This is the second part of the article that is focused on retrieving the RabbitMQ messages  manually (Simply pulling the messages from queues whenever the client application needs them).

Click here to view the first part of this article. 

As you can see, in the first part we developed a Producer application. In this part, we will develop a Consumer application to retrieve messages from the queue manually (whenever we need them).

The full source code of this article series can be found at GitHub.

Click here to Download the Source Code. 

Open the source code directory called consumer-manual-pull. There you can see the source code related to this article.

 

Consumer Application

Here is the project structure.

Screen Shot 2017-11-12 at 8.13.40 PM.png

 

First, make sure that you have defined valid RabbitMQ server details in the application.properties file. This information will be used by the ConnectionFactory class to make a connection with the RabbitMQ server when the RabbitTemplate is used.

 

This is the RabbitMQ configuration class and we have configured the RabbitTemplate here.

 

As you are already familiar, we have applied a set of configurations to the RabbitTemplate, including the message converter. The only unfamiliar code here is the DefaultClassMapper bean. To understand the reason behind this bean, please refer to the message segment below.

Screen Shot 2017-11-12 at 8.22.22 PM.png

In the MessageProperties, you can see that the __TypeId__ header is set to the following class name. This __TypeId__ information is added by the message converter defined in the Producer application when the POJO is transformed into JSON.

 com.springbootdev.examples.producer.model.Car 

When the producer converts the POJO message to JSON, the type information is added as a header in the message properties. When the consumer receives the message, the message converter tries to convert the JSON message into the type given in the header. The Consumer application has a POJO class called Car, but it is in a different package (not the package declared in the header). So we need a class mapping to tell the message converter where the relevant POJO for the message type is located.

idClassMapping.put("com.springbootdev.examples.producer.model.Car", Car.class);

Then the message converter can convert the incoming message to the relevant POJO format.
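The mapping line above typically lives inside a bean definition. The configuration below is a hedged sketch of how that might look, assuming the consumer's Car class is in the same package as this configuration class; the class name ClassMapperSketch and the bean method names are hypothetical and may differ from the repository.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration sketch; names are illustrative only.
@Configuration
public class ClassMapperSketch {

    @Bean
    public DefaultClassMapper classMapper() {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        // Map the producer's type id (from the __TypeId__ header)
        // to the consumer's own Car class.
        idClassMapping.put("com.springbootdev.examples.producer.model.Car", Car.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter(DefaultClassMapper classMapper) {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        // The converter consults the class mapper to resolve the target type.
        converter.setClassMapper(classMapper);
        return converter;
    }
}
```

The converter bean would then be set on the RabbitTemplate so that receiveAndConvert can produce Car instances.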

 

Now let's look at our consumer service used for receiving messages.

 

For demonstration purposes, we will be receiving messages from “all_cars_queue”. We are using a Spring scheduled method to retrieve a message from the queue every 5 seconds.

The receiveAndConvert method of the RabbitTemplate is used to pull/retrieve messages from the queue. Messages are returned from the queue in FIFO order.

 Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
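To put that call in context, here is a rough sketch of a scheduled pull, under stated assumptions: the class name ScheduledPullSketch and the method name are hypothetical, the RabbitTemplate is assumed to be configured as described above, and scheduling is assumed to be enabled elsewhere in the application (for example with @EnableScheduling).

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

// Hypothetical sketch of a consumer that polls the queue on a timer.
@Service
public class ScheduledPullSketch {

    private static final String QUEUE_NAME = "all_cars_queue";

    private final RabbitTemplate rabbitTemplate;

    public ScheduledPullSketch(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Runs every 5 seconds and pulls at most one message per run.
    @Scheduled(fixedRate = 5000)
    public void pullMessage() {
        Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
        if (object == null) {
            // receiveAndConvert returns null when the queue has no message ready.
            System.out.println("No message available in " + QUEUE_NAME);
            return;
        }
        System.out.println("Received: " + object);
    }
}
```

The null check matters because the call returns immediately rather than waiting for a message to arrive.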

You can run the application and see the output with the following command. This application is developed to be run as a command line application.

mvn spring-boot:run

 

When you run the application, you will see the following output in the terminal.

Screen Shot 2017-11-12 at 8.47.56 PM.png

Now we have developed the Consumer application that is used to retrieve/pull messages manually.

If you are looking for a way to listen to the queue and receive messages automatically (whenever the queue gets a message), please refer to part 3 of this article series.

RabbitMQ Consumer Application (Part 3) – Listening for the message queues and automatically receiving the messages when they are delivered to queues.

 

Spring AMQP / RabbitMQ Topic Exchange Example Part 1 – Producer Application

As we are already aware, there are four types of RabbitMQ message exchanges. They are as follows.

  1. Direct Exchange
  2. Topic Exchange
  3. Fanout Exchange
  4. Headers Exchange

In this article, we are going to look at the Topic exchange.

What is Topic Exchange?

A topic exchange works like a direct exchange, but wildcards are allowed in the binding key. ‘#’ matches zero or more dot-delimited words and ‘*’ matches exactly one such word.

  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.

Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange. The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.

Here is what we are going to do

 

Screen Shot 2017-11-12 at 4.56.34 AM.png

 

In this example we are trying to sort the messages and direct them into the right queue based on the manufacturer of the car. We will be receiving a large number of messages about registered cars in the city. The routing key pattern of every message is as follows.

manufacturer_name.cars.type

As you can see, we have created a Topic exchange called vehicle_exchange. In addition, we have created three queues and bound them to the Topic exchange (vehicle_exchange) with the following binding keys.

  • toyota_cars_queue  ( binding key – toyota.cars.* )

toyota_cars_queue is bound to the exchange with the wildcard binding key toyota.cars.* This means that any message whose routing key starts with toyota.cars and is followed by exactly one more word is directed to this queue (toyota_cars_queue).

  • nissan_cars_queue  ( binding key – nissan.cars.* )

In the same way, any message whose routing key starts with nissan.cars and is followed by exactly one more word is directed to the nissan_cars_queue.

  • all_cars_queue  ( binding key – *.cars.#)

According to the binding key, the routing key should start with exactly one word (any word), followed by the keyword cars. It can end with zero or more further words.

e.g. The following routing keys are all valid for this binding pattern.

nissan.cars ,  toyota.cars  , any.cars  , anything.cars.everything , anything.cars.anything.and.everything

The following table will show you a list of routing keys and their destination queue(s)

 

Routing key                       Delivered to queue(s)
nissan.cars.japan                 nissan_cars_queue and all_cars_queue
nissan.cars                       all_cars_queue
toyota.cars.japan.manufactured    all_cars_queue
japan.toyota.cars                 No matching queue. The message will be discarded.
import.nissan.cars.from.japan     No matching queue. The message will be discarded.
toyota.cars.manufatured           toyota_cars_queue and all_cars_queue
no.latest.cars.toyota             No matching queue. The message will be discarded.
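
The matching rules behind this table can be checked with a small standalone program. This is only an illustrative sketch in plain Java, not RabbitMQ's actual implementation: the class TopicRouting and its methods are hypothetical, and only the queue names and binding keys come from the article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of topic-exchange matching; not RabbitMQ's own code.
class TopicRouting {

    // Queue name -> binding key, as set up in this article.
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("toyota_cars_queue", "toyota.cars.*");
        BINDINGS.put("nissan_cars_queue", "nissan.cars.*");
        BINDINGS.put("all_cars_queue", "*.cars.#");
    }

    // Returns true if the routing key matches the topic binding key.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more of the remaining words.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // the pattern still needs a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1); // consume exactly one word
        }
        return false;
    }

    // Lists every queue whose binding key matches the routing key.
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        String[] keys = {
            "nissan.cars.japan", "nissan.cars", "toyota.cars.japan.manufactured",
            "japan.toyota.cars", "import.nissan.cars.from.japan",
            "toyota.cars.manufatured", "no.latest.cars.toyota"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

Running main prints one line per routing key with the list of matching queues; an empty list corresponds to a message that is discarded.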

Now I believe that you have a clear understanding of how these wildcard binding keys work with a topic exchange.

 

Topic exchange : More to consider

Topic exchange is powerful and can behave like other exchanges.

When a queue is bound with “#” (hash) binding key – it will receive all the messages, regardless of the routing key – like in fanout exchange.

When special characters “*” (star) and “#” (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.

 

Setting Up the RabbitMQ Server

 

  • Creating the exchange

Screen Shot 2017-11-12 at 2.07.48 AM.png

 

  • Creating the toyota_cars_queue and binding it to the vehicle_exchange.

Screen Shot 2017-11-12 at 2.33.41 AM.png

 

Screen Shot 2017-11-12 at 2.35.05 AM.png

 

  • Creating the nissan_cars_queue and binding it to the vehicle_exchange.

 

Screen Shot 2017-11-12 at 2.37.40 AM.png

 

Screen Shot 2017-11-12 at 2.38.41 AM.png

 

  • Creating the all_cars_queue and binding it to the vehicle_exchange

 

Screen Shot 2017-11-12 at 2.39.41 AM.png

 

Screen Shot 2017-11-12 at 2.40.13 AM.png

 

Now we have created all three queues and bound them to the Topic exchange that we created. Now it is time to develop publisher and subscriber applications to publish messages to, and receive messages from, this RabbitMQ server setup.

 

Producer and Consumer Application

Before moving forward, I need to inform you that the full source code for the Producer and Consumer applications can be found at GitHub. You may clone the source code and continue with the article.

Click here to download the source code of the application.

 

Producer Application

Here is the project structure of the Producer Application.

Screen Shot 2017-11-12 at 2.14.26 PM.png

 

First, make sure that you have defined valid RabbitMQ server details in the application.properties file. This information will be used by the ConnectionFactory class to make a connection with the RabbitMQ server when the RabbitTemplate is used.

 

 

Now check whether you have declared/configured the required beans to use Spring AMQP feature with RabbitMQ.

 

As I have mentioned earlier, ConnectionFactory will use the RabbitMQ server details declared in application.properties to make a connection with the RabbitMQ server when the RabbitTemplate is used.

We will send a POJO instance (a Car object) as the message body. Rather than sending it as a serialized Java object, we convert it into a common exchange format. We have chosen JSON as that format, and RabbitTemplate will use the Jackson2JsonMessageConverter as the message converter for converting the POJO into JSON.

 

Now we will look at the ProducerService

 

As per the above table, we have seven routing keys and we are going to publish messages with those routing keys to the exchange called “vehicle_exchange”. When you run the Producer application, the output will be something like the below.

mvn spring-boot:run

 

Screen Shot 2017-11-12 at 6.03.53 PM.png

Now all the messages are published to the RabbitMQ server. So we can check whether the messages are delivered to the relevant queues.

Screen Shot 2017-11-12 at 6.05.41 PM.png

Now we are done with the Producer application for publishing messages to a RabbitMQ Topic exchange. The full source code (both producer and consumer) of this article can be found at GitHub. If you need further explanation of writing a RabbitMQ consumer application, it is highly recommended to look at one of the articles below.

Spring AMQP / RabbitMQ : Manually pull message(s) from RabbitMQ queue

In the previous article (click here to view the article), the consumer application listens to the queue using a listener. Therefore, whenever the queue receives a message, the listener that is listening to the queue gets notified, and the Consumer application automatically gets the message with the help of the listener.

In this article, the Consumer application will be developed to pull the messages from the queue manually. That means the message(s) will be pulled whenever the client application needs them. (I emphasize again that we are not going to use any queue listener here.)

The source code of the Producer application of the previous article (click here to go to the previous article) can be used as it is. But there will be a minor modification for the Consumer application.  The Consumer.java class should be changed as follows to manually retrieve/pull the messages from the defined queue.

 

 

You can see that the Consumer does not listen to the queue and receive messages automatically. It uses the receiveAndConvert() method of the RabbitTemplate to receive messages from the queue. We set the queue name, and messages are pulled from that queue. receiveAndConvert() uses the message converter set in the RabbitTemplate to convert the received message into the desired POJO (Java object). It retrieves the messages from the Order Queue in FIFO order.

The complete source code of this article can be found at GitHub.

You can download the source and just build and run the application.

Spring Boot and RabbitMQ Direct Exchange Example : Messaging Custom Java Objects and Consumes with a Listener

As we are already aware, there are four types of RabbitMQ message exchanges. They are as follows.

  1. Direct Exchange
  2. Topic Exchange
  3. Fanout Exchange
  4. Headers Exchange

 

In this article, I will demonstrate how to use a direct exchange in RabbitMQ with Spring Boot. Before moving on with the article, you need an up-and-running RabbitMQ server in your development environment.

If not, please visit https://www.rabbitmq.com/download.html and install the latest version in your development environment.

If you do not wish to install the RabbitMQ Server, you can use a Docker image. The purpose of this article is to demonstrate how the RabbitMQ message broker is used within Spring Boot. I will be writing the next article on how to use a RabbitMQ Docker image in your development environment. So for the time being, let's move on with the installed RabbitMQ server.

Ok. Let's move forward!

 

What is direct exchange?

In a direct exchange, messages are delivered directly to the queue(s) whose binding key is the same as the routing key of the message. The binding key of the queue must exactly match the routing key of the message. All queues bound to a direct exchange should have static binding keys (simply a constant text). No wildcard binding keys are supported with a direct exchange.
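
The exact-match rule can be illustrated with a few lines of plain Java. This is a simplified model for illustration only, not RabbitMQ code: the class DirectRouting, the queue names, and the binding keys below are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a direct exchange: routing is an exact key lookup.
class DirectRouting {

    // Binding key -> queues bound with that key.
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectRouting exchange = new DirectRouting();
        exchange.bind("orders_queue", "order");   // hypothetical names
        exchange.bind("audit_queue", "order");
        exchange.bind("wildcard_queue", "order.*");

        System.out.println(exchange.route("order"));
        // "order.*" is treated as literal text, so this key matches nothing.
        System.out.println(exchange.route("order.created"));
    }
}
```

Several queues may share the same binding key, in which case each of them receives a copy of the message.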

 

This is what we are going to do !

Screen Shot 2017-11-04 at 12.07.02 AM

 
