class: Resequencer
[50:14] (extern: com.lehman.aussom.Resequencer) extends: object
Resequencer takes messages that arrive out of order and releases them in order, one at a time, by sequence number. It holds back a message that arrives early until the messages that should come before it have been released, then releases the whole consecutive run.

It is an Aussom Server client listener: register it with app.registerListener and the server runtime runs its background sweeper on a dedicated thread. Feed it messages with accept() from any thread.

The Resequencer is standalone and generic. It defines no message type, and the payload may be any Aussom value. Only the sequence number and payload are required; an optional correlation id lets one Resequencer reorder several independent streams at once, each tracked separately.

The head of a stream (its lowest buffered sequence number) is released in any of three cases: it is the immediate successor of the last released number (released with no delay), the buffer exceeds the capacity (setCapacity), or it has waited out the hold window (setTimeoutMs). With a zero hold window, the first message to arrive is released immediately, and any lower-numbered message that arrives afterwards is dropped as late.
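The three release rules can be illustrated with a small single-stream model. This is a sketch in Python, not the Aussom implementation and not its API: the class name ResequencerModel, the explicit now_ms argument, and the released and gaps lists (standing in for the setOnRelease and setOnGap handlers) are all hypothetical. It also assumes that the hold window is measured from each message's own arrival, that late or duplicate numbers are dropped, and that no gap is reported before the first release; the real class may differ on each point.

```python
class ResequencerModel:
    """Toy single-stream model of the release rules (not the Aussom class)."""

    def __init__(self, hold_ms=1000, capacity=0):
        self.hold_ms = hold_ms    # mirrors setTimeoutMs
        self.capacity = capacity  # mirrors setCapacity; 0 means unbounded
        self.last = None          # last released number; None before the first release
        self.buffer = {}          # seq -> (payload, arrival time in ms)
        self.released = []        # stands in for the setOnRelease handler
        self.gaps = []            # stands in for the setOnGap handler

    def accept(self, seq, payload, now_ms):
        # Assumption: anything at or below the last released number is dropped as late.
        if self.last is not None and seq <= self.last:
            return self
        self.buffer.setdefault(seq, (payload, now_ms))  # ignore duplicates
        return self.sweep(now_ms)

    def sweep(self, now_ms):
        # Release the head while any one of the three rules applies.
        while self.buffer:
            head = min(self.buffer)
            payload, arrived = self.buffer[head]
            in_order = self.last is not None and head == self.last + 1
            over_cap = self.capacity > 0 and len(self.buffer) > self.capacity
            timed_out = now_ms - arrived >= self.hold_ms  # assumption: per-message clock
            if not (in_order or over_cap or timed_out):
                break
            if self.last is not None:
                # Every number skipped by the timeout or capacity rule is a gap.
                self.gaps.extend(range(self.last + 1, head))
            del self.buffer[head]
            self.released.append((head, payload))
            self.last = head
        return self


if __name__ == "__main__":
    r = ResequencerModel(hold_ms=100)
    r.accept(2, "b", 0).accept(1, "a", 10)   # nothing released yet: no known predecessor
    r.sweep(110)                             # 1 times out, then 2 follows in order
    r.accept(5, "e", 120).accept(3, "c", 130)
    r.sweep(220)                             # 5 times out, 4 is reported as a gap
    print(r.released)  # [(1, 'a'), (2, 'b'), (3, 'c'), (5, 'e')]
    print(r.gaps)      # [4]
```

Passing the clock in as now_ms keeps the model deterministic; in the real listener the sweeper thread plays the role of the periodic sweep() call.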
Methods
- Resequencer (string Name)
  Creates a new Resequencer.
  - @p Name is the stable listener name used in logs and applications.yaml.
- newResequencer (string Name)
- setTimeoutMs (int Ms)
  Sets the hold window in milliseconds: how long the head of a stream waits for a missing predecessor before it is released anyway. Default 1000. A value of 0 disables the hold (the first message to arrive is taken as the start and released at once).
  - @p Ms is an int with the hold window in milliseconds (>= 0).
  - @r this object
- setCapacity (int N)
  Sets the maximum number of buffered messages per stream before the head is force-released to bound memory, even past a gap. Default 0 (unbounded).
  - @p N is an int with the per-stream buffer cap (>= 0).
  - @r this object
- setReapIntervalMs (int Ms)
  Sets how often the sweeper runs, in milliseconds. Default 1000.
  - @p Ms is an int with the sweep interval in milliseconds (> 0).
  - @r this object
- setOnRelease (callback Cb)
  Sets the release handler, called as Cb(SequenceNumber, Payload, CorrelationId) for each released message, in order. CorrelationId is null for the default stream. Required.
  - @p Cb is the release handler callback.
  - @r this object
- setOnGap (callback Cb)
  Sets the gap handler, called as Cb(SkippedSequence, CorrelationId) when the timeout or capacity rule skips a missing message. CorrelationId is null for the default stream. Optional.
  - @p Cb is the gap handler callback.
  - @r this object
- accept (SequenceNumber, Payload, CorrelationId = null)
  Feeds one message into its stream. Releases any now-deliverable messages on the calling thread.
  - @p SequenceNumber is an int with the message's position in its stream.
  - @p Payload is the message, any Aussom value.
  - @p CorrelationId is an optional string naming an independent stream (default null, the single default stream).
  - @r this object
- acceptMsg (SequenceNumber, Payload, CorrelationId)
- startListener ()
  Runs the sweeper loop on the calling thread, blocking until stopListener() is called. Under Aussom Server the runtime drives this after app.registerListener; call it directly for plain-CLI use and tests.
  - @r this object
- stopListener ()
  Signals the sweeper loop to exit.
  - @r this object
- pauseListener ()
  Pauses the timeout and capacity sweeps without dropping buffered state.
  - @r this object
- resumeListener ()
  Resumes the sweeps after pauseListener().
  - @r this object
- getListenerName ()
  Gets the resequencer (listener) name.
  - @r A string with the name.
- getIsRunning ()
  Reports whether the sweeper loop is running.
  - @r A bool, true while running.
- getIsPaused ()
  Reports whether the sweeper is paused.
  - @r A bool, true while paused.
class: integration_resequencer_module
[23:14] static extends: object
Module JAR loader class. Loads the bundled jar that backs the Resequencer before any extern class resolves.
Methods
- integration_resequencer_module ()