integration-splitter-aggregator
The Splitter and Aggregator Enterprise Integration Pattern (EIP) tools for Aussom Server. The Splitter breaks one message into smaller pieces; the Aggregator collects related pieces back into one result. They are a pair: the Splitter breaks a message apart in exactly the way the Aggregator puts it back together.
These tools are for Aussom Server, not the plain CLI. The Aggregator holds
state across messages and over time, and the server runtime owns its background
thread. See design/integration-design.md for the full design.
The tools work on generic Aussom values. A piece payload can be any Aussom
value - a string, a map, a user object, anything. This module defines no
message type of its own and depends on no other module.
Include everything with:
include integration_splitter_aggregator.integration_splitter_aggregator;
Components
| Class | Purpose |
|---|---|
Splitter |
Breaks a message into a list of split messages, stamped for reassembly |
Aggregator |
Collects split messages by correlation id and fires a callback when a group is complete |
The split-message convention
The Splitter and the Aggregator agree on one data shape: a split message.
It is a native Aussom map that wraps a payload with the metadata needed to
regroup it:
| Key | Type | Meaning |
|---|---|---|
correlationId |
string |
Ties all pieces of one original message together |
sequenceNumber |
int |
Position of this piece, 1..sequenceSize |
sequenceSize |
int |
Total number of pieces in the group |
payload |
any | The piece itself - any Aussom value |
The Splitter sets all four keys. The Aggregator requires correlationId and
payload; it uses sequenceNumber to order the result and sequenceSize for
the count completion strategy. Because a split message is just a map, you can
also feed the Aggregator pieces that did not come from this Splitter.
Splitter
Splitter breaks one message into a list of split messages. It is stateless,
so one instance can be reused for many splits.
sp = new Splitter();
parts = sp.split([ "a", "b", "c" ]);
// parts[0] = { correlationId: <id>, sequenceNumber: 1, sequenceSize: 3, payload: "a" }
// Every piece shares one generated correlation id.
Pass a correlation id to join a flow that already has one:
parts = sp.split([ "a", "b" ], "order-42");
For a payload that is not already a list, give the Splitter a split function that turns the payload into the list of pieces:
sp = new Splitter(::toLines); // toLines(payload) returns a list
parts = sp.split(bigDocument);
public toLines(Doc) { return Doc.split("\n"); }
An empty list of pieces returns an empty list.
Aggregator
Aggregator collects related split messages by correlation id into groups.
When a group is complete it calls onComplete with the group's payloads in
sequence order. It is an Aussom Server client listener: register it with
app.registerListener and the runtime runs its background reaper on a
dedicated thread. Feed it pieces with aggregate from any thread.
The completion strategy is set once with setCompletion:
"count"(default) - complete when the number of pieces equals thesequenceSizestamped by the Splitter."predicate"- complete when a callback returnstrue. The callback is called asCb(CorrelationId, Payloads)after each piece."timeout"- complete when the group's lifetime reachestimeoutMs(time-window aggregation, driven by the reaper).
A timeoutMs also guards the count and predicate strategies: a group that
never completes is evicted after the window and onTimeout fires with the
partial payloads, so a missing piece cannot leak memory. Always set a timeout
in production.
include app;
include integration_splitter_aggregator.integration_splitter_aggregator;
class orders : AppBase {
public orders() {
agg = new Aggregator("order.lines");
agg.setCompletion("count"); // complete when all pieces arrive
agg.setTimeoutMs(30000); // give up after 30s, no leak
agg.setOnComplete(::onAssembled);
agg.setOnTimeout(::onIncomplete);
app.registerListener(agg);
sp = new Splitter();
for (part : sp.split(order.lines)) {
// ... route or process each piece, then hand it to the aggregator
agg.aggregate(part);
}
}
public onAssembled(CorrelationId, Payloads) {
console.info("order " + CorrelationId + " complete: " + Payloads.size() + " lines");
}
public onIncomplete(CorrelationId, Payloads) {
console.warn("order " + CorrelationId + " timed out with " + Payloads.size() + " lines");
}
}
Per-listener enabled and autoRestart tuning comes from
applications.yaml, keyed by the aggregator name ("order.lines"), with no
module code involved.
Method summary
| Method | Behavior |
|---|---|
new Aggregator(string Name) |
The listener name used in logs and applications.yaml. |
setCompletion(string Strategy) |
"count" (default), "predicate", or "timeout". |
setCompletionCallback(callback Cb) |
Required for the predicate strategy. |
setTimeoutMs(int Ms) |
Group lifetime; required for the timeout strategy, a safety net otherwise. |
setReapIntervalMs(int Ms) |
Reaper sweep interval. Default 1000. |
setOnComplete(callback Cb) |
Called with the reassembled payloads. Required. |
setOnTimeout(callback Cb) |
Called for a group evicted by timeout. Optional. |
aggregate(Part) |
Feeds one split message into its group. |
startListener() / stopListener() |
Run and stop the reaper loop (the runtime drives these under the server). |
pauseListener() / resumeListener() |
Pause and resume the timeout sweeps. |
getListenerName() / getIsRunning() / getIsPaused() |
Introspection. |
Enabling the bundled JAR under Aussom Server
The Aggregator is backed by a Java class in the module's bundled JAR
(aussom-integration_splitter_aggregator.jar). On first include, the module
loader calls app.loadJar(...) to put it on the classpath. Aussom Server
blocks dynamic JAR loading by default, so the server's security manager must
allow it, or the app fails at startup.
Add a securityManager block to the server section of config.yaml that turns
loading on and allowlists this module's JAR:
local:
server:
# ...
# Allow the integration-splitter-aggregator module to load its bundled JAR.
securityManager:
- "aussom.app.loadjar": true
- "aussom.app.loadJarsAllowed": [
"aussom-integration_splitter_aggregator.jar"
]
Building and testing
The aunit suite (integration_splitter_aggregator-test.aus) runs under the
Aussom-Server runtime - the runtime the Aggregator listener is built for -
not the bare aussom CLI. One-time setup: drop the Aussom-Server developer zip
into the project root and install it.
unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i
-i lays down apps/, logs/, lib/, an aussom-server CLI wrapper, and a
config.yaml with generated keys. These are git-ignored; the repo commits only
test-config.yaml (a secret-free config that enables app.loadJar for
aussom-integration_splitter_aggregator.jar).
The Maven build copies the .aus sources and the bundled JAR into the
integration_splitter_aggregator/ package directory, which the test include
resolves against. That copy happens during the package phase, after the test
phase, so build the package directory once before testing:
mvn package -DskipTests # builds integration_splitter_aggregator/ (sources + jar)
mvn test # runs the JUnit and aunit suites
Tests.java launches the server's test runner against the package directory -
equivalent to:
./aussom-server -cf test-config.yaml -t integration_splitter_aggregator-test.aus
AggregatorTests.java is a plain Java-to-Java test of the Aggregator class.
After editing any .aus source, re-run mvn package -DskipTests so the package
directory picks up the change, then mvn test.