RabbitMQ – Direct Exchange

8 years ago Lalit Bhagtani 0

Direct Exchange :- A direct exchange is an exchange which route messages to queues based on message routing key. The routing key is a message attribute in the message header added by the producer.Producer adds routing key in message header and sends it to direct exchange. After receiving a message, exchange try to match the routing key with the binding key of all the queues bound to it. If match is found, it route the message to the queue whose binding key is matched and if match is not found, it ignored the message.

RabbitMQ Direct Exchange

Example of Direct Exchange :-

First, we create an ExchangeType enum class, which will define all the exchange types.

public enum ExchangeType {
 
  DIRECT("direct"),
  TOPIC("topic"),
  FANOUT("fanout"),
  HEADER("headers");
 
  private final String exchangeName;

  ExchangeType(String exchangeName) {
    this.exchangeName = exchangeName;
  }
 
  public String getExchangeName() {
    return this.exchangeName;
  }

}

Second, we create a RabbitMQConnection class, which will instantiate ConnectionFactory object and set its properties like username, password, virtualhost, host, and port. Since, I am using local RabbitMQ setup, So I will only set host property  as “localhost“, other properties will be set to its default value by RabbitMQ system. This class will instantiate ConnectionFactory object and return it, to its calling method.

 
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQConnection {
 
  public static Connection getConnection(){
    Connection conn = null;
    try{
        ConnectionFactory factory = new ConnectionFactory();
        //factory.setUsername("");
        //factory.setPassword("");
        //factory.setVirtualHost("");
        factory.setHost("localhost");
        //factory.setPort(15672);
        conn = factory.newConnection();
    }catch(Exception e){
        e.printStackTrace();
    }
    return conn;
 }

}

Third, we create a DirectExchange class, which will declare one direct exchange and three different queues and bind it using different routing keys.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.helper.ExchangeType;

public class DirectExchange {
 
 public static String EXCHANGE_NAME = "direct-exchange";
 public static String QUEUE_NAME_1 = "direct-queue-1";
 public static String QUEUE_NAME_2 = "direct-queue-2";
 public static String QUEUE_NAME_3 = "direct-queue-3";

 public static String ROUTING_KEY_1 = "direct-key-1";
 public static String ROUTING_KEY_2 = "direct-key-2";
 public static String ROUTING_KEY_3 = "direct-key-3";
 
 public void createExchangeAndQueue(){
   try{
       Connection conn = RabbitMQConnection.getConnection();
       if(conn != null){
         Channel channel = conn.createChannel();
         channel.exchangeDeclare(EXCHANGE_NAME, ExchangeType.DIRECT.getExchangeName(), true);
         // First Queue
         channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
         channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, ROUTING_KEY_1);
 
         // Second Queue
         channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
         channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, ROUTING_KEY_2);
 
         // Third Queue
         channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);
         channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, ROUTING_KEY_3);
 
         channel.close();
         conn.close();
       }
     }catch(Exception e){
       e.printStackTrace();
     }
  }
}

Fourth, we create a Producer class, which will publish a three messages to direct exchange.

  1. First Direct Message Example” with routing key as “direct-key-1“.
  2. Second Direct Message Example” with routing key as “direct-key-2“.
  3. Third Direct Message Example” with routing key as “direct-key-3“.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
 
  private final static String MESSAGE_1 = "First Direct Message Example";
  private final static String MESSAGE_2 = "Second Direct Message Example";
  private final static String MESSAGE_3 = "Third Direct Message Example";
 
  public void publish(){
    try{
       Connection conn = RabbitMQConnection.getConnection();
       if(conn != null){
         Channel channel = conn.createChannel();
         // First message sent by using ROUTING_KEY_1
         channel.basicPublish(DirectExchange.EXCHANGE_NAME, DirectExchange.ROUTING_KEY_1, null, MESSAGE_1.getBytes());
         System.out.println(" Message Sent '" + MESSAGE_1 + "'");
 
         // Second message sent by using ROUTING_KEY_2
         channel.basicPublish(DirectExchange.EXCHANGE_NAME, DirectExchange.ROUTING_KEY_2, null, MESSAGE_2.getBytes());
         System.out.println(" Message Sent '" + MESSAGE_2 + "'");
 
         // Third message sent by using ROUTING_KEY_3
         channel.basicPublish(DirectExchange.EXCHANGE_NAME, DirectExchange.ROUTING_KEY_3, null, MESSAGE_3.getBytes());
         System.out.println(" Message Sent '" + MESSAGE_3 + "'");
         channel.close();
         conn.close();
       }
    }catch(Exception e){
       e.printStackTrace();
    }
  }
}

Fifth, we create a Receiver class, which will create three consumers.

  1. consumer1 will consume from “direct-queue-1” queue.
  2. consumer2 will consume from “direct-queue-2” queue.
  3. consumer3 will consume from “direct-queue-3” queue.
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
 
 public void receive(){
   try{
     Connection conn = RabbitMQConnection.getConnection();
     if(conn != null){
       Channel channel = conn.createChannel();
       // Consumer reading from queue 1
       Consumer consumer1 = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           String message = new String(body, "UTF-8");
           System.out.println(" Message Received Queue 1 '" + message + "'");
         }
       };
       channel.basicConsume(DirectExchange.QUEUE_NAME_1, true, consumer1);
 
       // Consumer reading from queue 2 
       Consumer consumer2 = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           String message = new String(body, "UTF-8");
           System.out.println(" Message Received Queue 2 '" + message + "'");
         }
       };
       channel.basicConsume(DirectExchange.QUEUE_NAME_2, true, consumer2);
 
       // Consumer reading from queue 3 
       Consumer consumer3 = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           String message = new String(body, "UTF-8");
           System.out.println(" Message Received Queue 3 '" + message + "'");
         }
       };
       channel.basicConsume(DirectExchange.QUEUE_NAME_3, true, consumer3);
       channel.close();
       conn.close();
     }
   }catch(Exception e){
      e.printStackTrace();
   }
 }

}

Finally, we create a Demo class, which will instantiate Direct Exchange ( DirectExchange ) class and call its createExchangeAndQueue method to declare exchange and queues, then it will instantiate Producer class to call its publish method to send the message and finally it will instantiate Receiver class to call its receive method to consume the message.

public class Demo {
 
 public static void main(String args[]){
   try{
     DirectExchange ex = new DirectExchange();
     ex.createExchangeAndQueue();
 
     // Publish
     Producer produce = new Producer();
     produce.publish();
 
     // Consume
     Receiver receive = new Receiver();
     receive.receive();
   }catch(Exception e){
     e.printStackTrace();
   }
 }
}

Result :- 

RabbitMQ Direct Exchange

That’s all for RabbitMQ – Direct Exchange, for more exchange types visit Headers Exchange, Fanout Exchange, Topic Exchange.If you liked it, please share your thoughts in comments section and share it with others too.