kafka
Apache Kafka connector for Aussom. The full Kafka client API:
producers (idempotence and transactions included), consumer groups,
share groups (queue semantics), the complete Admin API, and two
Aussom Server ClientListener connectors.
Built on kafka-clients 4.3.x. Kafka has no embedded broker, so this
module is remote-only: every class connects to an outside Kafka
cluster through a bootstrap address.
Classes
| Class | Purpose |
|---|---|
KafkaProducer |
Sending: sync and fire-and-forget, headers, compression, transactions |
KafkaConsumer |
Consumer-group polling: subscribe/assign, commits, seeks, offset queries |
KafkaShareConsumer |
Queue semantics (broker 4.2+): per-record accept / release / reject |
KafkaAdmin |
Cluster management: topics, configs, ACLs, groups, offsets, quotas, and more |
KafkaRecord |
One received record: key, value, headers, timestamps, delivery count |
KafkaListener |
Aussom Server ClientListener connector for consumer groups |
KafkaShareListener |
Aussom Server ClientListener connector for share groups (queues) |
include kafka.kafka; pulls in everything; individual files can be
included instead (for example include kafka.producer;).
Key and value payloads are Aussom strings (UTF-8) by default; call
setKeyFormat("bytes") / setValueFormat("bytes") on any client to
work with Buffer payloads instead. Topic-partitions are
{ "topic": s, "partition": i } maps and offsets are lists of those
maps with an added "offset" - the same shapes everywhere in the
module.
Quick start: produce and consume
include kafka.kafka;
prod = new KafkaProducer("broker1:9092");
prod.connect();
md = prod.send("orders.created", "o-1001", "{\"qty\": 3}");
c.println("landed at offset " + md["offset"]);
prod.close();
cons = new KafkaConsumer("broker1:9092", "billing");
cons.setAutoOffsetReset("earliest");
cons.connect();
cons.subscribe([ "orders.created" ]);
recs = cons.poll(1000);
i = 0;
while (i < recs.size()) {
c.println(recs[i].getKey() + ": " + recs[i].getValue());
i = i + 1;
}
cons.commitSync();
cons.close();
Admin
adm = new KafkaAdmin("broker1:9092");
adm.connect();
adm.createTopic("orders.created", 6);
cfg = adm.describeConfigs("topic", "orders.created");
c.println(cfg["retention.ms"]["value"]);
adm.close();
Every Admin API lands here: topics, records, configs, ACLs, consumer / classic / share / streams groups, offsets, cluster and quorum describe, partition reassignment and election, log dirs, quotas, SCRAM credentials, delegation tokens, transactions, and client metrics resources. All methods are synchronous and return native Aussom maps and lists.
Share groups: Kafka as a queue
Share groups (broker 4.2+) make a topic behave like a message queue: any number of consumers pull from the same partitions and every record is individually accepted, released (redelivered), or rejected (poison). The broker's delivery-count limit archives records that keep failing.
cons = new KafkaShareConsumer("broker1:9092", "billing-workers");
cons.setAckMode("explicit");
cons.connect();
cons.subscribe([ "orders.created" ]);
recs = cons.poll(1000);
i = 0;
while (i < recs.size()) {
try {
this.handle(recs[i]);
cons.acknowledge(recs[i], "accept");
} catch (e) {
cons.acknowledge(recs[i], "release");
}
i = i + 1;
}
cons.commitSync();
New share groups start at the latest record by default. To start
from the beginning, set the GROUP config before consuming:
adm.alterConfigs("group", "billing-workers", [ { "name": "share.auto.offset.reset", "value": "earliest" } ]);.
Aussom Server listeners
Under Aussom Server, continuous consumption belongs in a listener:
the server runtime owns the thread, drives the lifecycle across
reloads and shutdown, and applies the auto-restart policy from
applications.yaml (keyed by the listener name).
include app;
include kafka.listener;
class orders : AppBase {
public orders() {
// Per-environment connection settings from the app's
// properties file; the loaded section matches the server's
// 'env' in config.yaml.
props.load("orders-props.yaml");
p = props.get();
lst = new KafkaListener("orders.created");
lst.setBootstrap(p["bootstrap"]);
lst.setConfigs(p["kafkaSecurity"]);
lst.setTopics([ "orders.created" ]);
lst.setAutoOffsetReset("earliest");
lst.setOnMessage(::onOrder);
app.registerListener(lst);
}
public onOrder(object Rec) {
console.info("order " + Rec.getKey() + ": " + Rec.getValue());
}
}
KafkaListener knobs:
setCommitMode("batch" | "record" | "auto")- when offsets commit. Batch (the default) commits once per cleanly handled poll batch.setErrorMode("continue" | "seek" | "stop")- what a handler error does. Continue (the default) skips the record; seek rewinds so it redelivers (beware poison records); stop exits the loop and hands control to the server's auto-restart policy.setDeadLetterTopic(name)- copy failed records (withdlt.original.*anddlt.errorheaders) before skipping them. If the copy itself fails the loop stops, so a record is never lost.setOnBatch(cb)- batch dispatch: one callback per poll with a list of records, error modes at batch granularity.setOnAssigned(cb)/setOnRevoked(cb)- rebalance hooks.
Handlers run synchronously on the listener thread, so one poll
batch must finish inside max.poll.interval.ms (default 5 minutes)
or the group evicts the listener. Slow handlers should lower
setMaxPollRecords (for example to 1) or raise
setMaxPollIntervalMs.
KafkaShareListener is the queue-flavored connector: accept on
handler success and setErrorMode("release" | "reject" | "stop") on
failure. Release (the default) redelivers, bounded by the broker's
delivery-count limit.
Enabling the connector JAR under Aussom Server
The module ships its Java connector in a bundled JAR
(aussom-kafka.jar). On first include, the module's loader calls
app.loadJar("kafka/aussom-kafka.jar") to put it on the classpath.
Aussom Server blocks dynamic JAR loading by default, so the server's
security manager must allow it or the app fails at startup with one
of these errors:
app.loadJar(): Security manager property 'aussom.app.loadjar' is false; dynamic jar loading is disabled on this server.
app.loadJar(): jar 'aussom-kafka.jar' is not permitted. ...
Add a securityManager block to the server section of config.yaml
that turns loading on and allowlists this module's JAR:
local:
server:
# ...
# Allow the kafka apac module to load its bundled
# connector JAR with app.loadJar.
securityManager:
- "aussom.app.loadjar": true
- "aussom.app.loadJarsAllowed": [
"aussom-kafka.jar"
]
Setting "aussom.app.loadAllJars": true instead of the allowlist
also works, but naming just the JARs you expect is the safer
configuration. (This is exactly what test-config.yaml does so the
test suite can load the module - see "Building and testing".)
Once the JAR is permitted, register a listener with
app.registerListener and the runtime owns its lifecycle - a
dedicated thread, reload/shutdown handling, and the auto-restart
policy from applications.yaml keyed by the listener name. Outside
the server (plain CLI, tests) the loop can be driven directly:
startListener() blocks (run it on a Thread), stopListener()
ends it.
Security
All security config flows through setConfig / setConfigs on any
client: security.protocol, sasl.mechanism, sasl.jaas.config,
and the ssl.* keys, so SASL_SSL, SCRAM, and OAUTHBEARER work with
no dedicated API.
TLS stores have two supported paths:
- PEM inline (recommended under Aussom Server): pass the keys and
certs as config strings -
ssl.keystore.key,ssl.keystore.certificate.chain,ssl.truststore.certificates- so nothing is read from disk. - Files: call
setSslKeystoreFile/setSslTruststoreFilewith an app-relative path. Under Aussom Server the path resolves through the server sandbox and must live inside the app'sapp_datadirectory. The rawssl.keystore.location/ssl.truststore.locationkeys are rejected bysetConfigso the sandbox cannot be bypassed.
Store secrets encrypted with the server's props mechanism
(props.encrypt / props.decrypt) rather than in plain text - see
the listener example above for the loading pattern.
Building and testing
The aunit suite (kafka-test.aus) runs under the Aussom-Server
runtime - the runtime this module's listeners are built for - not
the bare aussom CLI. One-time setup: drop the Aussom-Server
developer zip into the project root and install it.
unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i
-i lays down apps/, logs/, lib/, an aussom-server CLI
wrapper, and a config.yaml with generated keys. These are
git-ignored; the repo commits only test-config.yaml (a secret-free
config that enables app.loadJar for aussom-kafka.jar). Then:
mvn clean package
The Maven build rebuilds the kafka/ package dir before the test
phase, then Tests.java launches the server's test runner against
it - equivalent to:
./aussom-server -cf test-config.yaml -t kafka-test.aus
(after a mvn package has produced kafka/aussom-kafka.jar). The
other JUnit classes are plain Java-to-Java tests of the Java
classes.
The tests need a Kafka broker at localhost:19092. When nothing is
listening there, the suite starts a throwaway
apache/kafka-native:4.3.0 container automatically (Docker
required) and cleans it up afterward. To run against your own broker
- which the share-group tests need configured for a single node - start one with:
docker run -d -p 19092:9092 \
-e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR=1 \
apache/kafka-native:4.3.0
The delegation-token tests start their own short-lived SASL broker on a random port; everything else shares the fixed-port broker.