integration-wiretap
The Wiretap for Aussom Server. A Wiretap is the classic EIP Wire Tap: as
messages move through your flow, you hand each one to the Wiretap so it can be
monitored, logged, audited, or debugged on the side - without slowing or
changing the main flow. In essence, the Wiretap is an async queue with a
handler. You submit a message with a fast tap call that only enqueues and
returns; a background worker thread later drains the queue and runs your
handler, which does the actual tap work.
The Wiretap stands on its own. It is not tied to any other component and
defines no message type. You hand it one message - any Aussom value, a string,
an int, a map, a list, a user object, anything - and it never looks inside
it. Its only dependency is aussom-server-api.
This tool is for Aussom Server, not the plain CLI. The tap work runs out of
band from the main flow, and the server runtime owns the background thread. See
design/integration-design.md for the full design.
Include it with:
include integration_wiretap.integration_wiretap;
How it works
You submit one message at a time with tap:
wt.tap(Message); // returns at once; Message is handled on the side
tap does one thing the caller can feel: it puts the message on a queue and
returns. The main flow is unaffected and continues on its own. A single
background worker takes messages off the queue, in arrival order, and calls your
handler for each one.
The guiding rule is that the tap must never slow or break the main flow.
tap only enqueues; all real work happens on the background thread. A slow or
failing handler can never back-pressure or break the path being observed.
Tapping messages
Set a handler with setOnTap; it is called once per message, in order:
include app;
include integration_wiretap.integration_wiretap;
class orders : AppBase {
public orders() {
wt = new Wiretap("order.audit");
wt.setOnTap(::auditMessage);
app.registerListener(wt);
// Tap each order inline; the call returns at once and the order is
// audited on the side, off the main flow.
wt.tap(order);
}
public auditMessage(Message) {
console.info("audit: " + Message.get("id"));
}
}
The handler is called as Cb(Message). The message is whatever you passed to
tap, untouched.
The Wiretap does not copy the message, so the same value you still hold is what
the handler receives later. Treat Message as read-only in the handler; if
your main flow will change the message after tap, pass a copy.
Generic and minimal
tap takes one message and nothing else. Any condition on what to tap - sampling,
filtering by type or content - lives in your own code before you call tap:
if (order.get("total") > 1000) {
wt.tap(order); // only tap the large orders
}
This keeps the Wiretap generic: it taps whatever it is given. For a separate tap channel, instantiate another Wiretap.
The tap queue
The queue is bounded so a slow handler cannot grow it without limit.
setCapacity(N)sets the maximum number of queued messages. The default is high (100000) so the Wiretap absorbs bursty traffic without dropping.- When the queue is full,
tapdrops the new message rather than block - the main flow must never stall.tapstill returns normally; only the side copy is lost. getDroppedCount()returns how many messages have been dropped, so a chronically slow handler is visible. The Wiretap also logs the first drop and then periodically.
This "best-effort tap" is the standard trade-off for a monitoring tap: observation must not become back-pressure on the thing being observed.
Method summary
| Method | Behavior |
|---|---|
new Wiretap(string Name) |
The listener name used in logs and applications.yaml. |
setOnTap(callback Cb) |
Called as Cb(Message) for each tapped message, in order. Required. |
setCapacity(int N) |
Max queued messages before new ones are dropped. Default 100000. |
tap(Message) |
Enqueues Message for the handler and returns at once. |
getDroppedCount() |
Number of messages dropped because the queue was full. |
startListener() / stopListener() |
Run and stop the worker loop (the runtime drives these under the server). |
pauseListener() / resumeListener() |
Pause and resume delivery (queued messages wait, none are dropped for the pause). |
getListenerName() / getIsRunning() / getIsPaused() |
Introspection. |
Per-listener enabled and autoRestart tuning comes from applications.yaml,
keyed by the wiretap name, with no module code involved.
Enabling the bundled JAR under Aussom Server
The Wiretap is backed by a Java class in the module's bundled JAR
(aussom-integration_wiretap.jar). On first include, the module loader calls
app.loadJar(...) to put it on the classpath. Aussom Server blocks dynamic JAR
loading by default, so the server's security manager must allow it, or the app
fails at startup.
Add a securityManager block to the server section of config.yaml that turns
loading on and allowlists this module's JAR:
local:
server:
# ...
# Allow the integration-wiretap module to load its bundled JAR.
securityManager:
- "aussom.app.loadjar": true
- "aussom.app.loadJarsAllowed": [
"aussom-integration_wiretap.jar"
]
Building and testing
The aunit suite (integration_wiretap-test.aus) runs under the Aussom-Server
runtime - the runtime the Wiretap listener is built for - not the bare aussom
CLI. One-time setup: drop the Aussom-Server developer zip into the project root
and install it.
unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i
-i lays down apps/, logs/, lib/, an aussom-server CLI wrapper, and a
config.yaml with generated keys. These are git-ignored; the repo commits only
test-config.yaml (a secret-free config that enables app.loadJar for
aussom-integration_wiretap.jar).
The Maven build copies the .aus sources and the bundled JAR into the
integration_wiretap/ package directory, which the test include resolves
against. That copy happens during the package phase, after the test phase, so
build the package directory once before testing:
mvn package -DskipTests # builds integration_wiretap/ (sources + jar)
mvn test # runs the JUnit and aunit suites
Tests.java launches the server's test runner against the package directory -
equivalent to:
./aussom-server -cf test-config.yaml -t integration_wiretap-test.aus
WiretapTests.java is a plain Java-to-Java test of the Wiretap class. After
editing any .aus source, re-run mvn package -DskipTests so the package
directory picks up the change, then mvn test.