integration-core
Core Enterprise Integration Pattern (EIP) tools for Aussom. This module provides a small set of message and concurrency components that you can use in both the Aussom CLI and Aussom Server.
Unlike the server-only connector modules (Kafka, RabbitMQ, ActiveMQ), the
tools here are stateless and synchronous, so they run anywhere. Patterns that
need to hold state over time or move messages across channels - Aggregator,
Resequencer, Wire Tap, Idempotent Receiver - are left to server-specific
integration-<name> modules. See design/integration-design.md for the full
design.
Components
| Class | Purpose |
|---|---|
| IMessage | Standard message value: attributes, content type, body, correlation id, deep copy |
| iutils | Build an IMessage from an Aussom Server HttpReq; generate correlation ids |
| ParallelForeach | Run a callback over each value in a list across a thread pool; gather results |
| ScatterGather | Send one payload to many recipient callbacks in parallel; gather results |
| Pipeline | Run a value through an ordered chain of step callbacks; short-circuit, hooks, error handling |
Include everything with:
include integration_core.integration_core;
IMessage, iutils, and Pipeline are pure Aussom. ParallelForeach and
ScatterGather are backed by a Java thread-pool engine that ships in the
module's bundled JAR (loaded automatically on include).
IMessage
IMessage is the unit the other tools pass around. It holds an attribute map
(headers, query params, and the like), a content type, a body, and a
correlationId. A new message gets a generated correlation id, so every
message can be traced. Setters return the message, so calls chain.
include integration_core.integration_core;
msg = new IMessage();
msg.setType("application/json")
    .setBody("{\"id\": 42}")
    .setAttributes({ "source": "orders" });
c.println(msg.getCorrelationId()); // generated unless you set one
c.println(msg.getType()); // application/json
// copy() is a deep, independent copy that keeps the same correlation id.
dup = msg.copy();
The body holds native Aussom values: a string for text, JSON, or XML; a
Buffer for binary; or an Aussom object (use type application/aussom).
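The chaining and copy semantics described above can be pictured with a small Java analogue. This is only a sketch, not the module's IMessage source: the class name MessageSketch, the string-only attribute map, and the use of a UUID for the generated id are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for IMessage; names and field types are assumptions.
public class MessageSketch {
    private Map<String, String> attributes = new HashMap<>();
    private String type;
    private String body;
    // Every new message gets a generated id (UUID is an assumed format).
    private String correlationId = UUID.randomUUID().toString();

    // Setters return this so calls can be chained.
    MessageSketch setType(String type) { this.type = type; return this; }
    MessageSketch setBody(String body) { this.body = body; return this; }
    MessageSketch setAttributes(Map<String, String> attrs) {
        this.attributes = new HashMap<>(attrs);
        return this;
    }

    String getType() { return type; }
    String getBody() { return body; }
    String getCorrelationId() { return correlationId; }
    Map<String, String> getAttributes() { return attributes; }

    // Independent copy that keeps the same correlation id. Copying the map is
    // enough here because its String keys and values are immutable.
    MessageSketch copy() {
        MessageSketch dup = new MessageSketch();
        dup.correlationId = this.correlationId;
        dup.type = this.type;
        dup.body = this.body;
        dup.attributes = new HashMap<>(this.attributes);
        return dup;
    }

    public static void main(String[] args) {
        MessageSketch msg = new MessageSketch()
                .setType("application/json")
                .setBody("{\"id\": 42}")
                .setAttributes(Map.of("source", "orders"));
        MessageSketch dup = msg.copy();
        dup.getAttributes().put("source", "billing");
        // The original is untouched and both share one correlation id.
        System.out.println(msg.getAttributes().get("source")); // orders
        System.out.println(msg.getCorrelationId().equals(dup.getCorrelationId())); // true
    }
}
```

Keeping the correlation id on copy means a duplicated message can still be traced back to the same logical request.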
iutils
iutils is a static helper class.
iutils.newCorrelationId() returns a fresh unique id string.
iutils.message(Req) converts an Aussom Server HttpReq into an IMessage,
using only the request's accessor methods. The headers, method, path, and
query parameters go into the attributes. The correlation id comes from the
X-Correlation-ID header when present, or is generated. The body and type
depend on the content type:
- Form requests (multipart/form-data or application/x-www-form-urlencoded) use the parsed form data as the body (a map keyed by field name, with any uploaded files as Buffer values) and the type application/aussom.
- Everything else reads the body as a string, with the type taken from the Content-Type header (defaulting to text/plain).
// Inside an Aussom Server route handler that receives an HttpReq:
msg = iutils.message(req);
c.println(msg.getCorrelationId());
c.println(msg.getAttributes()["method"]); // for example "POST"
The result is pure Aussom and holds no server objects, so it stays usable after the request, including in the CLI.
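The selection rules that iutils.message applies can be summarized in a short Java sketch. It is an illustration of the rules as described, not the module's code: the class and method names are hypothetical, the exact-case header lookup is a simplification, stripping parameters such as a multipart boundary before comparing is an assumption, and UUID is an assumed id format.

```java
import java.util.Locale;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper mirroring the documented rules; not the module's API.
public class MessageFromRequestSketch {

    // Use the X-Correlation-ID header when present, otherwise generate an id.
    // Assumption: exact-case lookup; a real server may match case-insensitively.
    static String correlationId(Map<String, String> headers) {
        String id = headers.get("X-Correlation-ID");
        return id != null ? id : UUID.randomUUID().toString();
    }

    // True for the two form content types, ignoring parameters such as boundary.
    static boolean isForm(String contentType) {
        if (contentType == null) {
            return false;
        }
        String base = contentType.split(";")[0].trim().toLowerCase(Locale.ROOT);
        return base.equals("multipart/form-data")
                || base.equals("application/x-www-form-urlencoded");
    }

    // Form requests carry parsed data, so the message type is application/aussom.
    // Everything else keeps its Content-Type, defaulting to text/plain.
    static String messageType(String contentType) {
        if (isForm(contentType)) {
            return "application/aussom";
        }
        return contentType != null ? contentType : "text/plain";
    }

    public static void main(String[] args) {
        System.out.println(messageType("multipart/form-data; boundary=x")); // application/aussom
        System.out.println(messageType("application/json"));                // application/json
        System.out.println(messageType(null));                              // text/plain
        System.out.println(correlationId(Map.of("X-Correlation-ID", "abc-123"))); // abc-123
    }
}
```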
ParallelForeach
ParallelForeach runs a callback once per value in a list, in parallel,
across a bounded thread pool, and blocks until every value is processed. The
callback's return value becomes that value's result, and results come back in
input order.
class App {
    public run() {
        pfe = new ParallelForeach([ "alpha", "beta", "gamma" ], ::process);
        pfe.setNumThreads(4);       // default: available processors
        pfe.setThreadTimeout(5000); // per-worker ms; 0 (default) = no timeout
        results = pfe.run();
        // results = [ "ALPHA", "BETA", "GAMMA" ], aligned to input order.
        // A value that throws or times out leaves null in its slot.
        c.println(results[0]);
    }

    public process(Item) { return Item.toUpper(); }
}
Call setReturnValues(false) to skip collecting results (for side-effect-only
work); run() then returns null.
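For readers curious how a bounded pool can return results in input order with null in failed slots, here is a minimal Java sketch of the idea. It is not the engine that ships in the bundled JAR: the class name and signature are hypothetical, and the timeout is simplified to a wait on each future in turn rather than a true per-worker limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical illustration; not the module's Java engine.
public class ParallelForeachSketch {

    static <T, R> List<R> run(List<T> items, Function<T, R> callback,
                              int numThreads, long timeoutMs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            // Submit one task per item; the futures list preserves input order.
            List<Future<R>> futures = new ArrayList<>();
            for (T item : items) {
                Callable<R> task = () -> callback.apply(item);
                futures.add(pool.submit(task));
            }
            // Collect in submission order, so results align with the input.
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                try {
                    results.add(timeoutMs > 0
                            ? future.get(timeoutMs, TimeUnit.MILLISECONDS)
                            : future.get());
                } catch (ExecutionException | TimeoutException e) {
                    // A callback that throws or times out leaves null in its slot.
                    future.cancel(true);
                    results.add(null);
                }
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> ok = run(List.of("alpha", "beta", "gamma"), String::toUpperCase, 4, 5000);
        System.out.println(ok); // [ALPHA, BETA, GAMMA]

        List<String> partial = run(List.of("alpha", "beta"), s -> {
            if (s.equals("beta")) {
                throw new IllegalStateException("boom");
            }
            return s.toUpperCase();
        }, 2, 0);
        System.out.println(partial); // [ALPHA, null]
    }
}
```

Collecting from the futures list rather than from a completion queue is what keeps the output aligned with the input, regardless of which worker finishes first.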
ScatterGather
ScatterGather sends the same payload to several recipient callbacks at once,
runs them in parallel, and gathers their results in recipient order.
class App {
    public run() {
        order = new IMessage();
        order.setBody("{\"sku\": \"A-1\", \"qty\": 3}");
        sg = new ScatterGather(order, [ ::checkInventory, ::checkCredit, ::checkFraud ]);
        results = sg.run();
        // results in recipient order: [ inventoryResult, creditResult, fraudResult ].
        // A recipient that throws or times out leaves null in its slot.
        c.println(results[0]);
    }

    public checkInventory(Msg) { return "in-stock"; }
    public checkCredit(Msg) { return "approved"; }
    public checkFraud(Msg) { return "clear"; }
}
ScatterGather shares the setNumThreads, setThreadTimeout, and
setReturnValues settings with ParallelForeach. Recipients share the payload
by reference; a recipient that needs to mutate it should copy() first.
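The scatter-gather shape can be sketched in Java as one payload handed to several functions, with the answers gathered in recipient order. This is an illustrative analogue, not the module's engine: the class and method names are hypothetical, and sizing the pool to the recipient count is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration; not the module's Java engine.
public class ScatterGatherSketch {

    static <P, R> List<R> scatterGather(P payload, List<Function<P, R>> recipients)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, recipients.size()));
        try {
            // Every recipient receives the same payload reference.
            List<Callable<R>> tasks = new ArrayList<>();
            for (Function<P, R> recipient : recipients) {
                tasks.add(() -> recipient.apply(payload));
            }
            // invokeAll blocks until all tasks finish and returns the futures
            // in the same order as the task list.
            List<R> results = new ArrayList<>();
            for (Future<R> future : pool.invokeAll(tasks)) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    results.add(null); // a failed recipient leaves null in its slot
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Function<String, String>> recipients = List.of(
                order -> "in-stock",
                order -> "approved",
                order -> { throw new IllegalStateException("fraud service down"); });
        System.out.println(scatterGather("{\"sku\": \"A-1\"}", recipients));
        // [in-stock, approved, null]
    }
}
```

Because every task closes over the same payload reference, a recipient that mutates it would affect the others, which is why copying first matters.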
Pipeline
Pipeline runs a value through an ordered list of step callbacks, each step's
output feeding the next. It is the sequential sibling of ParallelForeach.
A step returns the next value to continue, or null to drop the value and stop
the chain. An optional onStep hook fires after each step for tracing or
auditing. Error handling is uniform: with no handler, an exception thrown by a
step propagates to the caller; with setOnError, the exception is caught, routed
to the handler, and run() returns null.
class App {
    public run() {
        msg = new IMessage();
        msg.setBody("{\"id\": 7}");
        p = new Pipeline([ ::validate, ::enrich, ::transform ]);
        p.setOnStep(::trace);        // observe each step
        p.setOnError(::handleError); // optional error handler
        out = p.run(msg);
        if (out == null) { c.println("dropped or failed"); }
    }

    // A step returning null drops the message and stops the chain.
    public validate(M) { if (M.getBody() == null) { return null; } return M; }
    public enrich(M) { M.setAttributes({ "checked": true }); return M; }
    public transform(M) { M.setType("application/aussom"); return M; }
    public trace(Idx, M) { c.println("step " + Idx + " done"); }
    public handleError(Idx, M, Err) { c.println("step " + Idx + " failed: " + Err.getText()); }
}
Build the chain dynamically with add(step). A step is just a callback, so a
step can run a ParallelForeach or a nested Pipeline, letting the tools
compose without glue.
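The control flow described in this section (ordered steps, null as a drop signal, an optional step hook, and an optional error handler) can be sketched in Java as follows. This is an analogue for illustration, not the Pipeline source: the class and interface names are hypothetical, and whether the hook also fires for a step that returns null is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

// Hypothetical illustration of the pipeline control flow; not the module's code.
public class PipelineSketch<T> {

    interface ErrorHandler<V> {
        void handle(int index, V value, RuntimeException error);
    }

    private final List<UnaryOperator<T>> steps = new ArrayList<>();
    private BiConsumer<Integer, T> onStep;
    private ErrorHandler<T> onError;

    PipelineSketch<T> add(UnaryOperator<T> step) { steps.add(step); return this; }
    PipelineSketch<T> setOnStep(BiConsumer<Integer, T> hook) { this.onStep = hook; return this; }
    PipelineSketch<T> setOnError(ErrorHandler<T> handler) { this.onError = handler; return this; }

    T run(T input) {
        T value = input;
        for (int i = 0; i < steps.size(); i++) {
            try {
                value = steps.get(i).apply(value);
            } catch (RuntimeException e) {
                if (onError == null) {
                    throw e; // no handler: the exception propagates to the caller
                }
                onError.handle(i, value, e); // value is the input to the failed step
                return null;
            }
            // Assumption: the hook fires after every step, including one that drops.
            if (onStep != null) {
                onStep.accept(i, value);
            }
            if (value == null) {
                return null; // a null result drops the value and stops the chain
            }
        }
        return value;
    }

    public static void main(String[] args) {
        PipelineSketch<String> p = new PipelineSketch<String>()
                .add(String::trim)
                .add(String::toUpperCase)
                .setOnStep((i, v) -> System.out.println("step " + i + " done"));
        System.out.println(p.run("  hi  ")); // prints the two step lines, then HI
    }
}
```

Since a step is just a function from value to value, one step can itself call another pipeline's run method, which is the same composition property noted above.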
Enabling the bundled JAR under Aussom Server
ParallelForeach and ScatterGather are backed by a Java engine in the
module's bundled JAR (aussom-integration_core.jar). On first include, the
module loader calls app.loadJar("integration_core/aussom-integration_core.jar")
to put it on the classpath. Aussom Server blocks dynamic JAR loading by
default, so the server's security manager must allow it, or the app fails at
startup.
Add a securityManager block to the server section of config.yaml that
turns loading on and allowlists this module's JAR:
local:
  server:
    # ...
    # Allow the integration-core module to load its bundled JAR.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
          "aussom-integration_core.jar"
        ]
The Aussom CLI does not restrict app.loadJar, so no configuration is needed
there.
Building and testing
The aunit suite (integration_core-test.aus) runs through Tests.java under
the Aussom CLI test runner. The Maven build copies the .aus sources and the
bundled JAR into the integration_core/ package directory, which the test
include resolves against. That copy happens during the package phase, after
the test phase, so a fresh checkout must build the package directory once
before testing:
mvn package -DskipTests # builds integration_core/ (sources + aussom-integration_core.jar)
mvn test # runs the aunit suite
After editing any .aus source, re-run mvn package -DskipTests so the
package directory picks up the change, then mvn test.