RabbitMQ – Fanout Exchange

8 years ago Lalit Bhagtani 0

Fanout Exchange :- A fanout exchange is an exchange which routes the received message to all the queues bound to it. When the producer sends a message to a fanout exchange, the exchange copies the message and routes it to all the queues that are bound to it. It ignores the routing key and any pattern matching provided by the producer. This type of exchange is useful when the same message needs to be stored in one or more queues.

RabbitMQ Fanout Exchange

Example :-

First, we create an ExchangeType enum class, which will define all the exchange types.

/**
 * The exchange types used by the examples, each paired with the type name
 * string that is handed to the broker when the exchange is declared.
 */
public enum ExchangeType {

  DIRECT("direct"),
  TOPIC("topic"),
  FANOUT("fanout"),
  HEADER("headers");

  /** Type name string associated with this constant. */
  private final String typeName;

  ExchangeType(String typeName) {
    this.typeName = typeName;
  }

  /**
   * @return the type name string for this exchange type, e.g. {@code "fanout"}
   */
  public String getExchangeName() {
    return typeName;
  }

}

Second, we create a RabbitMQConnection class, which will instantiate a ConnectionFactory object and set its properties like username, password, virtualhost, host, and port. Since I am using a local RabbitMQ setup, I will only set the host property to “localhost”; the other properties will be left at the default values provided by the RabbitMQ client. This class will use the ConnectionFactory to open a Connection and return it to the calling method.

 
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Helper that opens connections to the RabbitMQ broker used by the examples.
 */
public class RabbitMQConnection {

  /**
   * Opens a new connection to the broker on {@code localhost}.
   *
   * Only the host is set explicitly; every other factory property keeps the
   * client's default value.
   *
   * @return a newly opened connection, or {@code null} when the connection
   *         could not be established (the failure is printed to stderr).
   *         Callers must check for {@code null} and close the connection
   *         when they are done with it.
   */
  public static Connection getConnection(){
    try{
      ConnectionFactory factory = new ConnectionFactory();
      // Optional overrides - uncomment and fill in for a non-default setup.
      //factory.setUsername("");
      //factory.setPassword("");
      //factory.setVirtualHost("");
      factory.setHost("localhost");
      // 5672 is the AMQP port; 15672 is the HTTP management UI and must
      // not be used here.
      //factory.setPort(5672);
      return factory.newConnection();
    }catch(Exception e){
      // Best-effort contract kept for existing callers: report and return null.
      e.printStackTrace();
      return null;
    }
  }

}

Third, we create a FanoutExchange class, which will declare one fanout exchange and three different queues and bind each queue to the fanout exchange using the same empty routing key (a fanout exchange ignores the routing key anyway).

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.helper.ExchangeType;

/**
 * Declares the fanout exchange and the three queues used by the example,
 * and binds every queue to the exchange.
 */
public class FanoutExchange {

  // Names shared with Producer and Receiver; final so they cannot be reassigned.
  public static final String EXCHANGE_NAME = "fanout-exchange";
  public static final String QUEUE_NAME_1 = "fanout-queue-1";
  public static final String QUEUE_NAME_2 = "fanout-queue-2";
  public static final String QUEUE_NAME_3 = "fanout-queue-3";

  // A fanout exchange ignores the routing key, so an empty key is used.
  public static final String ROUTING_KEY = "";

  /**
   * Declares the durable fanout exchange, declares the three durable queues
   * and binds each of them to the exchange.
   *
   * Does nothing when no connection could be obtained. Any failure is
   * printed to stderr; the connection is closed in all cases so that a
   * failed declaration does not leave it open.
   */
  public void createExchangeAndQueue(){
    Connection conn = RabbitMQConnection.getConnection();
    if(conn == null){
      return;
    }
    try{
      Channel channel = conn.createChannel();
      channel.exchangeDeclare(EXCHANGE_NAME, ExchangeType.FANOUT.getExchangeName(), true);

      String[] queues = { QUEUE_NAME_1, QUEUE_NAME_2, QUEUE_NAME_3 };
      for(String queue : queues){
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, EXCHANGE_NAME, ROUTING_KEY);
      }

      channel.close();
    }catch(Exception e){
      e.printStackTrace();
    }finally{
      // Closing the connection also releases a channel that is still open
      // because one of the calls above failed.
      try{
        conn.close();
      }catch(Exception e){
        e.printStackTrace();
      }
    }
  }
}

Fourth, we create a Producer class, which will publish a message “Hello Fanout Example” to fanout exchange with routing key as empty string.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Publishes the example message to the fanout exchange.
 */
public class Producer {

  private final static String MESSAGE = "Hello Fanout Example";

  /**
   * Publishes {@code MESSAGE} to {@code FanoutExchange.EXCHANGE_NAME} using
   * the (empty) routing key and no message properties.
   *
   * Does nothing when no connection could be obtained. Any failure is
   * printed to stderr; the connection is closed in all cases.
   */
  public void publish(){
    Connection conn = RabbitMQConnection.getConnection();
    if(conn == null){
      return;
    }
    try{
      Channel channel = conn.createChannel();
      // Encode explicitly as UTF-8: the Receiver decodes the body as UTF-8,
      // while the no-argument getBytes() depends on the platform charset.
      byte[] body = MESSAGE.getBytes(java.nio.charset.StandardCharsets.UTF_8);
      channel.basicPublish(FanoutExchange.EXCHANGE_NAME, FanoutExchange.ROUTING_KEY, null, body);
      System.out.println(" Message Sent '" + MESSAGE + "'");
      channel.close();
    }catch(Exception e){
      e.printStackTrace();
    }finally{
      // Always release the connection, even when publishing failed.
      try{
        conn.close();
      }catch(Exception e){
        e.printStackTrace();
      }
    }
  }
}

Fifth, we create a Receiver class, which will create three consumers.

  1. consumer1 will consume from “fanout-queue-1” queue.
  2. consumer2 will consume from “fanout-queue-2” queue.
  3. consumer3 will consume from “fanout-queue-3” queue.
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Consumes the messages waiting in the three fanout queues and prints them.
 */
public class Receiver {

  /** Upper bound, in seconds, on how long receive() waits for the pending messages. */
  private static final long DRAIN_TIMEOUT_SECONDS = 5;

  /**
   * Registers one auto-acknowledging consumer per fanout queue, waits until
   * the messages that were ready in the queues have been handled (or the
   * timeout elapses), then closes the channel and the connection.
   *
   * Closing immediately after basicConsume, as a naive implementation would,
   * races with the asynchronous deliveries: with auto-ack enabled, messages
   * still in flight can be dropped without ever being printed.
   *
   * Does nothing when no connection could be obtained. Any failure is
   * printed to stderr; the connection is closed in all cases.
   */
  public void receive(){
    Connection conn = RabbitMQConnection.getConnection();
    if(conn == null){
      return;
    }
    try{
      Channel channel = conn.createChannel();
      String[] queues = {
        FanoutExchange.QUEUE_NAME_1,
        FanoutExchange.QUEUE_NAME_2,
        FanoutExchange.QUEUE_NAME_3
      };

      // Count the messages currently ready so we know how many deliveries to wait for.
      int pending = 0;
      for(String queue : queues){
        pending += channel.queueDeclarePassive(queue).getMessageCount();
      }
      final java.util.concurrent.CountDownLatch delivered =
          new java.util.concurrent.CountDownLatch(pending);

      for(int i = 0; i < queues.length; i++){
        channel.basicConsume(queues[i], true, newConsumer(channel, i + 1, delivered));
      }

      if(!delivered.await(DRAIN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)){
        System.err.println(" Timed out waiting for pending messages");
      }
      channel.close();
    }catch(InterruptedException e){
      // Preserve the interrupt status for the caller.
      Thread.currentThread().interrupt();
    }catch(Exception e){
      e.printStackTrace();
    }finally{
      try{
        conn.close();
      }catch(Exception e){
        e.printStackTrace();
      }
    }
  }

  /**
   * Builds a consumer that prints every message body it receives, labelled
   * with the given queue number, and counts the latch down once per delivery.
   */
  private static Consumer newConsumer(Channel channel, final int queueNumber,
      final java.util.concurrent.CountDownLatch delivered){
    return new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try{
          String message = new String(body, "UTF-8");
          System.out.println(" Message Received Queue " + queueNumber + " '" + message + "'");
        }finally{
          // Count the delivery even if printing failed, so receive() is not held up.
          delivered.countDown();
        }
      }
    };
  }

}

Finally, we create a Demo class, which will instantiate Fanout Exchange ( FanoutExchange ) class and call its createExchangeAndQueue method to declare exchange and queues, then it will instantiate Producer class to call its publish method to send the message and finally it will instantiate Receiver class to call its receive method to consume the message.

/**
 * Entry point of the example: sets up the exchange and queues, publishes
 * one message and then consumes it from the queues.
 */
public class Demo {

  /**
   * Runs the three steps of the example in order. Any exception that escapes
   * a step is printed to stderr.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args){
    try{
      // Step 1: declare the exchange, the queues and their bindings
      new FanoutExchange().createExchangeAndQueue();

      // Step 2: publish
      new Producer().publish();

      // Step 3: consume
      new Receiver().receive();
    }catch(Exception e){
      e.printStackTrace();
    }
  }
}

Result:-

RabbitMQ Fanout Exchange

That’s all for RabbitMQ – Fanout Exchange, for more exchange types visit Headers Exchange, Direct Exchange, Topic Exchange. If you liked it, please share your thoughts in comments section and share it with others too.