integration-core
Core Enterprise Integration Pattern (EIP) tools for Aussom. This module provides a small set of message and concurrency components that you can use in both the Aussom CLI and Aussom Server.
Unlike the server-only connector modules (Kafka, RabbitMQ, ActiveMQ), the
tools here are synchronous and run anywhere - in the CLI or the Server.
Patterns that move messages across channels or need server infrastructure -
Aggregator, Resequencer, Wire Tap - are left to server-specific
integration-<name> modules. See design/integration-design.md for the full
design.
Components
| Class | Purpose |
|---|---|
IMessage |
Standard message value: attributes, content type, body, correlation id, deep copy |
iutils |
Build an IMessage from an Aussom Server HttpReq; generate correlation ids |
ParallelForeach |
Run a callback over each value in a list across a thread pool; gather results |
ScatterGather |
Send one payload to many recipient callbacks in parallel; gather results |
Pipeline |
Run a value through an ordered chain of step callbacks; short-circuit, hooks, error handling |
IdempotentReceiver |
Thread-safe duplicate filter: track seen message ids, in memory or in a JDBC table, so each id is handled once |
Include everything with:
include integration_core.integration_core;
IMessage, iutils, Pipeline, and IdempotentReceiver are pure Aussom
(IdempotentReceiver uses the stdlib concurrent library for thread safety).
ParallelForeach and ScatterGather are backed by a Java thread-pool engine
that ships in the module's bundled JAR (loaded automatically on include).
IMessage
IMessage is the unit the other tools pass around. It holds an attribute map
(headers, query params, and the like), a content type, a body, and a
correlationId. A new message gets a generated correlation id, so every
message can be traced. Setters return the message, so calls chain.
include integration_core.integration_core;
msg = new IMessage();
msg.setType("application/json")
.setBody("{\"id\": 42}")
.setAttributes({ "source": "orders" });
c.println(msg.getCorrelationId()); // generated unless you set one
c.println(msg.getType()); // application/json
// copy() is a deep, independent copy that keeps the same correlation id.
dup = msg.copy();
The body holds native Aussom values: a string for text, JSON, or XML; a
Buffer for binary; or an Aussom object (use type application/aussom).
iutils
iutils is a static helper class.
iutils.newCorrelationId() returns a fresh unique id string.
iutils.message(Req) converts an Aussom Server HttpReq into an IMessage,
using only the request's accessor methods. The headers, method, path, and
query parameters go into the attributes. The correlation id comes from the
X-Correlation-ID header when present, or is generated. The body and type
depend on the content type:
- Form requests (
multipart/form-dataorapplication/x-www-form-urlencoded) use the parsed form data as the body (a map keyed by field name, with any uploaded files asBuffervalues) and the typeapplication/aussom. - Everything else reads the body as a string, with the type taken from the
Content-Typeheader (defaulting totext/plain).
// Inside an Aussom Server route handler that receives an HttpReq:
msg = iutils.message(req);
c.println(msg.getCorrelationId());
c.println(msg.getAttributes()["method"]); // for example "POST"
The result is pure Aussom and holds no server objects, so it stays usable after the request, including in the CLI.
ParallelForeach
ParallelForeach runs a callback once per value in a list, in parallel,
across a bounded thread pool, and blocks until every value is processed. The
callback's return value becomes that value's result, and results come back in
input order.
class App {
public run() {
pfe = new ParallelForeach([ "alpha", "beta", "gamma" ], ::process);
pfe.setNumThreads(4); // default: available processors
pfe.setThreadTimeout(5000); // per-worker ms; 0 (default) = no timeout
results = pfe.run();
// results = [ "ALPHA", "BETA", "GAMMA" ], aligned to input order.
// A value that throws or times out leaves null in its slot.
c.println(results[0]);
}
public process(Item) { return Item.toUpper(); }
}
Set setReturnValues(false) to skip collecting results (for side-effect-only
work); run() then returns null.
ScatterGather
ScatterGather sends the same payload to several recipient callbacks at once,
runs them in parallel, and gathers their results in recipient order.
class App {
public run() {
order = new IMessage();
order.setBody("{\"sku\": \"A-1\", \"qty\": 3}");
sg = new ScatterGather(order, [ ::checkInventory, ::checkCredit, ::checkFraud ]);
results = sg.run();
// results in recipient order: [ inventoryResult, creditResult, fraudResult ].
// A recipient that throws or times out leaves null in its slot.
c.println(results[0]);
}
public checkInventory(Msg) { return "in-stock"; }
public checkCredit(Msg) { return "approved"; }
public checkFraud(Msg) { return "clear"; }
}
ScatterGather shares the setNumThreads, setThreadTimeout, and
setReturnValues settings with ParallelForeach. Recipients share the payload
by reference; a recipient that needs to mutate it should copy() first.
Pipeline
Pipeline runs a value through an ordered list of step callbacks, each step's
output feeding the next. It is the sequential sibling of ParallelForeach.
A step returns the next value to continue, or null to drop the value and stop
the chain. An optional onStep hook fires after each step for tracing or
auditing. Error handling is uniform: with no handler a thrown step propagates;
with setOnError it is caught, routed to the handler, and run returns null.
class App {
public run() {
msg = new IMessage();
msg.setBody("{\"id\": 7}");
p = new Pipeline([ ::validate, ::enrich, ::transform ]);
p.setOnStep(::trace); // observe each step
p.setOnError(::handleError); // optional error handler
out = p.run(msg);
if (out == null) { c.println("dropped or failed"); }
}
// A step returning null drops the message and stops the chain.
public validate(M) { if (M.getBody() == null) { return null; } return M; }
public enrich(M) { M.setAttributes({ "checked": true }); return M; }
public transform(M){ M.setType("application/aussom"); return M; }
public trace(Idx, M) { c.println("step " + Idx + " done"); }
public handleError(Idx, M, Err) { c.println("step " + Idx + " failed: " + Err.getText()); }
}
Build the chain dynamically with add(step). A step is just a callback, so a
step can run a ParallelForeach or a nested Pipeline, letting the tools
compose without glue.
IdempotentReceiver
IdempotentReceiver is a duplicate filter. You hand it the id of an incoming
message and receive(Id) returns true the first time that id is seen and
false for every repeat, so you process a message once and drop redelivered
copies. The id is any string you choose - a message id, a business key, a hash
of the body - and the message itself is never passed in.
include integration_core.integration_core;
r = new IdempotentReceiver();
c.println(r.receive("evt-1")); // true (new, process it)
c.println(r.receive("evt-1")); // false (duplicate, drop it)
c.println(r.receive("evt-2")); // true (new)
receive is safe to call from many threads at once - create one receiver and
share it across worker threads. The check ("seen this id?") and the record
happen as one atomic step, guarded by a single-permit Semaphore from the
stdlib concurrent library, so a duplicate is caught exactly once no matter how
many threads race the same id.
By default the seen ids live in memory and are lost when the process exits. For
a store that survives a restart and can be shared across processes, point the
receiver at a connected Jdbc object with setJdbc, then call setup() once
to create the table:
include integration_core.integration_core;
include jdbc;
db = new Jdbc();
db.setConnectionInfo("org.sqlite.JDBC", "jdbc:sqlite:dedup.db", "", "");
db.connect();
r = new IdempotentReceiver();
r.setJdbc(db).setup(); // setTableName("my_ids") overrides the default table
c.println(r.receive("order-1")); // true (new)
c.println(r.receive("order-1")); // false (duplicate)
The JDBC backend stores each id in a table whose id column is the PRIMARY KEY,
so a single INSERT is the duplicate check; the same id from another process is
caught by the constraint. The Jdbc class works the same in the CLI and the
Server, and both ship with sqlite.
Id expiry and cleanup are out of scope: the set of seen ids grows until you trim it (for the JDBC backend, an operator-run delete; for the in-memory backend, dropping the receiver).
Enabling the bundled JAR under Aussom Server
ParallelForeach and ScatterGather are backed by a Java engine in the
module's bundled JAR (aussom-integration_core.jar). On first include, the
module loader calls app.loadJar("integration_core/aussom-integration_core.jar")
to put it on the classpath. Aussom Server blocks dynamic JAR loading by
default, so the server's security manager must allow it, or the app fails at
startup.
Add a securityManager block to the server section of config.yaml that
turns loading on and allowlists this module's JAR:
local:
server:
# ...
# Allow the integration-core module to load its bundled JAR.
securityManager:
- "aussom.app.loadjar": true
- "aussom.app.loadJarsAllowed": [
"aussom-integration_core.jar"
]
The Aussom CLI does not restrict app.loadJar, so no configuration is needed
there.
Building and testing
The aunit suite (integration_core-test.aus) runs through Tests.java under
the Aussom CLI test runner. The Maven build copies the .aus sources and the
bundled JAR into the integration_core/ package directory, which the test
include resolves against. That copy happens during the package phase, after
the test phase, so a fresh checkout must build the package directory once
before testing:
mvn package -DskipTests # builds integration_core/ (sources + aussom-integration_core.jar)
mvn test # runs the aunit suite
After editing any .aus source, re-run mvn package -DskipTests so the
package directory picks up the change, then mvn test.