rabbitmq
RabbitMQ connector for Aussom, built on the standard RabbitMQ Java
client (com.rabbitmq:amqp-client 5.31.x, AMQP 0-9-1). Connects to
any standard RabbitMQ server: single node, multi-node address lists,
plain TCP or TLS. RabbitMQ runs as its own server process, so this
module is remote-only - there is no embedded broker.
The client needs no local storage: it reads and writes no files.
Classes
| Class | Purpose |
|---|---|
| RabbitMqConnection | Connection: URI or piecewise config, TLS, automatic recovery, address lists |
| RabbitMqChannel | Exchanges, queues, bindings, publish, get, ack/nack, QoS, confirms, transactions |
| RabbitMqConsumer | Synchronous pull receiving over push consume |
| RabbitMqMessage | Bodies, AMQP properties, headers, whole-message map forms, ack convenience |
| RabbitMqListener | Aussom Server ClientListener connector: push-style consumption on a server-managed thread |
The statement include rabbitmq.rabbitmq; pulls in everything. Individual
files can be included instead (for example, include rabbitmq.channel;).
Quick start
include rabbitmq.rabbitmq;
con = new RabbitMqConnection("amqp://user:pass@mq.example.com:5672");
con.connect();
chan = con.openChannel();
q = chan.queueDeclare("orders", true); // durable queue
chan.publish("", q, "hello"); // "" = default exchange
m = chan.get(q);
c.println(m.getBody()); // hello
con.close();
Messages route by publishing to an exchange with a routing key; bindings decide which queues receive them. The empty-string exchange is the default exchange: the routing key is then the queue name.
Exchanges and routing
chan.exchangeDeclare("orders", "topic", true);
q = chan.queueDeclare("orders.eu", true);
chan.queueBind(q, "orders", "orders.*.eu");
chan.publish("orders", "orders.created.eu", "for the EU queue");
Exchange types: direct (exact key), fanout (every bound queue),
topic (* one word, # any words), and headers (match on header
values through the binding args map).
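The topic wildcard rules can be made concrete with a small matcher. This is a minimal sketch in Java (the language of the underlying client), not code from this module: the class name TopicPatternSketch and its methods are hypothetical, and it assumes the usual AMQP 0-9-1 semantics where * stands for exactly one dot-separated word and # for zero or more.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative topic matching; hypothetical helper, not part of the rabbitmq module. */
final class TopicPatternSketch {

    /** Returns true when the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(words(pattern), 0, words(routingKey), 0);
    }

    /** Splits on dots; an empty string is treated as zero words. */
    private static List<String> words(String s) {
        return s.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(s.split("\\.", -1));
    }

    private static boolean match(List<String> p, int pi, List<String> k, int ki) {
        if (pi == p.size()) {
            return ki == k.size();              // pattern used up: the key must be too
        }
        String word = p.get(pi);
        if (word.equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = ki; next <= k.size(); next++) {
                if (match(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.size()) {
            return false;                       // key used up but the pattern expects a word
        }
        // '*' accepts any single word; anything else must match literally.
        return (word.equals("*") || word.equals(k.get(ki)))
                && match(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("orders.*.eu", "orders.created.eu")); // true
        System.out.println(matches("orders.*.eu", "orders.eu"));         // false
        System.out.println(matches("orders.#", "orders.created.eu"));    // true
    }
}
```

The broker performs this matching itself; the sketch is only a way to check a binding pattern against sample routing keys before declaring it.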
Messages as plain data
Everything that is a key/value table crosses the boundary as a native Aussom map. A whole message can be one literal:
chan.publish("orders", "orders.created", {
    "body": "order 1234",
    "persistent": true,
    "correlationId": "abc-123",
    "headers": { "region": "us-west", "retries": 0 }
});
Incoming messages dump the same shape back out:
m = chan.get("orders.eu", false);
data = m.toMap(); // body, properties, headers, exchange,
// routingKey, deliveryTag, redeliver
m.ack();
RabbitMqMessage also has per-field setters/getters, fromMap(),
setProperties() / getProperties(), and Buffer bodies for binary
payloads.
Consuming
Pull-style, with back-pressure through the prefetch window:
chan.qos(10); // at most 10 unacked in flight
cons = chan.consume("orders.eu"); // manual ack by default
while (true) {
    m = cons.receive(1000);
    if (m != null) {
        c.println(m.getBody());
        m.ack();
    }
}
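One way to picture synchronous pull receiving over push consume is a hand-off queue: the client's delivery callback deposits each pushed message, and receive waits on that queue with a timeout. The Java sketch below is an assumption about the general technique, not the module's actual implementation; PullBufferSketch, onDelivery and receive are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical hand-off buffer: push deliveries go in, blocking pulls come out. */
final class PullBufferSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    /** Would be called from the client's delivery thread for each pushed message. */
    void onDelivery(String body) {
        pending.add(body);
    }

    /** Waits up to timeoutMs for a message; returns null when none arrived in time. */
    String receive(long timeoutMs) {
        try {
            return pending.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        PullBufferSketch buffer = new PullBufferSketch();
        buffer.onDelivery("order 1234");
        System.out.println(buffer.receive(1000)); // order 1234
        System.out.println(buffer.receive(50));   // null
    }
}
```

With manual acks and a prefetch of 10, the broker stops pushing once 10 deliveries are unacknowledged, so a buffer like this stays bounded by the prefetch window.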
Under Aussom Server, use the listener instead - the server runtime owns its thread and lifecycle:
include app;
include rabbitmq.listener;
class orders : AppBase {
    public orders() {
        lst = new RabbitMqListener("orders.created");
        lst.setUri("amqp://orders:secret@mq.example.com:5672/shop");
        lst.setQueue("orders.created");
        lst.setDeclareQueue(true, { "x-dead-letter-exchange": "dlx" });
        lst.setPrefetch(20);
        lst.setOnMessage(::onOrder);
        app.registerListener(lst);
    }
    public onOrder(object Msg) {
        console.info("got order: " + Msg.getBody());
        // manual ack mode: acked automatically on clean return
    }
}
The handler runs on the listener thread. In manual ack mode (the
default) the listener acknowledges after the handler returns cleanly.
When the handler errors, the requeue policy applies: once (default)
retries a failure one time and then dead-letters or drops it;
always keeps retrying (pair it with a dead-letter exchange or a
poison message loops forever); never drops immediately. Network
blips heal through the client's automatic recovery underneath the
running loop; per-listener enabled / autoRestart tuning comes
from applications.yaml keyed by the listener name.
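The three requeue policies amount to one decision per failed message: should the negative acknowledgement ask the broker to requeue it? The Java sketch below models that decision under a stated assumption, namely that once is detected through the AMQP redelivered flag. The module may track retries differently, and RequeuePolicySketch and its members are hypothetical names.

```java
/** Hypothetical model of the listener's requeue policy; names are illustrative. */
final class RequeuePolicySketch {
    enum Policy { ONCE, ALWAYS, NEVER }

    /**
     * Decides the requeue flag for a nack after the handler failed.
     * redelivered is the AMQP redelivered flag of the failed delivery (assumed signal).
     */
    static boolean requeueOnFailure(Policy policy, boolean redelivered) {
        switch (policy) {
            case ALWAYS:
                return true;            // retry forever; needs a dead-letter exchange as a safety net
            case NEVER:
                return false;           // dead-letter or drop on the first failure
            case ONCE:
                return !redelivered;    // first failure requeues, the retry's failure does not
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(requeueOnFailure(Policy.ONCE, false)); // true
        System.out.println(requeueOnFailure(Policy.ONCE, true));  // false
    }
}
```

A message rejected with requeue=false goes to the queue's dead-letter exchange when one is configured and is dropped otherwise, which is why always is only safe alongside some other way of stopping a poison message.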
Dead-lettering in three lines
chan.exchangeDeclare("dlx", "direct", true);
dlq = chan.queueDeclare("orders.dead", true);
chan.queueBind(dlq, "dlx", "orders.created");
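Any queue declared with { "x-dead-letter-exchange": "dlx" } then
dead-letters its rejected (requeue=false) and expired messages to dlx.
Because dlx is a direct exchange and dead-lettered messages keep their
original routing key unless x-dead-letter-routing-key is set, messages
published with the key orders.created end up in orders.dead.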
Request/reply
Composed from publish plus RabbitMQ's built-in direct reply-to pseudo-queue - no queue churn, no extra classes:
// Requester: consume the pseudo-queue (auto-ack), send, wait.
replies = chan.consume("amq.rabbitmq.reply-to", true);
chan.publish("", "rpc.requests", {
    "body": "ping",
    "replyTo": "amq.rabbitmq.reply-to",
    "correlationId": "rr-1"
});
r = replies.receive(5000);
// Responder: answer to the request's replyTo with its correlationId.
req = chan.get("rpc.requests", false);
chan.publish("", req.getReplyTo(), {
    "body": "pong",
    "correlationId": req.getCorrelationId()
});
req.ack();
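The example above keeps a single request in flight. When several requests share one reply consumer, each reply has to be matched back to its request through correlationId. A minimal Java sketch of that bookkeeping follows; ReplyCorrelatorSketch, expect and onReply are hypothetical names and not part of this module.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical correlationId bookkeeping for concurrent request/reply. */
final class ReplyCorrelatorSketch {
    private final Map<String, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();

    /** Register a request before publishing it; the future completes with the reply body. */
    CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        waiting.put(correlationId, reply);
        return reply;
    }

    /** Hand a received reply to its requester; returns false for unknown or duplicate ids. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = waiting.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelatorSketch correlator = new ReplyCorrelatorSketch();
        CompletableFuture<String> pending = correlator.expect("rr-1");
        correlator.onReply("rr-1", "pong");
        System.out.println(pending.join()); // pong
    }
}
```

In this picture the requester would call expect before publishing and feed every message taken from the reply consumer into onReply. Replies with an unknown id are reported to the caller rather than delivered to the wrong waiter.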
Feature notes
- Queue features are x-arguments on queueDeclare, not separate APIs: x-message-ttl, x-expires, x-max-length, x-dead-letter-exchange, x-max-priority, x-queue-type (classic/quorum/stream), x-single-active-consumer.
- RabbitMQ 4.2+ denies transient non-exclusive queues by default (a deprecated feature). Declare queues durable (queueDeclare(name, true)) or exclusive; server-named queueDeclare() queues are exclusive and still fine.
- Publisher confirms: confirmSelect(), then waitForConfirms() / waitForConfirmsOrDie(). Mutually exclusive with transactions.
- Transactions: txSelect() / txCommit() / txRollback() batch publishes and acks on the channel.
- Mandatory returns: publish with Mandatory = true and drain unroutable messages with pollReturned().
- TLS: useTls() uses the JVM truststore with hostname verification; useTlsNoVerify() accepts self-signed certs for development only and logs a warning.
- Threading: connections are thread safe; channels are not. Use each channel from one thread - the same rule as the Java client.
- guest/guest only works from localhost; RabbitMQ itself refuses it remotely. Create a real user for anything else.
- No deserialization surface: bodies are plain bytes (string or Buffer). There is no object-message machinery to secure.
Building and testing
mvn package
The tests run against a real RabbitMQ server:
- With Docker installed, the suite starts a throwaway rabbitmq:4-alpine container on a random port and removes it afterward. This is the default - just run mvn test.
- To use an existing broker instead (or without Docker), set RABBITMQ_TEST_URI, for example:
RABBITMQ_TEST_URI=amqp://guest:guest@localhost:5672/ mvn test
For interactive development a long-lived broker with the management UI is one line:
docker run -d --name rmq-dev -p 5672:5672 -p 15672:15672 rabbitmq:4-management
The test machinery (Testcontainers) is test scope only; none of it ships in the published APAC artifact.
Aussom Server
The module ships its Java connector in a bundled JAR
(aussom-rabbitmq.jar). On first include, the module's loader calls
app.loadJar("rabbitmq/aussom-rabbitmq.jar") to put it on the
classpath. Aussom Server blocks dynamic JAR loading by default, so
the server's security manager must allow it or the app fails at
startup with one of these errors:
app.loadJar(): Security manager property 'aussom.app.loadjar' is false; dynamic jar loading is disabled on this server.
app.loadJar(): jar 'aussom-rabbitmq.jar' is not permitted. ...
Add a securityManager block to the server section of config.yaml
that turns loading on and allowlists this module's JAR:
local:
  server:
    # ...
    # Allow the rabbitmq apac module to load its bundled
    # connector JAR with app.loadJar.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
            "aussom-rabbitmq.jar"
          ]
Setting "aussom.app.loadAllJars": true instead of the allowlist
also works, but naming just the JARs you expect is the safer
configuration. No security manager changes are needed when running
under the plain Aussom CLI.
Under the server, RabbitMqListener is the push-consumption path
(see the listener example above): register it with
app.registerListener and the runtime owns its lifecycle - a
dedicated thread, reload/shutdown handling, and the auto-restart
policy from applications.yaml keyed by the listener name. Outside
the server (plain CLI, tests) the loop can be driven directly:
startListener() blocks (run it on a Thread), stopListener()
ends it.
License
Apache-2.0