integration_resequencer 1.0.0

Download zip

integration-resequencer

The Resequencer for Aussom Server. A Resequencer takes messages that arrive out of order and releases them in order, one at a time, by sequence number. It is a re-ordering buffer: it holds back a message that arrives early until the messages that should come before it have been released, then lets the run through. Use it wherever a path does not preserve order - parallel processing, multiple queues, retries, or competing consumers - but a downstream step needs the stream back in sequence.

The Resequencer stands on its own. It is not tied to any other component and defines no message type. You hand it a sequence number and a payload; the payload is any Aussom value - a string, an int, a map, a list, a user object, anything. Its only dependency is aussom-server-api.

This tool is for Aussom Server, not the plain CLI. It holds state across messages and over time, and the server runtime owns its background thread. See design/integration-design.md for the full design.

Include it with:

include integration_resequencer.integration_resequencer;

How it works

You feed the Resequencer one message at a time with accept:

rs.accept(SequenceNumber, Payload);                 // single stream
rs.accept(SequenceNumber, Payload, CorrelationId);  // a named stream

Only SequenceNumber (an int) and Payload (any value) are required. CorrelationId is the one optional extra: it lets a single Resequencer reorder several independent streams at once, each tracked on its own. Omit it and every message belongs to one default stream.

The Resequencer always looks at the head - the lowest-numbered message it is holding - and releases it when any of these is true:

When the head is released past a gap (by timeout or capacity), the optional onGap callback fires once for each skipped number.

Releasing in order

onRelease is called once per message, in order:

include app;
include integration_resequencer.integration_resequencer;

class events : AppBase {
    public events() {
        rs = new Resequencer("order.events");
        rs.setTimeoutMs(5000);               // hold a gap at most 5s, then skip
        rs.setOnRelease(::onOrdered);
        rs.setOnGap(::onMissing);
        app.registerListener(rs);

        // Messages processed in parallel arrive out of order; feed each
        // to the resequencer, which forwards them in sequence.
        rs.accept(3, evtC);
        rs.accept(1, evtA);
        rs.accept(2, evtB);
        // onOrdered fires for 1, then 2, then 3 - in order.
    }

    public onOrdered(SequenceNumber, Payload, CorrelationId) {
        console.info("deliver #" + SequenceNumber);
    }

    public onMissing(SkippedSequence, CorrelationId) {
        console.warn("skipped missing #" + SkippedSequence);
    }
}

onRelease is called as Cb(SequenceNumber, Payload, CorrelationId) and onGap as Cb(SkippedSequence, CorrelationId). CorrelationId is null for the default stream.

The hold window

setTimeoutMs is the heart of the Resequencer. It decides how long the head waits for a missing predecessor before it is released anyway.

A gap that never fills would otherwise wait forever, so the hold window doubles as the skip timeout. setCapacity(N) is a second guard that bounds memory: when a stream buffers more than N messages, the head is force-released even before the window elapses.

Named streams

Pass a CorrelationId to reorder several independent streams through one Resequencer:

rs.accept(2, b, "shipment-7");
rs.accept(1, a, "shipment-7");   // released as 1 then 2 for shipment-7
rs.accept(1, x, "shipment-9");   // a separate stream, ordered on its own

Each correlation id keeps its own cursor and buffer. This is the right tool when the set of streams is dynamic - per-order, per-session, per-device ids you only learn at runtime - so you cannot pre-create a Resequencer per stream. For a small, fixed set of streams, separate Resequencer objects are simpler. Note that each correlation id keeps a small resident entry for the life of the listener; prefer separate objects, or accept the footprint, for very high-cardinality ids.

Method summary

Method Behavior
new Resequencer(string Name) The listener name used in logs and applications.yaml.
setOnRelease(callback Cb) Called as Cb(SequenceNumber, Payload, CorrelationId) for each released message, in order. Required.
setTimeoutMs(int Ms) Hold window in ms. Default 1000; 0 disables the hold.
setCapacity(int N) Max buffered messages per stream before the head is force-released. Default 0 (unbounded).
setReapIntervalMs(int Ms) How often the background sweeper runs. Default 1000.
setOnGap(callback Cb) Called as Cb(SkippedSequence, CorrelationId) when a missing message is skipped. Optional.
accept(SequenceNumber, Payload, CorrelationId = null) Feeds one message into its stream.
startListener() / stopListener() Run and stop the sweeper loop (the runtime drives these under the server).
pauseListener() / resumeListener() Pause and resume the timeout/capacity sweeps.
getListenerName() / getIsRunning() / getIsPaused() Introspection.

Per-listener enabled and autoRestart tuning comes from applications.yaml, keyed by the resequencer name, with no module code involved.

Enabling the bundled JAR under Aussom Server

The Resequencer is backed by a Java class in the module's bundled JAR (aussom-integration_resequencer.jar). On first include, the module loader calls app.loadJar(...) to put it on the classpath. Aussom Server blocks dynamic JAR loading by default, so the server's security manager must allow it, or the app fails at startup.

Add a securityManager block to the server section of config.yaml that turns loading on and allowlists this module's JAR:

local:
  server:
    # ...

    # Allow the integration-resequencer module to load its bundled JAR.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
        "aussom-integration_resequencer.jar"
      ]

Building and testing

The aunit suite (integration_resequencer-test.aus) runs under the Aussom-Server runtime - the runtime the Resequencer listener is built for - not the bare aussom CLI. One-time setup: drop the Aussom-Server developer zip into the project root and install it.

unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i

-i lays down apps/, logs/, lib/, an aussom-server CLI wrapper, and a config.yaml with generated keys. These are git-ignored; the repo commits only test-config.yaml (a secret-free config that enables app.loadJar for aussom-integration_resequencer.jar).

The Maven build copies the .aus sources and the bundled JAR into the integration_resequencer/ package directory, which the test include resolves against. That copy happens during the package phase, after the test phase, so build the package directory once before testing:

mvn package -DskipTests   # builds integration_resequencer/ (sources + jar)
mvn test                  # runs the JUnit and aunit suites

Tests.java launches the server's test runner against the package directory - equivalent to:

./aussom-server -cf test-config.yaml -t integration_resequencer-test.aus

ResequencerTests.java is a plain Java-to-Java test of the Resequencer class. After editing any .aus source, re-run mvn package -DskipTests so the package directory picks up the change, then mvn test.