RabbitMQ Introduction

  1. What Is RabbitMQ?



  2. Core Concepts in RabbitMQ



  3. RabbitMQ Exchange Types



  4. Message Flow in RabbitMQ



  5. RabbitMQ vs Kafka (High-Level)



  6. Installing and Running RabbitMQ



  7. Basic Python Producer and Consumer Example



  8. RabbitMQ Acknowledgements

  9. channel.basic_consume(queue='jobs', on_message_callback=callback, auto_ack=False)


  10. Durable Queues & Persistent Messages

  11. channel.queue_declare(queue='jobs', durable=True)
    channel.basic_publish(exchange='', routing_key='jobs',
                          body='hello',
                          properties=pika.BasicProperties(delivery_mode=2))



RabbitMQ — Hello World with the Java Client

  1. This chapter explains how to build a minimal Hello World message queue using:


  2. Prerequisites



  3. Project Structure



  4. Adding the RabbitMQ Java Client Dependency (Maven)



  5. Understanding the Connection to RabbitMQ



  6. Sender — Send.java

  7. package com.example;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class Send {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            // 1. Create a connection factory and configure it
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // factory.setUsername("guest");
            // factory.setPassword("guest");
    
            // 2. Create connection and channel using try-with-resources
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                // 3. Declare the queue (idempotent — only created if it does not exist)
                channel.queueDeclare(
                        QUEUE_NAME,
                        false,  // durable
                        false,  // exclusive
                        false,  // autoDelete
                        null    // arguments
                );
    
                // 4. Message to send
                String message = "Hello World from Java!";
    
                // 5. Publish the message to the default exchange ("") with routing key = queue name
                channel.basicPublish(
                        "",
                        QUEUE_NAME,
                        null,
                        message.getBytes(StandardCharsets.UTF_8)
                );
    
                System.out.println(" [x] Sent '" + message + "'");
            } // END try
        }
    }


  8. Receiver — Recv.java

  9. package com.example;
    
    import com.rabbitmq.client.*;
    
    import java.nio.charset.StandardCharsets;
    
    public class Recv {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            // 1. Create connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // factory.setUsername("guest");
            // factory.setPassword("guest");
    
            // 2. Create connection and channel
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // 3. Ensure the same queue exists
            channel.queueDeclare(
                    QUEUE_NAME,
                    false,
                    false,
                    false,
                    null
            );
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            // 4. Define how to process delivered messages (callback)
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(
                        delivery.getBody(),
                        StandardCharsets.UTF_8
                );
                System.out.println(" [x] Received '" + message + "'");
            };
    
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println(" [!] Consumer was cancelled: " + consumerTag);
            };
    
            // 5. Start consuming messages (autoAck = true)
            channel.basicConsume(
                    QUEUE_NAME,
                    true,            // autoAck
                    deliverCallback,
                    cancelCallback
            );
        }
    }


  10. Building and Running the Example



  11. How the Visually Message Flow Works
  12. [Send.java] --basicPublish--> [""] default exchange --> [queue "hello"] --> [Recv.java]


  13. Common Troubleshooting Tips




RabbitMQ — Work Queues (Using the Java Client)

  1. What Are Work Queues?



  2. Project Structure
  3. rabbitmq-work-queues/
    ├─ pom.xml
    └─ src/
       └─ main/
          └─ java/
             └─ com/
                └─ example/
                   ├─ NewTask.java   <-- producer (sends tasks)
                   └─ Worker.java    <-- worker (receives tasks)
    


  4. Declaring a Durable Queue
  5. boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);
    


  6. Producer (NewTask.java)



  7. Worker (Worker.java)

  8. package com.example;
    
    import com.rabbitmq.client.*;
    
    public class Worker {
    
        private static final String QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // Durable queue
            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            // Fair dispatch: don't give new messages until worker is done
            channel.basicQos(1);
    
            // Define callback for messages
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
    
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
    
            // AutoAck = false → manual acknowledgements
            boolean autoAck = false;
    
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
        }
    
        private static void doWork(String task) {
            for (char ch : task.toCharArray()) {
                if (ch == '.') {
                    try {
                        Thread.sleep(1000); // one second per dot
                    } catch (InterruptedException ignored) {}
                }
            }
        }
    }
    


  9. Running Multiple Workers



  10. How Acknowledgements Work



  11. Message Durability



  12. Fair Dispatch with basicQos(1)




RabbitMQ — Publish/Subscribe (Using the Java Client)

  1. What Is Publish/Subscribe?



  2. Core Concepts: Exchange & Fanout



  3. Project Structure
  4. rabbitmq-pubsub-java/
    ├─ pom.xml
    └─ src/
       └─ main/
          └─ java/
             └─ com/
                └─ example/
                   ├─ EmitLog.java       <-- producer (publishes logs)
                   └─ ReceiveLogs.java   <-- consumer (subscribes to logs)
    
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.24.0</version>
    </dependency>
    


  5. Declaring the Fanout Exchange

  6. String EXCHANGE_NAME = "logs";
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    



  7. Producer — EmitLog.java

  8. package com.example;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                // Declare fanout exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                // Build message from command-line arguments
                String message = String.join(" ", argv);
                if (message.isEmpty()) {
                    message = "info: Hello RabbitMQ Publish/Subscribe!";
                }
    
                // Publish to exchange (routing key is ignored by fanout)
                channel.basicPublish(
                        EXCHANGE_NAME,
                        "",
                        null,
                        message.getBytes(StandardCharsets.UTF_8)
                );
    
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    


  9. Consumer — ReceiveLogs.java (Ephemeral Queues)

  10. package com.example;
    
    import com.rabbitmq.client.*;
    
    import java.nio.charset.StandardCharsets;
    
    public class ReceiveLogs {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // Declare fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
            // Create a non-durable, exclusive, auto-delete queue with a generated name
            String queueName = channel.queueDeclare().getQueue();
    
            // Bind the queue to the exchange
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] Waiting for logs in queue '" + queueName +
                               "'. To exit press CTRL+C");
    
            // Define callback
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(
                        delivery.getBody(),
                        StandardCharsets.UTF_8
                );
                System.out.println(" [x] Received '" + message + "'");
            };
    
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println(" [!] Consumer cancelled: " + consumerTag);
            };
    
            // Start consuming (autoAck = true)
            boolean autoAck = true;
            channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
        }
    }
    


  11. Running Multiple Subscribers



  12. Temporary vs Persistent Subscribers



  13. How Publish/Subscribe Differs from Work Queues




RabbitMQ — Routing (Using the Java Client)

  1. What Is Routing with Direct Exchanges?



  2. Direct Exchange and Routing Keys



  3. Project Structure

  4. rabbitmq-routing-java/
    ├─ pom.xml
    └─ src/
       └─ main/
          └─ java/
             └─ com/
                └─ example/
                   ├─ EmitLogDirect.java
                   └─ ReceiveLogsDirect.java
    
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.24.0</version>
    </dependency>
    


  5. Declaring the Direct Exchange

  6. String EXCHANGE_NAME = "direct_logs";
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    


  7. Producer — EmitLogDirect.java

  8. package com.example;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                // Declare direct exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
                // Severity (routing key)
                String severity = argv.length > 0 ? argv[0] : "info";
    
                // Message text
                String message;
                if (argv.length > 1) {
                    StringBuilder builder = new StringBuilder();
                    for (int i = 1; i < argv.length; i++) {
                        if (i > 1) {
                            builder.append(" ");
                        }
                        builder.append(argv[i]);
                    }
                    message = builder.toString();
                } else {
                    message = "Hello from direct routing!";
                }
    
                // Publish with routing key = severity
                channel.basicPublish(
                        EXCHANGE_NAME,
                        severity,
                        null,
                        message.getBytes(StandardCharsets.UTF_8)
                );
    
                System.out.println(" [x] Sent '" + severity + "': '" + message + "'");
            }
        }
    }
    
    mvn -q exec:java \
        -Dexec.mainClass="com.example.EmitLogDirect" \
        -Dexec.args="info Just an info log"
    
    mvn -q exec:java \
        -Dexec.mainClass="com.example.EmitLogDirect" \
        -Dexec.args="error Something went wrong"
    


  9. Consumer — ReceiveLogsDirect.java

  10. package com.example;
    
    import com.rabbitmq.client.*;
    
    import java.nio.charset.StandardCharsets;
    
    public class ReceiveLogsDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // Declare direct exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            // Create a fresh, exclusive, auto-delete queue
            String queueName = channel.queueDeclare().getQueue();
    
            if (argv.length < 1) {
                System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
                System.exit(1);
            }
    
            // Bind the queue for each severity given
            for (String severity : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
                System.out.println(" [*] Bound queue '" + queueName +
                                   "' with routing key '" + severity + "'");
            }
    
            System.out.println(" [*] Waiting for logs. To exit press CTRL+C");
    
            // Define callback
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String routingKey = delivery.getEnvelope().getRoutingKey();
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + routingKey + "': '" + message + "'");
            };
    
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println(" [!] Consumer cancelled: " + consumerTag);
            };
    
            // Start consuming (autoAck = true is fine for simple log consumers)
            boolean autoAck = true;
            channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
        }
    }
    


  11. Running Example Scenarios



  12. Routing vs Fanout vs Work Queues



  13. Extending the Routing Pattern




RabbitMQ — Streams in Java


  1. RabbitMQ Streams are an append-only log data structure built into RabbitMQ


  2. Prerequisites & Starting RabbitMQ with Streams


  3. Project Layout

  4. rabbitmq-stream-java/
    ├─ pom.xml
    └─ src/
       └─ main/
          └─ java/
             └─ com/
                └─ example/
                   ├─ StreamProducer.java
                   └─ StreamConsumer.java
    


  5. Adding the RabbitMQ Stream Java Client Dependency

  6. <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                                 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>rabbitmq-stream-java</artifactId>
        <version>1.0.0</version>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>stream-client</artifactId>
                <version>0.16.0</version> <!-- example version -->
            </dependency>
        </dependencies>
    </project>
    mvn dependency:resolve


  7. Understanding the Stream Java Client Basics



  8. Producer — StreamProducer.java
  9. package com.example;
    
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.Producer;
    import com.rabbitmq.stream.Message;
    import com.rabbitmq.stream.MessageBuilder;
    
    public class StreamProducer {
    
        private static final String STREAM_NAME = "hello-stream";
    
        public static void main(String[] args) throws Exception {
            // 1. Create environment (connects to stream port 5552 by default in the URI)
            try (Environment environment = Environment.builder()
                    .uri("rabbitmq-stream://localhost:5552")
                    .build()) {
    
                // 2. Create the stream if it does not exist
                environment.streamCreator()
                        .stream(STREAM_NAME)
                        .create();
    
                // 3. Create a producer for that stream
                Producer producer = environment.producerBuilder()
                        .stream(STREAM_NAME)
                        .build();
    
                // 4. Build a simple message
                MessageBuilder messageBuilder = producer.messageBuilder();
                Message message = messageBuilder
                        .addData("Hello Stream from Java!".getBytes())
                        .build();
    
                // 5. Send the message (asynchronously)
                producer.send(message);
    
                System.out.println(" [x] Sent 'Hello Stream from Java!' to stream '" + STREAM_NAME + "'");
    
                // 6. Close producer explicitly
                producer.close();
            }
        }
    }
    


  10. Consumer — StreamConsumer.java

  11. package com.example;
    
    import com.rabbitmq.stream.Environment;
    import com.rabbitmq.stream.Consumer;
    import com.rabbitmq.stream.OffsetSpecification;
    import com.rabbitmq.stream.MessageHandler;
    
    import java.nio.charset.StandardCharsets;
    
    public class StreamConsumer {
    
        private static final String STREAM_NAME = "hello-stream";
    
        public static void main(String[] args) throws Exception {
            Environment environment = Environment.builder()
                    .uri("rabbitmq-stream://localhost:5552")
                    .build();
    
            // Define how to handle messages
            MessageHandler handler = (context, message) -> {
                byte[] body = message.getBodyAsBinary();
                String text = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [x] Received: " + text +
                                   " (offset: " + context.getOffset() + ")");
            };
    
            // Start consumer from the beginning of the stream
            Consumer consumer = environment.consumerBuilder()
                    .stream(STREAM_NAME)
                    .offset(OffsetSpecification.first())
                    .messageHandler(handler)
                    .build();
    
            System.out.println(" [*] Consuming from stream '" + STREAM_NAME +
                               "' starting at the first message. Press CTRL+C to exit.");
    
            // Keep process alive (simple way)
            Thread.currentThread().join();
    
            // (Unreachable in this simple example, but good practice for structured shutdown)
            // consumer.close();
            // environment.close();
        }
    }


  12. Building and Running the Example



  13. Streams vs Classic Queues