Spring AMQP / RabbitMQ Topic Exchange Example Part 2 – Consumer Application (Manually retrieving/pulling messages from queues)

This is the second part of the article that is focused on retrieving the RabbitMQ messages  manually (Simply pulling the messages from queues whenever the client application needs them).

Click here to view the first part of this article. 

As you can see, in the first part, we have developed a Producer application. In this part, we will be developing a Consumer application to retrieve messages from the queue manually (whenever we need them)

The full source code of this article series can be found at GitHub.

Click here to Download the Source Code. 

Open the source code directory called consumer-manual-pull. In there you can see the source codes related to this article.

 

Consumer Application

Here is the project structure.

Screen Shot 2017-11-12 at 8.13.40 PM.png

 

first make sure that you have defined the valid RabbitMQ server details in the application.properties file. These information will be used by the ConnectionFactory class to make a connection with RabbitMQ server when the RabbitTemplate is used.

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

 

This is the RabbitMQ configuration class and we have configured the RabbitTemplate here.

package com.springbootdev.examples.consumer.config;
import com.springbootdev.examples.consumer.model.Car;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
jsonMessageConverter.setClassMapper(classMapper());
return jsonMessageConverter;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("com.springbootdev.examples.producer.model.Car", Car.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
}

view raw
RabbitMqConfig.java
hosted with ❤ by GitHub

 

As you are already familiar, we have done set of configurations for RabbitTemplate including the message converters. The only strange code here is the  DefaultClassMapper bean.  In order to understand the reason behind this bean, please refer the message segement.

Screen Shot 2017-11-12 at 8.22.22 PM.png

In the MessageProperties, you can see that the __TypeId__  is set to following class declaration. This __TypeId__  information is added by the message converter defined in the Producer application when the POJO is transformed to the JSON format.

 com.springbootdev.examples.producer.model.Car 

When the producer convert the POJO message to the JSON, the type information is added as a header information of the Message Properties.  When the consumer receive the message, the message converter tries to convert the JSON message into the given type in the header.  You can see that the Consumer application has a POJO class called Car. But it is in a different package. (Not in the same package as the header information declared)  So we need to do the class mapping to inform message converter that where the relevant POJO for the  message type is located.

idClassMapping.put("com.springbootdev.examples.producer.model.Car", Car.class);

Then the message converter can convert the incoming message to the relevant POJO format.

 

Now lets look at our consumer service used for receiving messages.

package com.springbootdev.examples.consumer.service;
import com.springbootdev.examples.consumer.model.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class ConsumerService {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
private static final String QUEUE_NAME = "all_cars_queue";
@Scheduled(fixedRate = 5000)
public void receive()
{
Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME);
if (object != null) {
Car car = (Car) object;
logger.info(" received the message [" + car.toString() + "] ");
}
}
}

view raw
ConsumerService.java
hosted with ❤ by GitHub

 

for the demonstration purpose, we will be receiving messages from “all_cars_queue“.  we are using spring scheduled method to retrieve the message from the queue for every 5 seconds.

receiveAndConvert method of the RabbitTemplate will be used to pull/retrieve messages from queue. based on the FIFO principal, we will get the messages from the queue.

 Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME);

You can run and see the output with following command. This application is developed to be run as a command line application.

mvn spring-boot:run

 

When you run the application You will see the following output in the terminal.

Screen Shot 2017-11-12 at 8.47.56 PM.png

No we have developed the Consumer application that is used to retrieve/pull messages manually.

If you are looking for a way to listen to the queue and retrieve messages automatically ( whenever the queue get a message), please refer the part 3 of this article series.

RabbitMQ Consumer Application (Part 3) – Listening for the message queues and automatically receiving the messages when they are delivered to queues.

 

2 thoughts on “Spring AMQP / RabbitMQ Topic Exchange Example Part 2 – Consumer Application (Manually retrieving/pulling messages from queues)

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s