rabbitmq 1.0.0

rabbitmq

RabbitMQ connector for Aussom, built on the standard RabbitMQ Java client (com.rabbitmq:amqp-client 5.31.x, AMQP 0-9-1). Connects to any standard RabbitMQ server: single node, multi-node address lists, plain TCP or TLS. RabbitMQ runs as its own server process, so this module is remote-only - there is no embedded broker.

The client needs no local storage: it reads and writes no files.

Classes

Class               Purpose
RabbitMqConnection  Connection: URI or piecewise config, TLS, automatic recovery, address lists
RabbitMqChannel     Exchanges, queues, bindings, publish, get, ack/nack, QoS, confirms, transactions
RabbitMqConsumer    Synchronous pull-style receiving layered over a push consume
RabbitMqMessage     Bodies, AMQP properties, headers, whole-message map forms, ack convenience
RabbitMqListener    Aussom Server ClientListener connector: push-style consumption on a server-managed thread

The statement include rabbitmq.rabbitmq; pulls in everything. Individual files can be included instead (for example, include rabbitmq.channel;).

Quick start

include rabbitmq.rabbitmq;

con = new RabbitMqConnection("amqp://user:pass@mq.example.com:5672");
con.connect();
chan = con.openChannel();

q = chan.queueDeclare("orders", true);     // durable queue
chan.publish("", q, "hello");              // "" = default exchange

m = chan.get(q);
c.println(m.getBody());                    // hello

con.close();

A message is routed by publishing it to an exchange with a routing key; the exchange's bindings decide which queues receive it. The empty-string exchange is the default exchange, where the routing key is the name of the target queue.

Exchanges and routing

chan.exchangeDeclare("orders", "topic", true);
q = chan.queueDeclare("orders.eu", true);
chan.queueBind(q, "orders", "orders.*.eu");

chan.publish("orders", "orders.created.eu", "for the EU queue");

Exchange types: direct (exact key match), fanout (every bound queue, routing key ignored), topic (keys are dot-separated words; * matches exactly one word, # matches zero or more words), and headers (match on header values through the binding args map).
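
The topic wildcard rules can be checked without a broker. The Python sketch below is a stand-alone model of how a topic binding pattern is compared with a routing key. topic_matches is a hypothetical helper written for illustration; it is not part of this module or of the RabbitMQ client, and the broker's own matcher is implemented differently.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots. '*' stands for exactly one word,
    '#' stands for zero or more words.
    """
    p = pattern.split(".") if pattern else []
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern used up: the key must be used up too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words: try every split point.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            # Pattern still needs a word but the key has none left.
            return False
        return (p[i] == "*" or p[i] == k[j]) and match(i + 1, j + 1)

    return match(0, 0)
```

Under these rules the binding orders.*.eu from the example accepts orders.created.eu but not orders.eu, because * must consume exactly one word.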

Messages as plain data

Everything that is a key/value table crosses the boundary as a native Aussom map. A whole message can be one literal:

chan.publish("orders", "orders.created", {
    "body": "order 1234",
    "persistent": true,
    "correlationId": "abc-123",
    "headers": { "region": "us-west", "retries": 0 }
});

Incoming messages dump the same shape back out:

m = chan.get("orders.eu", false);
data = m.toMap();    // body, properties, headers, exchange,
                     // routingKey, deliveryTag, redeliver
m.ack();

RabbitMqMessage also has per-field setters/getters, fromMap(), setProperties() / getProperties(), and Buffer bodies for binary payloads.

Consuming

Pull-style, with back-pressure through the prefetch window:

chan.qos(10);                        // at most 10 unacked in flight
cons = chan.consume("orders.eu");    // manual ack by default
while (true) {
    m = cons.receive(1000);
    if (m != null) {
        c.println(m.getBody());
        m.ack();
    }
}
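
To see why the prefetch window produces back-pressure, here is a small Python model of the broker side of a prefetch limit. PrefetchWindow and its methods are hypothetical names used only for illustration; the sketch assumes a positive prefetch count and a single consumer, and it does not talk to RabbitMQ.

```python
from collections import deque


class PrefetchWindow:
    """Toy model of prefetch: deliveries pause while the window is full."""

    def __init__(self, prefetch, messages):
        if prefetch < 1:
            raise ValueError("this sketch assumes a positive prefetch count")
        self.prefetch = prefetch
        self.ready = deque(messages)   # queued, not yet delivered
        self.unacked = set()           # delivery tags awaiting an ack
        self._next_tag = 1

    def deliver(self):
        """Deliver as many ready messages as the window allows."""
        delivered = []
        while self.ready and len(self.unacked) < self.prefetch:
            tag = self._next_tag
            self._next_tag += 1
            self.unacked.add(tag)
            delivered.append((tag, self.ready.popleft()))
        return delivered

    def ack(self, tag):
        """Acknowledge one delivery, freeing a slot in the window."""
        self.unacked.discard(tag)


window = PrefetchWindow(prefetch=2, messages=["a", "b", "c"])
first = window.deliver()     # the window fills with two deliveries
stalled = window.deliver()   # nothing more arrives until something is acked
window.ack(first[0][0])
resumed = window.deliver()   # one slot freed, so one more delivery
```

A slow consumer therefore never holds more than the prefetch count of unacknowledged messages; the rest stay on the queue until it acks.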

Under Aussom Server, use the listener instead - the server runtime owns its thread and lifecycle:

include app;
include rabbitmq.listener;

class orders : AppBase {
    public orders() {
        lst = new RabbitMqListener("orders.created");
        lst.setUri("amqp://orders:secret@mq.example.com:5672/shop");
        lst.setQueue("orders.created");
        lst.setDeclareQueue(true, { "x-dead-letter-exchange": "dlx" });
        lst.setPrefetch(20);
        lst.setOnMessage(::onOrder);
        app.registerListener(lst);
    }

    public onOrder(object Msg) {
        console.info("got order: " + Msg.getBody());
        // manual ack mode: acked automatically on clean return
    }
}

The handler runs on the listener thread. In manual ack mode (the default) the listener acknowledges after the handler returns cleanly. When the handler errors, the requeue policy applies: once (default) retries a failure one time and then dead-letters or drops it; always keeps retrying (pair it with a dead-letter exchange or a poison message loops forever); never drops immediately. Network blips heal through the client's automatic recovery underneath the running loop; per-listener enabled / autoRestart tuning comes from applications.yaml keyed by the listener name.
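
The three requeue policies can be summarised as a single decision function. The Python sketch below is only a model: should_requeue is a hypothetical name, and it assumes that "once" is decided from the AMQP redelivered flag, which this document does not confirm.

```python
def should_requeue(policy: str, redelivered: bool) -> bool:
    """Decide whether a message whose handler failed goes back on the queue.

    Assumption: "once" is decided from the redelivered flag. The listener's
    real bookkeeping is not described in this document.
    """
    if policy == "always":
        return True
    if policy == "never":
        return False
    if policy == "once":
        # First failure: requeue. Failure on a redelivery: give up.
        return not redelivered
    raise ValueError(f"unknown requeue policy: {policy!r}")
```

A False result corresponds to rejecting with requeue=false: the broker dead-letters the message if the queue has a dead-letter exchange and discards it otherwise.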

Dead-lettering in three lines

chan.exchangeDeclare("dlx", "direct", true);
dlq = chan.queueDeclare("orders.dead", true);
chan.queueBind(dlq, "dlx", "orders.created");

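Any queue declared with { "x-dead-letter-exchange": "dlx" } then sends its rejected (requeue=false) and expired messages to the dlx exchange. A dead-lettered message keeps its original routing key, so messages with the routing key orders.created land in orders.dead through the binding above.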
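
Which dead-letter queue a message reaches depends on its routing key, which is easy to overlook. The Python sketch below models a direct dead-letter exchange as a list of bindings. dead_letter_queues is a hypothetical helper for illustration only; x-dead-letter-routing-key is the RabbitMQ queue argument that overrides the key.

```python
def dead_letter_queues(bindings, dlx, routing_key, dead_letter_routing_key=None):
    """Return the queues a dead-lettered message reaches through a direct DLX.

    bindings: iterable of (exchange, binding_key, queue) tuples.
    A dead-lettered message keeps its original routing key unless the
    source queue sets x-dead-letter-routing-key.
    """
    key = routing_key if dead_letter_routing_key is None else dead_letter_routing_key
    # A direct exchange routes on an exact binding-key match.
    return sorted(q for ex, bk, q in bindings if ex == dlx and bk == key)


# Mirrors the three-line setup above.
bindings = [("dlx", "orders.created", "orders.dead")]
```

In this model a message dead-lettered with the routing key orders.updated matches no binding and is lost, unless the source queue sets x-dead-letter-routing-key to orders.created.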

Request/reply

Composed from publish plus RabbitMQ's built-in direct reply-to pseudo-queue - no queue churn, no extra classes:

// Requester: consume the pseudo-queue (auto-ack), send, wait.
replies = chan.consume("amq.rabbitmq.reply-to", true);
chan.publish("", "rpc.requests", {
    "body": "ping",
    "replyTo": "amq.rabbitmq.reply-to",
    "correlationId": "rr-1"
});
r = replies.receive(5000);

// Responder: answer to the request's replyTo with its correlationId.
req = chan.get("rpc.requests", false);
chan.publish("", req.getReplyTo(), {
    "body": "pong",
    "correlationId": req.getCorrelationId()
});
req.ack();
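
A fixed correlation id such as "rr-1" is enough for one request at a time. With several requests in flight, each needs its own id so that replies can be matched to their requests. The Python sketch below shows that bookkeeping in isolation; PendingReplies is a hypothetical helper, and the reply is assumed to be a plain map with the same "correlationId" key used in the message literals above.

```python
import uuid


class PendingReplies:
    """Track outstanding requests by correlation id."""

    def __init__(self):
        self._pending = {}

    def register(self, context):
        """Remember a request and return a fresh correlation id for it."""
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = context
        return correlation_id

    def resolve(self, reply):
        """Return the context for a reply map, or None if it matches nothing."""
        return self._pending.pop(reply.get("correlationId"), None)
```

Replies whose id is unknown or already consumed resolve to None, so late or duplicate replies can be dropped safely.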

Feature notes

Building and testing

mvn package

The tests run against a real RabbitMQ server:

RABBITMQ_TEST_URI=amqp://guest:guest@localhost:5672/ mvn test

For interactive development a long-lived broker with the management UI is one line:

docker run -d --name rmq-dev -p 5672:5672 -p 15672:15672 rabbitmq:4-management

The test machinery (Testcontainers) is test-scope only; none of it is part of the published APAC artifact.

Aussom Server

The module ships its Java connector in a bundled JAR (aussom-rabbitmq.jar). On first include, the module's loader calls app.loadJar("rabbitmq/aussom-rabbitmq.jar") to put it on the classpath. Aussom Server blocks dynamic JAR loading by default, so the server's security manager must allow it or the app fails at startup with one of these errors:

app.loadJar(): Security manager property 'aussom.app.loadjar' is false; dynamic jar loading is disabled on this server.
app.loadJar(): jar 'aussom-rabbitmq.jar' is not permitted. ...

Add a securityManager block to the server section of config.yaml that turns loading on and allowlists this module's JAR:

local:
  server:
    # ...

    # Allow the rabbitmq apac module to load its bundled
    # connector JAR with app.loadJar.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
        "aussom-rabbitmq.jar"
      ]

Setting "aussom.app.loadAllJars": true instead of the allowlist also works, but naming just the JARs you expect is the safer configuration. No security manager changes are needed when running under the plain Aussom CLI.

Under the server, RabbitMqListener is the push-consumption path (see the listener example above): register it with app.registerListener and the runtime owns its lifecycle - a dedicated thread, reload/shutdown handling, and the auto-restart policy from applications.yaml keyed by the listener name. Outside the server (plain CLI, tests) the loop can be driven directly: startListener() blocks (run it on a Thread), stopListener() ends it.

License

Apache-2.0