kafka 1.0.0

kafka

Apache Kafka connector for Aussom. It exposes the full Kafka client API: producers (idempotence and transactions included), consumer groups, share groups (queue semantics), the complete Admin API, and two Aussom Server ClientListener connectors.

Built on kafka-clients 4.3.x. Kafka has no embedded broker, so this module is remote-only: every class connects to an outside Kafka cluster through a bootstrap address.

Classes

Class                Purpose
KafkaProducer        Sending: sync and fire-and-forget, headers, compression, transactions
KafkaConsumer        Consumer-group polling: subscribe/assign, commits, seeks, offset queries
KafkaShareConsumer   Queue semantics (broker 4.2+): per-record accept / release / reject
KafkaAdmin           Cluster management: topics, configs, ACLs, groups, offsets, quotas, and more
KafkaRecord          One received record: key, value, headers, timestamps, delivery count
KafkaListener        Aussom Server ClientListener connector for consumer groups
KafkaShareListener   Aussom Server ClientListener connector for share groups (queues)

The statement include kafka.kafka; pulls in everything. Individual files can be included instead, for example include kafka.producer; for the producer alone.

Key and value payloads are Aussom strings (UTF-8) by default; call setKeyFormat("bytes") / setValueFormat("bytes") on any client to work with Buffer payloads instead. Topic-partitions are { "topic": s, "partition": i } maps and offsets are lists of those maps with an added "offset" - the same shapes everywhere in the module.
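As a minimal sketch of the conventions above, the fragment below switches a consumer to Buffer payloads and writes out the two shapes by hand. It assumes the format setters are called before connect(); the topic name, partition number, and offset value are illustrative only.

```aussom
include kafka.kafka;

cons = new KafkaConsumer("broker1:9092", "billing");
// Switch keys and values from UTF-8 strings to Buffer payloads.
// Assumption: the format is set before connect().
cons.setKeyFormat("bytes");
cons.setValueFormat("bytes");
cons.connect();

// Topic-partition shape: a map with "topic" and "partition".
tp = { "topic": "orders.created", "partition": 0 };

// Offset shape: a list of the same maps with an added "offset".
offs = [ { "topic": "orders.created", "partition": 0, "offset": 42 } ];
c.println(offs[0]["topic"] + " / " + offs[0]["partition"] + " @ " + offs[0]["offset"]);

cons.close();
```

Because the shapes are the same everywhere in the module, a list like offs can be read back with ordinary map and list indexing, as the println line does.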

Quick start: produce and consume

include kafka.kafka;

prod = new KafkaProducer("broker1:9092");
prod.connect();
md = prod.send("orders.created", "o-1001", "{\"qty\": 3}");
c.println("landed at offset " + md["offset"]);
prod.close();

cons = new KafkaConsumer("broker1:9092", "billing");
cons.setAutoOffsetReset("earliest");
cons.connect();
cons.subscribe([ "orders.created" ]);
recs = cons.poll(1000);
i = 0;
while (i < recs.size()) {
    c.println(recs[i].getKey() + ": " + recs[i].getValue());
    i = i + 1;
}
cons.commitSync();
cons.close();

Admin

adm = new KafkaAdmin("broker1:9092");
adm.connect();
adm.createTopic("orders.created", 6);
cfg = adm.describeConfigs("topic", "orders.created");
c.println(cfg["retention.ms"]["value"]);
adm.close();

Every Admin API lands here: topics, records, configs, ACLs, consumer / classic / share / streams groups, offsets, cluster and quorum describe, partition reassignment and election, log dirs, quotas, SCRAM credentials, delegation tokens, transactions, and client metrics resources. All methods are synchronous and return native Aussom maps and lists.

Share groups: Kafka as a queue

Share groups (broker 4.2+) make a topic behave like a message queue: any number of consumers pull from the same partitions and every record is individually accepted, released (redelivered), or rejected (poison). The broker's delivery-count limit archives records that keep failing.

cons = new KafkaShareConsumer("broker1:9092", "billing-workers");
cons.setAckMode("explicit");
cons.connect();
cons.subscribe([ "orders.created" ]);
recs = cons.poll(1000);
i = 0;
while (i < recs.size()) {
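    try {
        // handle() stands in for the application's own processing method.
        this.handle(recs[i]);
        cons.acknowledge(recs[i], "accept");
    } catch (e) {
        // Processing failed: release the record so the broker redelivers it.
        cons.acknowledge(recs[i], "release");
    }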
    i = i + 1;
}
cons.commitSync();

New share groups start at the latest record by default. To start from the beginning, set the GROUP config before consuming: adm.alterConfigs("group", "billing-workers", [ { "name": "share.auto.offset.reset", "value": "earliest" } ]);.
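Spelled out as a complete script, the call above might look like the following sketch. It reuses the KafkaAdmin connect / close pattern from the Admin section; the broker address and group name are the same illustrative values used elsewhere in this document.

```aussom
include kafka.kafka;

adm = new KafkaAdmin("broker1:9092");
adm.connect();

// Make the share group start from the earliest record instead of
// the latest. Run this before the first consumer in the group polls.
adm.alterConfigs("group", "billing-workers", [
    { "name": "share.auto.offset.reset", "value": "earliest" }
]);

adm.close();
```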

Aussom Server listeners

Under Aussom Server, continuous consumption belongs in a listener: the server runtime owns the thread, drives the lifecycle across reloads and shutdown, and applies the auto-restart policy from applications.yaml (keyed by the listener name).

include app;
include kafka.listener;

class orders : AppBase {
    public orders() {
        // Per-environment connection settings from the app's
        // properties file; the loaded section matches the server's
        // 'env' in config.yaml.
        props.load("orders-props.yaml");
        p = props.get();

        lst = new KafkaListener("orders.created");
        lst.setBootstrap(p["bootstrap"]);
        lst.setConfigs(p["kafkaSecurity"]);
        lst.setTopics([ "orders.created" ]);
        lst.setAutoOffsetReset("earliest");
        lst.setOnMessage(::onOrder);
        app.registerListener(lst);
    }

    public onOrder(object Rec) {
        console.info("order " + Rec.getKey() + ": " + Rec.getValue());
    }
}

KafkaListener knobs:

Handlers run synchronously on the listener thread, so one poll batch must finish inside max.poll.interval.ms (default 5 minutes) or the group evicts the listener. Slow handlers should lower setMaxPollRecords (for example to 1) or raise setMaxPollIntervalMs.
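For a slow handler, the two adjustments could be applied as in the fragment below, which belongs inside an app constructor like the one in the listener example. It is a sketch: the setters are assumed to take plain integers, the 10-minute value is illustrative, and in practice one of the two settings is usually enough.

```aussom
lst = new KafkaListener("orders.created");
lst.setBootstrap("broker1:9092");
lst.setTopics([ "orders.created" ]);

// Hand the handler one record per poll, so a single slow record
// cannot hold up a whole batch past the poll interval.
lst.setMaxPollRecords(1);

// Alternatively (or additionally) allow 10 minutes per batch
// instead of the 5-minute default. Value is in milliseconds.
lst.setMaxPollIntervalMs(600000);

lst.setOnMessage(::onOrder);
app.registerListener(lst);
```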

KafkaShareListener is the queue-flavored connector: accept on handler success and setErrorMode("release" | "reject" | "stop") on failure. Release (the default) redelivers, bounded by the broker's delivery-count limit.
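A share-group listener might be wired up as sketched below. Only setErrorMode and its three values come from the text above; the constructor argument and the setBootstrap, setTopics, and setOnMessage calls are assumed to mirror KafkaListener, and the listener name "orders.queue" and handler onOrder are hypothetical.

```aussom
include app;
include kafka.kafka;

class orderQueue : AppBase {
    public orderQueue() {
        // Assumption: same constructor and setters as KafkaListener.
        lst = new KafkaShareListener("orders.queue");
        lst.setBootstrap("broker1:9092");
        lst.setTopics([ "orders.created" ]);

        // On handler failure, reject the record (treat it as poison)
        // rather than the default "release", which redelivers it.
        lst.setErrorMode("reject");

        lst.setOnMessage(::onOrder);
        app.registerListener(lst);
    }

    // Returning normally accepts the record.
    public onOrder(object Rec) {
        console.info("order " + Rec.getKey() + ": " + Rec.getValue());
    }
}
```

Choosing "reject" suits records that can never succeed; "stop" is the conservative option when a failure should halt consumption for investigation.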

Enabling the connector JAR under Aussom Server

The module ships its Java connector in a bundled JAR (aussom-kafka.jar). On first include, the module's loader calls app.loadJar("kafka/aussom-kafka.jar") to put it on the classpath. Aussom Server blocks dynamic JAR loading by default, so the server's security manager must allow it or the app fails at startup with one of these errors:

app.loadJar(): Security manager property 'aussom.app.loadjar' is false; dynamic jar loading is disabled on this server.
app.loadJar(): jar 'aussom-kafka.jar' is not permitted. ...

Add a securityManager block to the server section of config.yaml that turns loading on and allowlists this module's JAR:

local:
  server:
    # ...

    # Allow the kafka apac module to load its bundled
    # connector JAR with app.loadJar.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
        "aussom-kafka.jar"
      ]

Setting "aussom.app.loadAllJars": true instead of the allowlist also works, but naming just the JARs you expect is the safer configuration. (The allowlist approach is exactly what test-config.yaml uses so the test suite can load the module - see "Building and testing".)

Once the JAR is permitted, register a listener with app.registerListener and the runtime owns its lifecycle - a dedicated thread, reload/shutdown handling, and the auto-restart policy from applications.yaml keyed by the listener name. Outside the server (plain CLI, tests) the loop can be driven directly: startListener() blocks (run it on a Thread), stopListener() ends it.

Security

All security config flows through setConfig / setConfigs on any client: security.protocol, sasl.mechanism, sasl.jaas.config, and the ssl.* keys, so SASL_SSL, SCRAM, and OAUTHBEARER work with no dedicated API.

TLS stores have two supported paths:

  1. PEM inline (recommended under Aussom Server): pass the keys and certs as config strings - ssl.keystore.key, ssl.keystore.certificate.chain, ssl.truststore.certificates - so nothing is read from disk.
  2. Files: call setSslKeystoreFile / setSslTruststoreFile with an app-relative path. Under Aussom Server the path resolves through the server sandbox and must live inside the app's app_data directory. The raw ssl.keystore.location / ssl.truststore.location keys are rejected by setConfig so the sandbox cannot be bypassed.
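The PEM-inline path (option 1) combined with SCRAM could look like the sketch below. It assumes setConfigs accepts a map of standard Kafka client config keys. The port, user name, and both placeholder strings are hypothetical; real secrets would come from the encrypted props rather than from literals in the script.

```aussom
include kafka.kafka;

// Placeholders only: load the real values from encrypted props.
caPem = "<PEM-encoded CA certificate>";
jaas = "org.apache.kafka.common.security.scram.ScramLoginModule required "
    + "username=\"svc-orders\" password=\"<secret>\";";

prod = new KafkaProducer("broker1:9093");
prod.setConfigs({
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.jaas.config": jaas,
    // PEM truststore passed inline, so nothing is read from disk.
    "ssl.truststore.type": "PEM",
    "ssl.truststore.certificates": caPem
});
prod.connect();
prod.close();
```

The same map can be handed to a consumer, admin client, or listener, since all security config flows through setConfig / setConfigs.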

Store secrets encrypted with the server's props mechanism (props.encrypt / props.decrypt) rather than in plain text - see the listener example above for the loading pattern.

Building and testing

The aunit suite (kafka-test.aus) runs under the Aussom-Server runtime - the runtime this module's listeners are built for - not the bare aussom CLI. One-time setup: drop the Aussom-Server developer zip into the project root and install it.

unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i

-i lays down apps/, logs/, lib/, an aussom-server CLI wrapper, and a config.yaml with generated keys. These are git-ignored; the repo commits only test-config.yaml (a secret-free config that enables app.loadJar for aussom-kafka.jar). Then:

mvn clean package

The Maven build rebuilds the kafka/ package dir before the test phase, then Tests.java launches the server's test runner against it - equivalent to:

./aussom-server -cf test-config.yaml -t kafka-test.aus

(after a mvn package has produced kafka/aussom-kafka.jar). The other JUnit classes are plain Java-to-Java tests of the Java classes.

The tests need a Kafka broker at localhost:19092. When nothing is listening there, the suite starts a throwaway apache/kafka-native:4.3.0 container automatically (Docker required) and cleans it up afterward. To run against your own broker instead, start one on that port before running the tests, for example:

docker run -d -p 19092:9092 \
  -e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR=1 \
  apache/kafka-native:4.3.0

The delegation-token tests start their own short-lived SASL broker on a random port; everything else shares the fixed-port broker.