RabbitMQ – Headers Exchange

8 years ago Lalit Bhagtani

Headers Exchange :- A headers exchange routes messages to queues based on message header values instead of the routing key. The producer adds key-value pairs to the message headers and sends the message to the headers exchange. On receiving a message, the exchange compares its headers with the binding arguments of every queue bound to it, requiring either all or any of the pairs to match (depending on the value of “x-match”). If a binding matches, the exchange routes the message to the corresponding queue; if no binding matches, the message is discarded.

A binding can carry one special argument called “x-match”, which is set when the queue is bound to the exchange rather than in the message itself. “x-match” can take two values, “any” or “all”, where “all” is the default. “all” means every header key-value pair in the binding must match the message headers, while “any” means at least one pair must match. Header values can be of any supported data type, such as an integer, a string or even a hash.
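
The matching rule can be illustrated without a broker. The following is a minimal sketch of the “any”/“all” logic in plain Java, not RabbitMQ's actual implementation; the class name HeaderMatchSketch and the matches method are hypothetical, and it assumes that binding arguments whose names start with “x-” (such as “x-match”) are control arguments that are not compared against the message headers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Broker-free model of headers-exchange matching; illustrative only.
final class HeaderMatchSketch {

  // Returns true if the message headers satisfy the binding arguments.
  static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
    // "all" is the default when the binding carries no x-match argument.
    boolean matchAny = "any".equals(bindingArgs.get("x-match"));
    int compared = 0;
    int matched = 0;
    for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
      String key = arg.getKey();
      // Assumption: "x-" arguments are control arguments, not header values.
      if (key.startsWith("x-")) {
        continue;
      }
      compared++;
      if (headers.containsKey(key) && Objects.equals(arg.getValue(), headers.get(key))) {
        matched++;
      }
    }
    // "any": at least one pair matched; "all": every compared pair matched.
    return matchAny ? matched > 0 : matched == compared;
  }

  public static void main(String[] args) {
    Map<String, Object> binding = new HashMap<>();
    binding.put("x-match", "any");
    binding.put("First", "A");
    binding.put("Fourth", "D");

    Map<String, Object> headers = new HashMap<>();
    headers.put("First", "A");
    headers.put("Third", "C");

    System.out.println(matches(binding, headers)); // true: "First" matches
    binding.put("x-match", "all");
    System.out.println(matches(binding, headers)); // false: "Fourth" is missing
  }
}
```

With “any”, a single shared pair is enough; switching the same binding to “all” makes the message fail because one of the bound pairs is absent from its headers.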

RabbitMQ Headers Exchange

Example of Headers Exchange:-

First, we create an ExchangeType enum class, which will define all the exchange types.

public enum ExchangeType {
 
  DIRECT("direct"),
  TOPIC("topic"),
  FANOUT("fanout"),
  HEADER("headers");
 
  private final String exchangeName;

  ExchangeType(String exchangeName) {
    this.exchangeName = exchangeName;
  }
 
  public String getExchangeName() {
    return this.exchangeName;
  }

}

Second, we create a RabbitMQConnection class, which instantiates a ConnectionFactory object and sets its properties such as username, password, virtual host, host and port. Since I am using a local RabbitMQ setup, I only set the host property to “localhost”; the other properties keep the default values of the RabbitMQ client. The class then creates a Connection from the factory and returns it to the calling method.

 
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQConnection {
 
  public static Connection getConnection(){
    Connection conn = null;
    try{
        ConnectionFactory factory = new ConnectionFactory();
        //factory.setUsername("");
        //factory.setPassword("");
        //factory.setVirtualHost("");
        factory.setHost("localhost");
        //factory.setPort(5672); // default AMQP port; 15672 is the management UI port
        conn = factory.newConnection();
    }catch(Exception e){
        e.printStackTrace();
    }
    return conn;
  }

}



Third, we create a HeaderExchange class, which will declare one headers exchange and three different queues and bind each queue to the exchange using different header key-value pairs.

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.helper.ExchangeType;

public class HeaderExchange {
 
 public static String EXCHANGE_NAME = "header-exchange";
 public static String QUEUE_NAME_1 = "header-queue-1";
 public static String QUEUE_NAME_2 = "header-queue-2";
 public static String QUEUE_NAME_3 = "header-queue-3";
 
 public static String ROUTING_KEY = "";
 
 public void createExchangeAndQueue(){
   Map<String,Object> map = null; 
   try{
      Connection conn = RabbitMQConnection.getConnection();
      if(conn != null){
        Channel channel = conn.createChannel(); 
 
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeType.HEADER.getExchangeName(), true);
        // First Queue 
        map = new HashMap<String,Object>();	
        map.put("x-match","any");
        map.put("First","A");
        map.put("Fourth","D");
        channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
        channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, ROUTING_KEY ,map);
 
        // Second Queue 
        map = new HashMap<String,Object>();	
        map.put("x-match","any");
        map.put("Fourth","D");
        map.put("Third","C");
        channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
        channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, ROUTING_KEY ,map);
 
        // Third Queue 
        map = new HashMap<String,Object>();	
        map.put("x-match","all");
        map.put("First","A");
        map.put("Third","C"); 
        channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);
        channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, ROUTING_KEY ,map);
 
        channel.close();
        conn.close();
      }
   }catch(Exception e){
        e.printStackTrace();
   }
 }

}

Fourth, we create a Producer class, which will publish three messages to the headers exchange.

  1. “First Header Message Example” with header values “First, A” and “Fourth, D”.
  2. “Second Header Message Example” with header value “Third, C”.
  3. “Third Header Message Example” with header values “First, A” and “Third, C”.

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
 
 private final static String MESSAGE_1 = "First Header Message Example";
 private final static String MESSAGE_2 = "Second Header Message Example";
 private final static String MESSAGE_3 = "Third Header Message Example";
 
 public void publish(){
   Map<String,Object> map = null;
   BasicProperties props = null; 
   try{
       Connection conn = RabbitMQConnection.getConnection();
       if(conn != null){
         Channel channel = conn.createChannel();
 
         // First message 
         props = new BasicProperties();
         map = new HashMap<String,Object>(); 
         map.put("First","A");
         map.put("Fourth","D");
         props = props.builder().headers(map).build();
         channel.basicPublish(HeaderExchange.EXCHANGE_NAME, HeaderExchange.ROUTING_KEY, props, MESSAGE_1.getBytes());
         System.out.println(" Message Sent '" + MESSAGE_1 + "'");
 
         // Second message 
         props = new BasicProperties();
         map = new HashMap<String,Object>(); 
         map.put("Third","C");
         props = props.builder().headers(map).build();
         channel.basicPublish(HeaderExchange.EXCHANGE_NAME, HeaderExchange.ROUTING_KEY, props, MESSAGE_2.getBytes());
         System.out.println(" Message Sent '" + MESSAGE_2 + "'");
 
         // Third message 
         map = new HashMap<String,Object>();
         props = new BasicProperties();
         map.put("First","A");
         map.put("Third","C");
         props = props.builder().headers(map).build();
         channel.basicPublish(HeaderExchange.EXCHANGE_NAME, HeaderExchange.ROUTING_KEY, props, MESSAGE_3.getBytes());
         System.out.println(" Message Sent '" + MESSAGE_3 + "'");
 
         channel.close();
         conn.close();
       }
   }catch(Exception e){
       e.printStackTrace();
   }
 }
}



Fifth, we create a Receiver class, which will create three consumers.

  1. consumer1 will consume from “header-queue-1” queue.
  2. consumer2 will consume from “header-queue-2” queue.
  3. consumer3 will consume from “header-queue-3” queue.
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
 
 public void receive(){
   try{
       Connection conn = RabbitMQConnection.getConnection();
       if(conn != null){
         Channel channel = conn.createChannel();
         // Consumer reading from queue 1
         Consumer consumer1 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              String message = new String(body, "UTF-8");
              System.out.println(" Message Received Queue 1 '" + message + "'");
            }
         };
         channel.basicConsume(HeaderExchange.QUEUE_NAME_1, true, consumer1);
 
         // Consumer reading from queue 2 
         Consumer consumer2 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              String message = new String(body, "UTF-8");
              System.out.println(" Message Received Queue 2 '" + message + "'");
            }
         };
         channel.basicConsume(HeaderExchange.QUEUE_NAME_2, true, consumer2);
 
         // Consumer reading from queue 3 
         Consumer consumer3 = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              String message = new String(body, "UTF-8");
              System.out.println(" Message Received Queue 3 '" + message + "'");
            }
         };
         channel.basicConsume(HeaderExchange.QUEUE_NAME_3, true, consumer3);
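         // Give the consumers a moment to receive the messages before closing;
         // closing immediately could cancel them before any delivery arrives.
         Thread.sleep(2000);
         channel.close();
         conn.close();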
       }
   }catch(Exception e){
      e.printStackTrace();
   }
 }

}

Finally, we create a Demo class. It instantiates the HeaderExchange class and calls its createExchangeAndQueue method to declare the exchange and queues, then instantiates the Producer class and calls its publish method to send the messages, and finally instantiates the Receiver class and calls its receive method to consume them.

public class Demo {
 
 public static void main(String args[]){
   try{
      HeaderExchange ex = new HeaderExchange();
      ex.createExchangeAndQueue();
 
      // Publish
      Producer produce = new Producer();
      produce.publish();
 
      // Consume
      Receiver receive = new Receiver();
      receive.receive();
   }catch(Exception e){
     e.printStackTrace();
   }
 }

}

Result :- 

RabbitMQ Headers Exchange
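
The expected routing for this example can also be worked out by hand from the three bindings and the three messages. The following is a broker-free sketch that reproduces that reasoning in plain Java; it is a simplified model, not RabbitMQ's implementation, and the class HeaderRoutingSketch with its helpers mapOf, matches and route is hypothetical. It assumes “x-” arguments are not compared against the message headers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the three bindings declared in HeaderExchange.
final class HeaderRoutingSketch {

  // Builds a map from alternating key/value strings.
  static Map<String, Object> mapOf(String... kv) {
    Map<String, Object> m = new LinkedHashMap<>();
    for (int i = 0; i + 1 < kv.length; i += 2) {
      m.put(kv[i], kv[i + 1]);
    }
    return m;
  }

  // Queue name -> binding arguments, in declaration order.
  static final Map<String, Map<String, Object>> BINDINGS = new LinkedHashMap<>();
  static {
    BINDINGS.put("header-queue-1", mapOf("x-match", "any", "First", "A", "Fourth", "D"));
    BINDINGS.put("header-queue-2", mapOf("x-match", "any", "Fourth", "D", "Third", "C"));
    BINDINGS.put("header-queue-3", mapOf("x-match", "all", "First", "A", "Third", "C"));
  }

  // "any" succeeds on the first equal pair; "all" fails on the first unequal pair.
  static boolean matches(Map<String, Object> binding, Map<String, Object> headers) {
    boolean any = "any".equals(binding.get("x-match"));
    for (Map.Entry<String, Object> arg : binding.entrySet()) {
      if (arg.getKey().startsWith("x-")) {
        continue; // assumption: control arguments are not compared
      }
      boolean equal = arg.getValue().equals(headers.get(arg.getKey()));
      if (any && equal) {
        return true;
      }
      if (!any && !equal) {
        return false;
      }
    }
    return !any;
  }

  // Returns the queues that would receive a message with the given headers.
  static List<String> route(Map<String, Object> headers) {
    List<String> queues = new ArrayList<>();
    for (Map.Entry<String, Map<String, Object>> b : BINDINGS.entrySet()) {
      if (matches(b.getValue(), headers)) {
        queues.add(b.getKey());
      }
    }
    return queues;
  }

  public static void main(String[] args) {
    // prints [header-queue-1, header-queue-2]
    System.out.println(route(mapOf("First", "A", "Fourth", "D")));
    // prints [header-queue-2]
    System.out.println(route(mapOf("Third", "C")));
    // prints [header-queue-1, header-queue-2, header-queue-3]
    System.out.println(route(mapOf("First", "A", "Third", "C")));
  }
}
```

Under these assumptions, queue 1 should receive the first and third messages, queue 2 all three messages, and queue 3 only the third message, since it is the only one carrying both “First, A” and “Third, C”.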

That’s all for RabbitMQ – Headers Exchange. For more exchange types, see Fanout Exchange, Direct Exchange and Topic Exchange. If you liked it, please share your thoughts in the comments section and share it with others too.