integration_splitter_aggregator 1.0.0

integration-splitter-aggregator

This module provides the Splitter and Aggregator Enterprise Integration Pattern (EIP) tools for Aussom Server. The Splitter breaks one message into smaller pieces; the Aggregator collects related pieces back into one result. They work as a pair: the Splitter stamps each piece with the metadata the Aggregator needs to put the message back together.

These tools are for Aussom Server, not the plain CLI. The Aggregator holds state across messages and over time, and the server runtime owns its background thread. See design/integration-design.md for the full design.

The tools work on generic Aussom values. A piece payload can be any Aussom value - a string, a map, a user object, anything. This module defines no message type of its own and depends on no other module.

Include everything with:

include integration_splitter_aggregator.integration_splitter_aggregator;

Components

Class       Purpose
Splitter    Breaks a message into a list of split messages, stamped for reassembly
Aggregator  Collects split messages by correlation id and fires a callback when a group is complete

The split-message convention

The Splitter and the Aggregator agree on one data shape: a split message. It is a native Aussom map that wraps a payload with the metadata needed to regroup it:

Key             Type    Meaning
correlationId   string  Ties all pieces of one original message together
sequenceNumber  int     Position of this piece, 1..sequenceSize
sequenceSize    int     Total number of pieces in the group
payload         any     The piece itself - any Aussom value

The Splitter sets all four keys. The Aggregator requires correlationId and payload; it uses sequenceNumber to order the result and sequenceSize for the count completion strategy. Because a split message is just a map, you can also feed the Aggregator pieces that did not come from this Splitter.
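The convention above can be modeled in a few lines. The following is a minimal Python sketch of the data shape only, not the module's Aussom API; the helper names make_split_message and missing_required_keys are hypothetical and exist only for illustration.

```python
# Illustrative Python model of the split-message convention.
# This is not the module's Aussom API; all function names here are hypothetical.

REQUIRED_KEYS = ("correlationId", "payload")        # what the Aggregator requires
OPTIONAL_KEYS = ("sequenceNumber", "sequenceSize")  # ordering and count completion


def make_split_message(correlation_id, sequence_number, sequence_size, payload):
    """Build one split message as a plain dict with the four documented keys."""
    return {
        "correlationId": correlation_id,
        "sequenceNumber": sequence_number,
        "sequenceSize": sequence_size,
        "payload": payload,
    }


def missing_required_keys(message):
    """Return the required keys that are absent from a candidate split message."""
    return [key for key in REQUIRED_KEYS if key not in message]


# A fully stamped piece, as a Splitter would produce it.
stamped = make_split_message("order-42", 1, 3, "a")

# A piece built by hand still satisfies the required part of the convention.
hand_made = {"correlationId": "order-42", "payload": {"sku": "A1"}}
```

Because the shape is just a map, any producer that sets at least correlationId and payload can feed the Aggregator, which is the point the paragraph above makes.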

Splitter

Splitter breaks one message into a list of split messages. It is stateless, so one instance can be reused for many splits.

sp = new Splitter();
parts = sp.split([ "a", "b", "c" ]);

// parts[0] = { correlationId: <id>, sequenceNumber: 1, sequenceSize: 3, payload: "a" }
// Every piece shares one generated correlation id.

Pass a correlation id to join a flow that already has one:

parts = sp.split([ "a", "b" ], "order-42");

For a payload that is not already a list, give the Splitter a split function that turns the payload into the list of pieces:

sp = new Splitter(::toLines);          // toLines(payload) returns a list
parts = sp.split(bigDocument);

public toLines(Doc) { return Doc.split("\n"); }

Splitting an empty list of pieces returns an empty list.
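The stamping behavior described in this section can be sketched as follows. This is a Python model written for illustration, not the Splitter's actual implementation: the function name split, its parameters, and the use of uuid4 for the generated correlation id are all assumptions, since the module's id format is not documented here.

```python
import uuid


def split(payload, correlation_id=None, split_fn=None):
    """Model of the documented Splitter behavior: stamp each piece for reassembly.

    split_fn stands in for the split function passed to the Splitter
    constructor. uuid4 is an assumption for the generated correlation id.
    """
    pieces = split_fn(payload) if split_fn is not None else list(payload)
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())
    size = len(pieces)
    return [
        {
            "correlationId": correlation_id,
            "sequenceNumber": index,  # 1-based, matching 1..sequenceSize
            "sequenceSize": size,
            "payload": piece,
        }
        for index, piece in enumerate(pieces, start=1)
    ]


# Join an existing flow by passing its correlation id.
parts = split(["a", "b"], "order-42")

# Split a non-list payload with a split function.
lines = split("x\ny\nz", split_fn=lambda doc: doc.split("\n"))

# No pieces in, no split messages out.
empty = split([])
```

Every piece of one call shares a correlation id and carries the same sequenceSize, which is what lets a count-based aggregator know when the group is whole.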

Aggregator

Aggregator collects related split messages by correlation id into groups. When a group is complete it calls onComplete with the group's payloads in sequence order. It is an Aussom Server client listener: register it with app.registerListener and the runtime runs its background reaper on a dedicated thread. Feed it pieces with aggregate from any thread.

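The completion strategy is set once with setCompletion. "count" (the default) completes a group when all sequenceSize pieces have arrived. "predicate" completes a group when the callback set with setCompletionCallback decides it is complete. "timeout" closes a group when its timeoutMs window elapses, so setTimeoutMs is required for it.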

A timeoutMs also guards the count and predicate strategies: a group that never completes is evicted after the window and onTimeout fires with the partial payloads, so a missing piece cannot leak memory. Always set a timeout in production.

include app;
include integration_splitter_aggregator.integration_splitter_aggregator;

class orders : AppBase {
    public orders() {
        agg = new Aggregator("order.lines");
        agg.setCompletion("count");          // complete when all pieces arrive
        agg.setTimeoutMs(30000);             // give up after 30s, no leak
        agg.setOnComplete(::onAssembled);
        agg.setOnTimeout(::onIncomplete);
        app.registerListener(agg);

        sp = new Splitter();
        for (part : sp.split(order.lines)) {
            // ... route or process each piece, then hand it to the aggregator
            agg.aggregate(part);
        }
    }

    public onAssembled(CorrelationId, Payloads) {
        console.info("order " + CorrelationId + " complete: " + Payloads.size() + " lines");
    }

    public onIncomplete(CorrelationId, Payloads) {
        console.warn("order " + CorrelationId + " timed out with " + Payloads.size() + " lines");
    }
}

Per-listener enabled and autoRestart tuning comes from applications.yaml, keyed by the aggregator name ("order.lines"), with no module code involved.
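To make the count strategy and the timeout safety net concrete, here is a small single-threaded Python model. It is a sketch under stated assumptions, not the Aggregator's Java implementation: the class name AggregatorModel, the explicit now_ms clock argument, the inclusive timeout comparison, and the lack of duplicate-piece handling are all choices made for illustration. The real Aggregator runs its reaper on a background thread owned by the server runtime.

```python
class AggregatorModel:
    """Hypothetical model of count completion plus timeout eviction.

    The caller passes the clock value explicitly so the behavior is
    deterministic; there is no background thread in this sketch.
    """

    def __init__(self, timeout_ms, on_complete, on_timeout=None):
        self.timeout_ms = timeout_ms
        self.on_complete = on_complete
        self.on_timeout = on_timeout
        self.groups = {}  # correlationId -> {"created": ms, "parts": [messages]}

    def aggregate(self, part, now_ms):
        cid = part["correlationId"]
        group = self.groups.setdefault(cid, {"created": now_ms, "parts": []})
        group["parts"].append(part)
        # "count" strategy: complete once sequenceSize pieces have arrived.
        if len(group["parts"]) >= part["sequenceSize"]:
            del self.groups[cid]
            self.on_complete(cid, self._ordered_payloads(group))

    def reap(self, now_ms):
        """One reaper sweep: evict every group older than the timeout window."""
        expired = [cid for cid, group in self.groups.items()
                   if now_ms - group["created"] >= self.timeout_ms]
        for cid in expired:
            group = self.groups.pop(cid)
            if self.on_timeout is not None:
                self.on_timeout(cid, self._ordered_payloads(group))

    @staticmethod
    def _ordered_payloads(group):
        ordered = sorted(group["parts"], key=lambda p: p.get("sequenceNumber", 0))
        return [p["payload"] for p in ordered]


def piece(cid, number, size, payload):
    return {"correlationId": cid, "sequenceNumber": number,
            "sequenceSize": size, "payload": payload}


completed, timed_out = [], []
agg = AggregatorModel(30000,
                      lambda cid, payloads: completed.append((cid, payloads)),
                      lambda cid, payloads: timed_out.append((cid, payloads)))

# Pieces arrive out of order; the result is still delivered in sequence order.
agg.aggregate(piece("order-1", 2, 2, "b"), now_ms=0)
agg.aggregate(piece("order-1", 1, 2, "a"), now_ms=10)

# order-2 never receives its second piece and is evicted by the sweep.
agg.aggregate(piece("order-2", 1, 2, "x"), now_ms=0)
agg.reap(now_ms=30000)
```

In this model a completed or evicted group is removed from the table before its callback runs, so neither path leaves state behind. That is the property the timeout is meant to guarantee for groups with a missing piece.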

Method summary

Method                                              Behavior
new Aggregator(string Name)                         The listener name used in logs and applications.yaml.
setCompletion(string Strategy)                      "count" (default), "predicate", or "timeout".
setCompletionCallback(callback Cb)                  Required for the predicate strategy.
setTimeoutMs(int Ms)                                Group lifetime; required for the timeout strategy, a safety net otherwise.
setReapIntervalMs(int Ms)                           Reaper sweep interval. Default 1000.
setOnComplete(callback Cb)                          Called with the reassembled payloads. Required.
setOnTimeout(callback Cb)                           Called for a group evicted by timeout. Optional.
aggregate(Part)                                     Feeds one split message into its group.
startListener() / stopListener()                    Run and stop the reaper loop (the runtime drives these under the server).
pauseListener() / resumeListener()                  Pause and resume the timeout sweeps.
getListenerName() / getIsRunning() / getIsPaused()  Introspection.

Enabling the bundled JAR under Aussom Server

The Aggregator is backed by a Java class in the module's bundled JAR (aussom-integration_splitter_aggregator.jar). On first include, the module loader calls app.loadJar(...) to put it on the classpath. Aussom Server blocks dynamic JAR loading by default, so the server's security manager must allow it, or the app fails at startup.

Add a securityManager block to the server section of config.yaml that turns loading on and allowlists this module's JAR:

local:
  server:
    # ...

    # Allow the integration-splitter-aggregator module to load its bundled JAR.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
        "aussom-integration_splitter_aggregator.jar"
      ]

Building and testing

The aunit suite (integration_splitter_aggregator-test.aus) runs under the Aussom-Server runtime - the runtime the Aggregator listener is built for - not the bare aussom CLI. One-time setup: drop the Aussom-Server developer zip into the project root and install it.

unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i

The -i flag lays down apps/, logs/, lib/, an aussom-server CLI wrapper, and a config.yaml with generated keys. These are git-ignored; the repo commits only test-config.yaml (a secret-free config that enables app.loadJar for aussom-integration_splitter_aggregator.jar).

The Maven build copies the .aus sources and the bundled JAR into the integration_splitter_aggregator/ package directory, which the test include resolves against. That copy happens during the package phase, after the test phase, so build the package directory once before testing:

mvn package -DskipTests   # builds integration_splitter_aggregator/ (sources + jar)
mvn test                  # runs the JUnit and aunit suites

Tests.java launches the server's test runner against the package directory - equivalent to:

./aussom-server -cf test-config.yaml -t integration_splitter_aggregator-test.aus

AggregatorTests.java is a plain Java-to-Java test of the Aggregator class. After editing any .aus source, re-run mvn package -DskipTests so the package directory picks up the change, then mvn test.