Documentation Index
Introduction
Hello World (Using the Java Client)
Work Queues (Using the Java Client)
Publish / Subscribe (Using the Java Client)
Routing (Using the Java Client)
Streams (Using the Java Client)
RabbitMQ Introduction
What Is RabbitMQ?
RabbitMQ is an open-source message broker that allows different parts of an application (or different applications) to communicate asynchronously.
It implements the AMQP protocol (Advanced Message Queuing Protocol), which defines:
how messages are sent
how messages are routed
how queues and exchanges behave
It ensures reliable message delivery even if consumers or producers crash.
Core Concepts in RabbitMQ
A producer is any program that sends messages to RabbitMQ.
channel.basic_publish(exchange='', routing_key='jobs', body='hello')
A consumer receives and processes messages.
channel.basic_consume(queue='jobs', on_message_callback=callback)
A queue is a buffer where messages wait until processed.
An exchange receives messages from producers and routes them to queues (different routing strategies correspond to different exchange types).
A binding is a rule connecting a queue to an exchange.
A routing key is a message label used by the exchange to decide where messages go.
A connection is a TCP connection to the RabbitMQ server.
A channel is a virtual session used to perform messaging operations.
RabbitMQ Exchange Types
Exchange Type | Description | Routing Behavior
direct | Exact routing-key match | Only queues bound with an exact-match routing key receive the message
fanout | Broadcast | All queues bound to the exchange receive the message
topic | Pattern-based routing | Routing keys are matched against binding patterns with wildcards (* and #)
headers | Routing based on message headers | Flexible but slower
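To make the topic row concrete, here is a small self-contained sketch (plain Java, no broker or client library needed) of how topic-style binding patterns match routing keys. The class and method names are illustrative, not part of the RabbitMQ API.

```java
// Illustrative sketch of AMQP topic matching: a binding pattern is a
// dot-separated list of words where '*' matches exactly one word and
// '#' matches zero or more words. Not part of the RabbitMQ client API.
public class TopicMatch {

    public static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pat, int pi, String[] words, int wi) {
        if (pi == pat.length) {
            return wi == words.length;        // pattern used up: all words must be consumed
        }
        if (pat[pi].equals("#")) {            // '#' absorbs zero or more words
            for (int skip = wi; skip <= words.length; skip++) {
                if (matches(pat, pi + 1, words, skip)) return true;
            }
            return false;
        }
        if (wi == words.length) return false; // words used up but pattern remains
        if (pat[pi].equals("*") || pat[pi].equals(words[wi])) {
            return matches(pat, pi + 1, words, wi + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));     // true
        System.out.println(matches("lazy.#", "lazy"));                        // true
        System.out.println(matches("*.orange.*", "lazy.orange.male.rabbit")); // false
    }
}
```

Note how `lazy.#` matches the single word `lazy`, because `#` can match zero words, while `*` always consumes exactly one.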
Message Flow in RabbitMQ
A message travels through 3 main stages:
Producer → Exchange
Exchange → Queue
Queue → Consumer
This allows full decoupling between sender and receiver.
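The three stages can be pictured with a toy in-memory model (hypothetical names, no networking, not the real RabbitMQ API): the producer hands the message to the exchange, the exchange routes it into a queue, and the message waits there until a consumer takes it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of the three stages (illustrative only):
// producer -> exchange -> queue -> consumer.
public class ToyBroker {
    private final Map<String, Deque<String>> queues = new HashMap<>();

    public void queueDeclare(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Stage 1 + 2: the producer publishes to the (default) exchange,
    // which routes the message into the queue named by the routing key.
    public void basicPublish(String routingKey, String body) {
        Deque<String> q = queues.get(routingKey);
        if (q != null) q.addLast(body);   // unroutable messages are simply dropped
    }

    // Stage 3: a consumer takes the next waiting message (null if empty).
    public String basicGet(String queueName) {
        return queues.get(queueName).pollFirst();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.queueDeclare("jobs");
        broker.basicPublish("jobs", "hello");         // sender could exit here
        System.out.println(broker.basicGet("jobs"));  // a consumer picks it up later
    }
}
```

The decoupling is visible in `main`: the publish completes without any consumer existing, and the message simply waits in the queue.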
RabbitMQ vs Kafka (High-Level)
RabbitMQ | Kafka
Message broker | Distributed log / event streaming
Best for task processing | Best for high-throughput event streams
Point-to-point or pub/sub | Pub/sub only
Lightweight, low latency | High throughput, scalable
Installing and Running RabbitMQ
Local installation (OpenSUSE example):
sudo zypper install rabbitmq-server
sudo systemctl start rabbitmq-server
Or run via Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Management UI available at:
http://localhost:15672
Basic Python Producer and Consumer Example
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello RabbitMQ!')
connection.close()
Consumer (receive message)
import pika
def callback(ch, method, properties, body):
print("Received:", body.decode())
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print("Waiting for messages...")
channel.start_consuming()
RabbitMQ automatically delivers messages to consumers.
RabbitMQ Acknowledgements
Consumers send ACKs after processing messages.
If a consumer crashes before an ACK, the message is requeued.
channel.basic_consume(queue='jobs', on_message_callback=callback, auto_ack=False)
Durable Queues & Persistent Messages
Ensure messages survive broker restarts:
channel.queue_declare(queue='jobs', durable=True)
channel.basic_publish(exchange='', routing_key='jobs',
body='hello',
properties=pika.BasicProperties(delivery_mode=2))
delivery_mode=2 makes the message persistent.
RabbitMQ — Hello World with the Java Client
This chapter explains how to build a minimal Hello World message queue using:
A RabbitMQ Server running locally (default: localhost:5672)
A Java Client using the official RabbitMQ Java library
Two small programs:
Sender sends "Hello World" messages to a queue
Receiver receives and prints those messages
Prerequisites
You need a running RabbitMQ server (default: localhost:5672), a JDK (8 or newer), and Maven.
Project Structure
We will create a very small Maven project with two Java classes.
Example structure:
rabbitmq-hello-world-java/
├─ pom.xml
└─ src/
└─ main/
└─ java/
└─ com/
└─ example/
├─ Send.java <-- sender (producer)
└─ Recv.java <-- receiver (consumer)
You can adjust the package name as you like; just keep the sender and receiver separate.
Adding the RabbitMQ Java Client Dependency (Maven)
In your pom.xml add the RabbitMQ Java client dependency:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-hello-world-java</artifactId>
<version>1.0.0</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version> <!-- Example version -->
</dependency>
</dependencies>
</project>
After editing pom.xml, download dependencies:
mvn dependency:resolve
Understanding the Connection to RabbitMQ
Both sender and receiver will:
Create a ConnectionFactory
Set the host (and optionally username/password)
Open a Connection
Create a Channel from that connection
The queue name will be something simple, e.g.:
QUEUE_NAME = "hello";
Both sender and receiver must declare the same queue so that:
Sender publishes messages to that queue.
Receiver consumes messages from that queue.
Sender — Send.java
Create the file src/main/java/com/example/Send.java with the following content:
package com.example;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 1. Create a connection factory and configure it
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// factory.setUsername("guest");
// factory.setPassword("guest");
// 2. Create connection and channel using try-with-resources
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. Declare the queue (idempotent — only created if it does not exist)
channel.queueDeclare(
QUEUE_NAME,
false, // durable
false, // exclusive
false, // autoDelete
null // arguments
);
// 4. Message to send
String message = "Hello World from Java!";
// 5. Publish the message to the default exchange ("") with routing key = queue name
channel.basicPublish(
"",
QUEUE_NAME,
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + message + "'");
} // END try
}
}
queueDeclare is safe to call multiple times; it simply ensures the queue exists.
basicPublish("", QUEUE_NAME, ...) uses the default exchange with routing key equal to the queue name.
try (Connection ...) auto-closes the connection and channel.
Receiver — Recv.java
Create the file src/main/java/com/example/Recv.java with the following content:
package com.example;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class Recv {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 1. Create connection factory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// factory.setUsername("guest");
// factory.setPassword("guest");
// 2. Create connection and channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. Ensure the same queue exists
channel.queueDeclare(
QUEUE_NAME,
false,
false,
false,
null
);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 4. Define how to process delivered messages (callback)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(
delivery.getBody(),
StandardCharsets.UTF_8
);
System.out.println(" [x] Received '" + message + "'");
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println(" [!] Consumer was cancelled: " + consumerTag);
};
// 5. Start consuming messages (autoAck = true)
channel.basicConsume(
QUEUE_NAME,
true, // autoAck
deliverCallback,
cancelCallback
);
}
}
The receiver also calls queueDeclare to ensure the queue exists.
basicConsume registers an asynchronous consumer; the program keeps running.
autoAck = true means messages are automatically acknowledged as soon as they are delivered.
Building and Running the Example
First, build the project with Maven:
cd rabbitmq-hello-world-java
mvn package
Run the receiver (consumer) in one terminal:
# From the project root
mvn -q exec:java -Dexec.mainClass="com.example.Recv"
In another terminal, run the sender (producer):
mvn -q exec:java -Dexec.mainClass="com.example.Send"
Expected output:
Sender terminal:
 [x] Sent 'Hello World from Java!'
Receiver terminal:
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World from Java!'
How the Message Flow Works
[Send.java] --basicPublish--> [""] default exchange --> [queue "hello"] --> [Recv.java]
Common Troubleshooting Tips
Problem: java.net.ConnectException: Connection refused
Check if RabbitMQ is running.
Verify host and port: localhost:5672.
Problem: Authentication failure
Ensure username/password guest/guest are correct.
Note: by default, the guest user can only connect from localhost.
For remote hosts, create a dedicated user in RabbitMQ if necessary.
Problem: Receiver doesn't print anything
Ensure the receiver is running before you send messages (for this simple example).
Check that both sender and receiver use the same queue name and the same virtual host.
RabbitMQ — Work Queues (Using the Java Client)
What Are Work Queues?
A Work Queue (also called Task Queue) is a RabbitMQ pattern where:
multiple worker processes share a queue
each message is delivered to exactly one worker
tasks are distributed across workers
Use cases:
background job processing
CPU-heavy or IO-heavy tasks
distributing long-running tasks across multiple workers
We will build three components:
NewTask.java — a producer that sends tasks
Worker.java — a worker that receives and processes tasks
multiple Worker instances, to demonstrate load balancing
Project Structure
rabbitmq-work-queues/
├─ pom.xml
└─ src/
└─ main/
└─ java/
└─ com/
└─ example/
├─ NewTask.java <-- producer (sends tasks)
└─ Worker.java <-- worker (receives tasks)
Declaring a Durable Queue
Both producer and workers must declare the same durable queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
This means:
Queue survives a RabbitMQ restart
Task messages are stored on disk (when we publish persistent messages)
Producer (NewTask.java)
Create NewTask.java to send a message representing a task.
package com.example;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Durable queue
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// Create message from args
String message = String.join(" ", argv);
if (message.isEmpty()) {
message = "Hello..."; // default message
}
// Publish persistent message
channel.basicPublish(
"",
QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, // marks message as durable
message.getBytes("UTF-8")
);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
Example usage:
mvn -q exec:java -Dexec.mainClass="com.example.NewTask" -Dexec.args="Task....."
In the tutorials, the number of dots (.) represents seconds of “work”.
Worker (Worker.java)
The worker will:
receive tasks
simulate work based on the number of dots (.) in the message
manually acknowledge each message after finishing
package com.example;
import com.rabbitmq.client.*;
public class Worker {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Durable queue
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// Fair dispatch: don't give new messages until worker is done
channel.basicQos(1);
// Define callback for messages
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// AutoAck = false → manual acknowledgements
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000); // one second per dot
} catch (InterruptedException ignored) {}
}
}
}
}
basicAck tells RabbitMQ the worker finished the task.
If a worker dies, RabbitMQ re-queues the message to another worker.
Fair dispatch using basicQos(1) means a worker is given a new task only after it acknowledges the previous one, so slow workers are not flooded while fast workers pick up more.
Running Multiple Workers
Open two (or more) terminals and run:
# Terminal 1
mvn -q exec:java -Dexec.mainClass="com.example.Worker"
# Terminal 2
mvn -q exec:java -Dexec.mainClass="com.example.Worker"
Now send tasks:
mvn -q exec:java -Dexec.mainClass="com.example.NewTask" -Dexec.args="Hello..."
mvn -q exec:java -Dexec.mainClass="com.example.NewTask" -Dexec.args="Another....."
mvn -q exec:java -Dexec.mainClass="com.example.NewTask" -Dexec.args="Quick."
You will see messages distributed like:
# Worker 1:
[x] Received 'Hello...'
[x] Done
[x] Received 'Quick.'
[x] Done
# Worker 2:
[x] Received 'Another.....'
[x] Done
RabbitMQ dispatches them one by one to each worker.
How Acknowledgements Work
Workers manually acknowledge messages:
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
If a worker:
crashes
disconnects
is killed
RabbitMQ will requeue the task and deliver it to another worker.
This guarantees no message is lost.
Message Durability
Two things make messages survive restarts:
1. Durable queue
channel.queueDeclare("task_queue", true, false, false, null);
2. Persistent messages
MessageProperties.PERSISTENT_TEXT_PLAIN
Even then, RabbitMQ does not guarantee an immediate disk flush for every message; for stronger guarantees, look into publisher confirms.
Fair Dispatch with basicQos(1)
Without QoS, RabbitMQ sends messages in round-robin style, regardless of worker speed.
With: channel.basicQos(1); RabbitMQ will:
send only one unacknowledged message at a time
wait for basicAck before giving that worker more tasks
This prevents slow workers from being overloaded.
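The difference can be shown with a toy simulation (plain Java, not RabbitMQ code): blind round-robin assigns task i to worker i % n regardless of load, while basicQos(1)-style fair dispatch hands each task to whichever worker is free first.

```java
// Toy simulation contrasting round-robin dispatch with basicQos(1)-style
// fair dispatch. Task values are seconds of "work" (dots in the tutorial).
public class DispatchSim {

    // Round-robin: task i always goes to worker i % workers, regardless of load.
    public static int[] roundRobin(int[] tasks, int workers) {
        int[] busy = new int[workers];
        for (int i = 0; i < tasks.length; i++) {
            busy[i % workers] += tasks[i];
        }
        return busy;
    }

    // Fair dispatch: a worker only gets a new task once it is free,
    // so each task goes to whichever worker finishes earliest.
    public static int[] fairDispatch(int[] tasks, int workers) {
        int[] busy = new int[workers];
        for (int t : tasks) {
            int idlest = 0;
            for (int w = 1; w < workers; w++) {
                if (busy[w] < busy[idlest]) idlest = w;
            }
            busy[idlest] += t;
        }
        return busy;
    }

    public static void main(String[] args) {
        int[] tasks = {5, 1, 5, 1, 5, 1};  // heavy and light tasks alternate
        System.out.println(java.util.Arrays.toString(roundRobin(tasks, 2)));
        System.out.println(java.util.Arrays.toString(fairDispatch(tasks, 2)));
    }
}
```

With this task list, round-robin loads one worker with all the heavy tasks (busy times [15, 3]) while fair dispatch balances the load ([11, 7]).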
RabbitMQ — Publish/Subscribe (Using the Java Client)
What Is Publish/Subscribe?
Publish/Subscribe (pub/sub) is a messaging pattern where:
Producers publish messages to an exchange
Multiple queues are bound to that exchange
Each consumer receives a copy of each message
Typical use cases:
Broadcasting logs to multiple receivers (e.g. console + file + monitoring)
Sending notifications to multiple services
Fan-out events to several independent consumers
Core Concepts: Exchange & Fanout
Previously, we published directly to a queue (using the default exchange "").
In pub/sub:
We publish to a named exchange (e.g. logs)
The exchange forwards messages to all bound queues
A fanout exchange:
Ignores routing keys
Broadcasts messages to every queue that is bound to it
Producer --> [fanout exchange "logs"] --> [queue A] --> Consumer 1
                                      --> [queue B] --> Consumer 2
                                      --> [queue C] --> Consumer 3
Each consumer gets its own copy of the message.
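The copy-to-everyone behavior can be sketched with a toy in-memory fanout (illustrative names, not the RabbitMQ API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a fanout exchange: every bound queue receives its own
// copy of each published message; the routing key plays no role.
public class ToyFanout {
    private final List<Deque<String>> boundQueues = new ArrayList<>();

    // Binding a fresh queue, as each subscriber does with queueBind.
    public Deque<String> bindNewQueue() {
        Deque<String> q = new ArrayDeque<>();
        boundQueues.add(q);
        return q;
    }

    // Fanout: copy the message into all bound queues.
    public void publish(String body) {
        for (Deque<String> q : boundQueues) q.addLast(body);
    }

    public static void main(String[] args) {
        ToyFanout logs = new ToyFanout();
        Deque<String> subscriber1 = logs.bindNewQueue();
        Deque<String> subscriber2 = logs.bindNewQueue();
        logs.publish("info: hello");
        System.out.println(subscriber1.poll());  // each subscriber sees the message
        System.out.println(subscriber2.poll());
    }
}
```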
Project Structure
rabbitmq-pubsub-java/
├─ pom.xml
└─ src/
└─ main/
└─ java/
└─ com/
└─ example/
├─ EmitLog.java <-- producer (publishes logs)
└─ ReceiveLogs.java <-- consumer (subscribes to logs)
We reuse the same RabbitMQ Java client dependency as before (in pom.xml):
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
Declaring the Fanout Exchange
Both producer and consumers will use the same exchange:
String EXCHANGE_NAME = "logs";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
exchangeDeclare is idempotent (safe to call multiple times) and creates the exchange if it does not exist.
BuiltinExchangeType.FANOUT means broadcast to all bound queues.
Producer — EmitLog.java
EmitLog sends a log message to the logs exchange.
package com.example;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Declare fanout exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// Build message from command-line arguments
String message = String.join(" ", argv);
if (message.isEmpty()) {
message = "info: Hello RabbitMQ Publish/Subscribe!";
}
// Publish to exchange (routing key is ignored by fanout)
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
We publish to exchange "logs", not to a specific queue.
Routing key is empty ("") because fanout ignores it anyway.
Consumer — ReceiveLogs.java (Ephemeral Queues)
Each consumer will:
create its own exclusive, auto-delete queue
bind that queue to the logs exchange
receive all messages published to the exchange
package com.example;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare fanout exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// Create a non-durable, exclusive, auto-delete queue with a generated name
String queueName = channel.queueDeclare().getQueue();
// Bind the queue to the exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for logs in queue '" + queueName +
"'. To exit press CTRL+C");
// Define callback
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(
delivery.getBody(),
StandardCharsets.UTF_8
);
System.out.println(" [x] Received '" + message + "'");
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println(" [!] Consumer cancelled: " + consumerTag);
};
// Start consuming (autoAck = true)
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
}
}
queueDeclare() with no arguments creates a fresh, server-named queue.
It is:
non-durable
exclusive (only this connection can use it)
auto-delete (deleted when connection closes)
Each ReceiveLogs instance gets its own queue bound to "logs".
Running Multiple Subscribers
Start RabbitMQ server, then open three terminals.
Terminal 1 — first subscriber:
mvn -q exec:java -Dexec.mainClass="com.example.ReceiveLogs"
Terminal 2 — second subscriber:
mvn -q exec:java -Dexec.mainClass="com.example.ReceiveLogs"
Terminal 3 — producer:
mvn -q exec:java -Dexec.mainClass="com.example.EmitLog" \
-Dexec.args="info: Hello from pub/sub!"
Expected result: Each ReceiveLogs terminal prints the same message.
# Terminal 1:
[*] Waiting for logs in queue 'amq.gen-...'
[x] Received 'info: Hello from pub/sub!'
# Terminal 2:
[*] Waiting for logs in queue 'amq.gen-...'
[x] Received 'info: Hello from pub/sub!'
This demonstrates that pub/sub delivers a copy of each message to every active subscriber.
Temporary vs Persistent Subscribers
The basic tutorial uses temporary queues:
Queue disappears when the consumer disconnects
Messages are not kept for future subscribers
Good for real-time log viewing
For persistent log storage you could:
Declare a named, durable queue (e.g. "logs_file")
Bind it to the exchange
Run a consumer that writes all messages to a file or DB
// Example: named durable log queue
channel.queueDeclare("logs_file", true, false, false, null);
channel.queueBind("logs_file", EXCHANGE_NAME, "");
Then, even if no subscribers are currently running, messages can be stored in that durable queue.
How Publish/Subscribe Differs from Work Queues
Work Queues:
One queue, many workers
Each message goes to only one worker
Goal: distribute load
Publish/Subscribe:
One exchange, many queues
Each message goes to all queues (and thus all subscribers)
Goal: broadcast the same message to multiple consumers
RabbitMQ — Routing (Using the Java Client)
What Is Routing with Direct Exchanges?
In the Publish/Subscribe (fanout) pattern all bound queues receive every message.
With Routing using a direct exchange:
Messages have a routing key (e.g. info, warning, error).
Each queue is bound to the exchange with specific routing keys.
Only queues whose binding key matches the routing key receive the message.
We will implement:
EmitLogDirect.java — producer that publishes logs with a severity key.
ReceiveLogsDirect.java — consumer that subscribes to specific severities.
Direct Exchange and Routing Keys
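A direct exchange compares a message's routing key against each queue's binding key and delivers only on an exact match; one queue may be bound with several keys. A minimal in-memory sketch of this behavior (illustrative names, not the RabbitMQ API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange: a message is delivered only to queues
// whose binding key exactly matches the message's routing key.
public class ToyDirect {
    private final Map<String, List<Deque<String>>> bindings = new HashMap<>();

    public void bind(Deque<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    public void publish(String routingKey, String body) {
        for (Deque<String> q : bindings.getOrDefault(routingKey, List.of())) {
            q.addLast(body);
        }
    }

    public static void main(String[] args) {
        ToyDirect exchange = new ToyDirect();
        Deque<String> errorQueue = new ArrayDeque<>();
        Deque<String> infoQueue = new ArrayDeque<>();
        exchange.bind(errorQueue, "error");
        exchange.bind(infoQueue, "info");
        exchange.bind(infoQueue, "warning");   // one queue, several binding keys

        exchange.publish("error", "System failure!");
        exchange.publish("info", "Just some info");

        System.out.println(errorQueue.poll()); // only the error message
        System.out.println(infoQueue.poll());  // only the info message
    }
}
```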
Project Structure
Example Maven project layout:
rabbitmq-routing-java/
├─ pom.xml
└─ src/
└─ main/
└─ java/
└─ com/
└─ example/
├─ EmitLogDirect.java
└─ ReceiveLogsDirect.java
pom.xml needs the RabbitMQ client dependency (same as earlier chapters):
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
Declaring the Direct Exchange
We choose an exchange name, e.g. direct_logs, and type direct:
String EXCHANGE_NAME = "direct_logs";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Both the producer and all consumers must declare the same exchange; the call is idempotent.
Producer — EmitLogDirect.java
The producer:
Reads severity (routing key) from the command line.
Reads the log message text.
Publishes the message to the direct_logs exchange using that severity as routing key.
Create EmitLogDirect.java:
package com.example;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Declare direct exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// Severity (routing key)
String severity = argv.length > 0 ? argv[0] : "info";
// Message text
String message;
if (argv.length > 1) {
StringBuilder builder = new StringBuilder();
for (int i = 1; i < argv.length; i++) {
if (i > 1) {
builder.append(" ");
}
builder.append(argv[i]);
}
message = builder.toString();
} else {
message = "Hello from direct routing!";
}
// Publish with routing key = severity
channel.basicPublish(
EXCHANGE_NAME,
severity,
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + severity + "': '" + message + "'");
}
}
}
Example usage:
mvn -q exec:java \
-Dexec.mainClass="com.example.EmitLogDirect" \
-Dexec.args="info Just an info log"
mvn -q exec:java \
-Dexec.mainClass="com.example.EmitLogDirect" \
-Dexec.args="error Something went wrong"
Consumer — ReceiveLogsDirect.java
The consumer:
Declares the same direct exchange.
Creates a new, exclusive, auto-delete queue.
Binds that queue to the exchange using one or more severities given on the command line.
Prints all messages for the severities it cares about.
Create ReceiveLogsDirect.java:
package com.example;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare direct exchange
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// Create a fresh, exclusive, auto-delete queue
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
// Bind the queue for each severity given
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(" [*] Bound queue '" + queueName +
"' with routing key '" + severity + "'");
}
System.out.println(" [*] Waiting for logs. To exit press CTRL+C");
// Define callback
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String routingKey = delivery.getEnvelope().getRoutingKey();
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + routingKey + "': '" + message + "'");
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println(" [!] Consumer cancelled: " + consumerTag);
};
// Start consuming (autoAck = true is fine for simple log consumers)
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
}
}
Each consumer can choose its own subset of severities.
Each consumer creates its own queue and binds it with the keys it cares about.
Running Example Scenarios
Start RabbitMQ server if not running.
Terminal 1 — consumer for error only:
mvn -q exec:java \
-Dexec.mainClass="com.example.ReceiveLogsDirect" \
-Dexec.args="error"
Terminal 2 — consumer for info and warning:
mvn -q exec:java \
-Dexec.mainClass="com.example.ReceiveLogsDirect" \
-Dexec.args="info warning"
Terminal 3 — send several logs:
# Info log
mvn -q exec:java \
-Dexec.mainClass="com.example.EmitLogDirect" \
-Dexec.args="info Just some info"
# Warning log
mvn -q exec:java \
-Dexec.mainClass="com.example.EmitLogDirect" \
-Dexec.args="warning Disk almost full"
# Error log
mvn -q exec:java \
-Dexec.mainClass="com.example.EmitLogDirect" \
-Dexec.args="error System failure!"
Expected distribution:
# Terminal 1 (error only):
[*] Bound queue 'amq.gen-...' with routing key 'error'
[*] Waiting for logs. To exit press CTRL+C
[x] Received 'error': 'System failure!'
# Terminal 2 (info + warning):
[*] Bound queue 'amq.gen-...' with routing key 'info'
[*] Bound queue 'amq.gen-...' with routing key 'warning'
[*] Waiting for logs. To exit press CTRL+C
[x] Received 'info': 'Just some info'
[x] Received 'warning': 'Disk almost full'
Note that error logs are not seen by the info/warning consumer, and vice versa.
Routing vs Fanout vs Work Queues
Fanout (Publish/Subscribe):
Broadcast to all bound queues.
Ignores routing keys.
Direct (Routing):
Uses routing keys for exact matching.
Each queue chooses which messages it wants based on binding keys.
Work Queues (with default exchange or direct):
One queue, multiple workers.
Each message is processed by only one worker.
Focus: load distribution, not selective subscription.
Extending the Routing Pattern
A queue can be bound with multiple routing keys, as ReceiveLogsDirect already shows. When exact matching becomes too rigid, the natural next step is a topic exchange, whose binding patterns support the * and # wildcards.
RabbitMQ — Streams in Java
RabbitMQ Streams are an append-only log data structure built into RabbitMQ.
Messages are stored in a log-like stream and identified by offset.
Designed for high throughput and long message retention.
Consumers can replay messages from any offset (not only "latest").
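The bullets above can be pictured with a toy append-only log (plain Java, illustrative names, not the RabbitMQ Streams API): appending assigns an offset, reading never removes anything, and any consumer can replay from any offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a stream: an append-only log where each message gets an
// offset, reads are non-destructive, and consumers replay from any offset.
public class ToyStream {
    private final List<String> log = new ArrayList<>();

    // Appending returns the new message's offset (0-based).
    public long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Reading from an offset returns everything stored from there on;
    // unlike a classic queue, the messages stay in the log.
    public List<String> readFrom(long offset) {
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream();
        stream.append("a");   // offset 0
        stream.append("b");   // offset 1
        stream.append("c");   // offset 2
        System.out.println(stream.readFrom(0));  // replay from the beginning
        System.out.println(stream.readFrom(2));  // only the newest message
    }
}
```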
Prerequisites & Starting RabbitMQ with Streams
Example: start RabbitMQ with Streams via Docker (exposes port 5552 for the stream protocol):
docker run -it --rm --name rabbitmq-stream \
-p 5672:5672 -p 15672:15672 -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13
Enable the rabbitmq_stream plugin (if not already):
docker exec rabbitmq-stream rabbitmq-plugins enable rabbitmq_stream
Management UI (optional) is available at http://localhost:15672/ (default user: guest, password: guest)
Project Layout
Example Maven project structure:
rabbitmq-stream-java/
├─ pom.xml
└─ src/
└─ main/
└─ java/
└─ com/
└─ example/
├─ StreamProducer.java
└─ StreamConsumer.java
You can adjust the package name as you like; keep the producer and consumer separate for clarity.
Adding the RabbitMQ Stream Java Client Dependency
In pom.xml add the Stream client dependency (artifact name may evolve, check Maven Central if needed):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-stream-java</artifactId>
<version>1.0.0</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.16.0</version> <!-- example version -->
</dependency>
</dependencies>
</project>
mvn dependency:resolve
Understanding the Stream Java Client Basics
The main entry point is Environment:
Handles the connection to the Streams node (port 5552).
Creates streams, producers, and consumers.
The basic flow:
Create an Environment with the stream URI (rabbitmq-stream://...).
Create a stream (if it does not exist).
Create a Producer bound to that stream.
Create a Consumer bound to that stream.
Producer — StreamProducer.java
StreamProducer:
Connects to RabbitMQ Streams.
Creates a stream named hello-stream.
Sends one "Hello Stream" message.
Create StreamProducer.java:
package com.example;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
public class StreamProducer {
private static final String STREAM_NAME = "hello-stream";
public static void main(String[] args) throws Exception {
// 1. Create environment (connects to stream port 5552 by default in the URI)
try (Environment environment = Environment.builder()
.uri("rabbitmq-stream://localhost:5552")
.build()) {
// 2. Create the stream if it does not exist
environment.streamCreator()
.stream(STREAM_NAME)
.create();
// 3. Create a producer for that stream
Producer producer = environment.producerBuilder()
.stream(STREAM_NAME)
.build();
// 4. Build a simple message
MessageBuilder messageBuilder = producer.messageBuilder();
Message message = messageBuilder
.addData("Hello Stream from Java!".getBytes())
.build();
// 5. Send the message (asynchronously)
producer.send(message);
System.out.println(" [x] Sent 'Hello Stream from Java!' to stream '" + STREAM_NAME + "'");
// 6. Close producer explicitly
producer.close();
}
}
}
The URI uses the stream protocol: rabbitmq-stream://host:5552, not AMQP amqp://.
streamCreator() is idempotent: it creates the stream if needed.
send is asynchronous; for high-volume producers you might want to handle confirms and errors.
Consumer — StreamConsumer.java
StreamConsumer:
Connects to the same stream.
Starts consuming from the beginning of the stream.
Prints each message payload as text.
Create StreamConsumer.java:
package com.example;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.MessageHandler;
import java.nio.charset.StandardCharsets;
public class StreamConsumer {
private static final String STREAM_NAME = "hello-stream";
public static void main(String[] args) throws Exception {
Environment environment = Environment.builder()
.uri("rabbitmq-stream://localhost:5552")
.build();
// Define how to handle messages
MessageHandler handler = (context, message) -> {
byte[] body = message.getBodyAsBinary();
String text = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received: " + text +
" (offset: " + context.getOffset() + ")");
};
// Start consumer from the beginning of the stream
Consumer consumer = environment.consumerBuilder()
.stream(STREAM_NAME)
.offset(OffsetSpecification.first())
.messageHandler(handler)
.build();
System.out.println(" [*] Consuming from stream '" + STREAM_NAME +
"' starting at the first message. Press CTRL+C to exit.");
// Keep process alive (simple way)
Thread.currentThread().join();
// (Unreachable in this simple example, but good practice for structured shutdown)
// consumer.close();
// environment.close();
}
}
OffsetSpecification.first() means "start from the very first stored message".
Other specifications:
OffsetSpecification.last() — only new messages.
OffsetSpecification.offset(n) — start from a specific offset.
We call Thread.currentThread().join() to keep the main thread alive so the consumer can receive messages.
Building and Running the Example
cd rabbitmq-stream-java
mvn package
Run consumer first in one terminal:
mvn -q exec:java -Dexec.mainClass="com.example.StreamConsumer"
Run producer in another terminal:
mvn -q exec:java -Dexec.mainClass="com.example.StreamProducer"
Expected output:
Producer terminal:
 [x] Sent 'Hello Stream from Java!' to stream 'hello-stream'
Consumer terminal:
 [*] Consuming from stream 'hello-stream' starting at the first message. Press CTRL+C to exit.
 [x] Received: Hello Stream from Java! (offset: 0)
Streams vs Classic Queues
Classic Queues:
Messages are typically removed once acknowledged.
Consumers usually get each message at most once.
Replay requires extra patterns (dead letter, requeue, etc.).
Streams:
Messages stay on disk for a long time (according to retention policy).
Multiple consumers can read the same message at different times.
Consumers track their offset in the stream and can replay.
Designed for very high throughput and sequential disk access.