integration-resequencer
The Resequencer for Aussom Server. A Resequencer takes messages that arrive out of order and releases them in order, one at a time, by sequence number. It is a re-ordering buffer: it holds back a message that arrives early until the messages that should come before it have been released, then lets the run through. Use it wherever a path does not preserve order - parallel processing, multiple queues, retries, or competing consumers - but a downstream step needs the stream back in sequence.
The Resequencer stands on its own. It is not tied to any other component and
defines no message type. You hand it a sequence number and a payload; the
payload is any Aussom value - a string, an int, a map, a list, a user
object, anything. Its only dependency is aussom-server-api.
This tool is for Aussom Server, not the plain CLI. It holds state across
messages and over time, and the server runtime owns its background thread. See
design/integration-design.md for the full design.
Include it with:
include integration_resequencer.integration_resequencer;
How it works
You feed the Resequencer one message at a time with accept:
rs.accept(SequenceNumber, Payload); // single stream
rs.accept(SequenceNumber, Payload, CorrelationId); // a named stream
Only SequenceNumber (an int) and Payload (any value) are required.
CorrelationId is the one optional extra: it lets a single Resequencer reorder
several independent streams at once, each tracked on its own. Omit it and every
message belongs to one default stream.
The Resequencer always looks at the head - the lowest-numbered message it is holding - and releases it when any of these is true:
- In sequence - the head is the immediate successor of the last released number. It is forwarded with no delay.
- Timeout - the head has waited the hold window (
setTimeoutMs, default 1000 ms). This releases the very first message of a stream (it waits one window so any lower, delayed message can sort ahead of it) and eventually skips a gap that never fills. - Capacity - the buffer has grown past
setCapacity(N). The head is released to bound memory, even past a gap.
When the head is released past a gap (by timeout or capacity), the optional
onGap callback fires once for each skipped number.
Releasing in order
onRelease is called once per message, in order:
include app;
include integration_resequencer.integration_resequencer;
class events : AppBase {
public events() {
rs = new Resequencer("order.events");
rs.setTimeoutMs(5000); // hold a gap at most 5s, then skip
rs.setOnRelease(::onOrdered);
rs.setOnGap(::onMissing);
app.registerListener(rs);
// Messages processed in parallel arrive out of order; feed each
// to the resequencer, which forwards them in sequence.
rs.accept(3, evtC);
rs.accept(1, evtA);
rs.accept(2, evtB);
// onOrdered fires for 1, then 2, then 3 - in order.
}
public onOrdered(SequenceNumber, Payload, CorrelationId) {
console.info("deliver #" + SequenceNumber);
}
public onMissing(SkippedSequence, CorrelationId) {
console.warn("skipped missing #" + SkippedSequence);
}
}
onRelease is called as Cb(SequenceNumber, Payload, CorrelationId) and onGap
as Cb(SkippedSequence, CorrelationId). CorrelationId is null for the
default stream.
The hold window
setTimeoutMs is the heart of the Resequencer. It decides how long the head
waits for a missing predecessor before it is released anyway.
- Default (1000 ms) or any positive value - the buffer reorders. The first message of a stream and any message after a gap may be delayed up to one window; in-order messages flow with no delay. Set the window to roughly the worst-case spread between when related messages arrive.
setTimeoutMs(0)- no hold. The first message to arrive becomes the baseline and is released at once, in-order messages pass straight through, and any later message numbered below where the stream has advanced is dropped as late. Use this for streams that are normally in order and should not pay the initial delay.
A gap that never fills would otherwise wait forever, so the hold window doubles
as the skip timeout. setCapacity(N) is a second guard that bounds memory: when
a stream buffers more than N messages, the head is force-released even before
the window elapses.
Named streams
Pass a CorrelationId to reorder several independent streams through one
Resequencer:
rs.accept(2, b, "shipment-7");
rs.accept(1, a, "shipment-7"); // released as 1 then 2 for shipment-7
rs.accept(1, x, "shipment-9"); // a separate stream, ordered on its own
Each correlation id keeps its own cursor and buffer. This is the right tool when the set of streams is dynamic - per-order, per-session, per-device ids you only learn at runtime - so you cannot pre-create a Resequencer per stream. For a small, fixed set of streams, separate Resequencer objects are simpler. Note that each correlation id keeps a small resident entry for the life of the listener; prefer separate objects, or accept the footprint, for very high-cardinality ids.
Method summary
| Method | Behavior |
|---|---|
new Resequencer(string Name) |
The listener name used in logs and applications.yaml. |
setOnRelease(callback Cb) |
Called as Cb(SequenceNumber, Payload, CorrelationId) for each released message, in order. Required. |
setTimeoutMs(int Ms) |
Hold window in ms. Default 1000; 0 disables the hold. |
setCapacity(int N) |
Max buffered messages per stream before the head is force-released. Default 0 (unbounded). |
setReapIntervalMs(int Ms) |
How often the background sweeper runs. Default 1000. |
setOnGap(callback Cb) |
Called as Cb(SkippedSequence, CorrelationId) when a missing message is skipped. Optional. |
accept(SequenceNumber, Payload, CorrelationId = null) |
Feeds one message into its stream. |
startListener() / stopListener() |
Run and stop the sweeper loop (the runtime drives these under the server). |
pauseListener() / resumeListener() |
Pause and resume the timeout/capacity sweeps. |
getListenerName() / getIsRunning() / getIsPaused() |
Introspection. |
Per-listener enabled and autoRestart tuning comes from applications.yaml,
keyed by the resequencer name, with no module code involved.
Enabling the bundled JAR under Aussom Server
The Resequencer is backed by a Java class in the module's bundled JAR
(aussom-integration_resequencer.jar). On first include, the module loader
calls app.loadJar(...) to put it on the classpath. Aussom Server blocks
dynamic JAR loading by default, so the server's security manager must allow it,
or the app fails at startup.
Add a securityManager block to the server section of config.yaml that turns
loading on and allowlists this module's JAR:
local:
server:
# ...
# Allow the integration-resequencer module to load its bundled JAR.
securityManager:
- "aussom.app.loadjar": true
- "aussom.app.loadJarsAllowed": [
"aussom-integration_resequencer.jar"
]
Building and testing
The aunit suite (integration_resequencer-test.aus) runs under the
Aussom-Server runtime - the runtime the Resequencer listener is built for -
not the bare aussom CLI. One-time setup: drop the Aussom-Server developer zip
into the project root and install it.
unzip /path/to/aussom-server-<version>.zip
mv aussom-server-<version>/* .
rmdir aussom-server-<version>
java -jar aussom-server.jar -i
-i lays down apps/, logs/, lib/, an aussom-server CLI wrapper, and a
config.yaml with generated keys. These are git-ignored; the repo commits only
test-config.yaml (a secret-free config that enables app.loadJar for
aussom-integration_resequencer.jar).
The Maven build copies the .aus sources and the bundled JAR into the
integration_resequencer/ package directory, which the test include resolves
against. That copy happens during the package phase, after the test phase, so
build the package directory once before testing:
mvn package -DskipTests # builds integration_resequencer/ (sources + jar)
mvn test # runs the JUnit and aunit suites
Tests.java launches the server's test runner against the package directory -
equivalent to:
./aussom-server -cf test-config.yaml -t integration_resequencer-test.aus
ResequencerTests.java is a plain Java-to-Java test of the Resequencer class.
After editing any .aus source, re-run mvn package -DskipTests so the package
directory picks up the change, then mvn test.