integration_core 1.0.1

integration-core

Core Enterprise Integration Pattern (EIP) tools for Aussom. This module provides a small set of message and concurrency components that you can use in both the Aussom CLI and Aussom Server.

Unlike the server-only connector modules (Kafka, RabbitMQ, ActiveMQ), the tools here are stateless and synchronous, so they run anywhere. Patterns that need to hold state over time or move messages across channels - Aggregator, Resequencer, Wire Tap, Idempotent Receiver - are left to server-specific integration-<name> modules. See design/integration-design.md for the full design.

Components

Class            Purpose
IMessage         Standard message value: attributes, content type, body, correlation id, deep copy
iutils           Build an IMessage from an Aussom Server HttpReq; generate correlation ids
ParallelForeach  Run a callback over each value in a list across a thread pool; gather results
ScatterGather    Send one payload to many recipient callbacks in parallel; gather results
Pipeline         Run a value through an ordered chain of step callbacks; short-circuit, hooks, error handling

Include everything with:

include integration_core.integration_core;

IMessage, iutils, and Pipeline are pure Aussom. ParallelForeach and ScatterGather are backed by a Java thread-pool engine that ships in the module's bundled JAR (loaded automatically on include).

IMessage

IMessage is the unit the other tools pass around. It holds an attribute map (headers, query params, and the like), a content type, a body, and a correlationId. A new message gets a generated correlation id, so every message can be traced. Setters return the message, so calls chain.

include integration_core.integration_core;

msg = new IMessage();
msg.setType("application/json")
   .setBody("{\"id\": 42}")
   .setAttributes({ "source": "orders" });

c.println(msg.getCorrelationId());   // generated unless you set one
c.println(msg.getType());            // application/json

// copy() returns a deep, independent copy that keeps the same correlation id.
dup = msg.copy();

The body holds native Aussom values: a string for text, JSON, or XML; a Buffer for binary; or an Aussom object (use type application/aussom).
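
As a sketch of the two points above (an object body and copy independence), the following uses only the accessors shown in this README. It assumes that a map literal counts as an "Aussom object" body; the field names are illustrative.

```aussom
include integration_core.integration_core;

// Assumption: a map literal is a valid "Aussom object" body.
msg = new IMessage();
msg.setType("application/aussom")
   .setBody({ "id": 42, "status": "new" });

// copy() returns an independent deep copy with the same correlation id,
// so changing the copy leaves the original untouched.
dup = msg.copy();
dup.setType("application/json")
   .setBody("{\"id\": 42}");

c.println(msg.getType());            // application/aussom
c.println(dup.getType());            // application/json
c.println(dup.getCorrelationId());   // same id as msg.getCorrelationId()
```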

iutils

iutils is a static helper class.

iutils.newCorrelationId() returns a fresh unique id string.

iutils.message(Req) converts an Aussom Server HttpReq into an IMessage, using only the request's accessor methods. The headers, method, path, and query parameters go into the attributes. The correlation id comes from the X-Correlation-ID header when present; otherwise it is generated. The body and type depend on the request's content type.

// Inside an Aussom Server route handler that receives an HttpReq:
msg = iutils.message(req);
c.println(msg.getCorrelationId());
c.println(msg.getAttributes()["method"]);   // for example "POST"

The result is pure Aussom and holds no server objects, so it stays usable after the request, including in the CLI.
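
A sketch of how the conversion might sit in front of a Pipeline. The class name OrderRoutes, the handler name handleOrder, and the way the route is registered with the server are assumptions and not part of this module; only iutils.message, the IMessage accessors, and Pipeline come from this README.

```aussom
include integration_core.integration_core;

class OrderRoutes {
    // Hypothetical handler; Req is the Aussom Server HttpReq for the call.
    public handleOrder(Req) {
        msg = iutils.message(Req);

        // The message holds no server objects, so it can be logged or
        // processed without touching Req again.
        c.println(msg.getCorrelationId() + " " + msg.getAttributes()["method"]);

        p = new Pipeline([ ::checkBody ]);
        return p.run(msg);   // null when the step drops the message
    }

    // Drops requests that arrived without a body.
    public checkBody(M) { if (M.getBody() == null) { return null; } return M; }
}
```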

ParallelForeach

ParallelForeach runs a callback once per value in a list, in parallel, across a bounded thread pool, and blocks until every value is processed. The callback's return value becomes that value's result, and results come back in input order.

class App {
    public run() {
        pfe = new ParallelForeach([ "alpha", "beta", "gamma" ], ::process);
        pfe.setNumThreads(4);        // default: available processors
        pfe.setThreadTimeout(5000);  // per-worker ms; 0 (default) = no timeout
        results = pfe.run();

        // results = [ "ALPHA", "BETA", "GAMMA" ], aligned to input order.
        // A value whose callback throws or times out leaves null in its slot.
        c.println(results[0]);
    }

    public process(Item) { return Item.toUpper(); }
}

Call setReturnValues(false) to skip collecting results (for side-effect-only work); run() then returns null.
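
The two result modes can be sketched side by side. This is a minimal example under the behavior described above; the callback names process and logItem and the timeout value are illustrative.

```aussom
include integration_core.integration_core;

class App {
    public run() {
        // Collect results, then check one slot for failure.
        pfe = new ParallelForeach([ "alpha", "beta" ], ::process);
        pfe.setThreadTimeout(2000);   // give each worker up to 2 seconds
        results = pfe.run();
        if (results[1] == null) { c.println("beta failed or timed out"); }

        // Side-effect-only run: nothing is collected, so run() returns null.
        audit = new ParallelForeach([ "alpha", "beta" ], ::logItem);
        audit.setReturnValues(false);
        out = audit.run();
        if (out == null) { c.println("no results collected"); }
    }

    public process(Item) { return Item.toUpper(); }
    public logItem(Item) { c.println("saw " + Item); }
}
```

Because null marks a failed slot, a callback that can legitimately produce no value may want to return a sentinel instead, so that failures stay distinguishable.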

ScatterGather

ScatterGather sends the same payload to several recipient callbacks at once, runs them in parallel, and gathers their results in recipient order.

class App {
    public run() {
        order = new IMessage();
        order.setBody("{\"sku\": \"A-1\", \"qty\": 3}");

        sg = new ScatterGather(order, [ ::checkInventory, ::checkCredit, ::checkFraud ]);
        results = sg.run();

        // results in recipient order: [ inventoryResult, creditResult, fraudResult ].
        // A recipient that throws or times out leaves null in its slot.
        c.println(results[0]);
    }

    public checkInventory(Msg) { return "in-stock"; }
    public checkCredit(Msg)    { return "approved"; }
    public checkFraud(Msg)     { return "clear"; }
}

ScatterGather shares the setNumThreads, setThreadTimeout, and setReturnValues settings with ParallelForeach. Recipients share the payload by reference; a recipient that needs to mutate it should copy() first.
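
A sketch of the copy-before-mutate rule. The recipient names and the attribute key are illustrative; the calls themselves are the ones shown elsewhere in this README.

```aussom
include integration_core.integration_core;

class App {
    public run() {
        order = new IMessage();
        order.setBody("{\"sku\": \"A-1\", \"qty\": 3}");

        sg = new ScatterGather(order, [ ::tagForAudit, ::checkCredit ]);
        sg.setThreadTimeout(3000);   // per-recipient ms
        results = sg.run();

        // results[0] is the tagged copy; order itself was never modified.
        c.println(results[1]);
    }

    // Works on a private copy, so other recipients never see the change.
    public tagForAudit(Msg) {
        mine = Msg.copy();
        mine.setAttributes({ "stage": "audit" });
        return mine;
    }

    // Read-only recipients can use the shared payload directly.
    public checkCredit(Msg) { return "approved"; }
}
```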

Pipeline

Pipeline runs a value through an ordered list of step callbacks, each step's output feeding the next. It is the sequential sibling of ParallelForeach.

A step returns the next value to continue, or null to drop the value and stop the chain. An optional onStep hook fires after each step for tracing or auditing. Error handling is uniform: with no handler, an error thrown by a step propagates to the caller; with setOnError, the error is caught and passed to the handler, and run() returns null.

class App {
    public run() {
        msg = new IMessage();
        msg.setBody("{\"id\": 7}");

        p = new Pipeline([ ::validate, ::enrich, ::transform ]);
        p.setOnStep(::trace);          // observe each step
        p.setOnError(::handleError);   // optional error handler

        out = p.run(msg);
        if (out == null) { c.println("dropped or failed"); }
    }

    // A step returning null drops the message and stops the chain.
    public validate(M) { if (M.getBody() == null) { return null; } return M; }
    public enrich(M)   { M.setAttributes({ "checked": true }); return M; }
    public transform(M){ M.setType("application/aussom"); return M; }

    public trace(Idx, M)       { c.println("step " + Idx + " done"); }
    public handleError(Idx, M, Err) { c.println("step " + Idx + " failed: " + Err.getText()); }
}

Build the chain dynamically with add(step). A step is just a callback, so a step can run a ParallelForeach or a nested Pipeline, letting the tools compose without glue.
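
A sketch of dynamic construction and composition. It assumes that add(step) appends to the end of the chain and that a list is an acceptable message body; the step names are illustrative.

```aussom
include integration_core.integration_core;

class App {
    public run() {
        p = new Pipeline([ ::validate ]);
        p.add(::fanOut);   // assumed to run after validate

        msg = new IMessage();
        msg.setBody("{\"id\": 7}");

        out = p.run(msg);
        if (out == null) { c.println("dropped or failed"); }
    }

    public validate(M) { if (M.getBody() == null) { return null; } return M; }

    // A step that runs a ParallelForeach and stores the gathered list as the body.
    public fanOut(M) {
        pfe = new ParallelForeach([ "inventory", "credit" ], ::lookup);
        M.setType("application/aussom").setBody(pfe.run());
        return M;
    }

    public lookup(Name) { return Name + ": ok"; }
}
```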

Enabling the bundled JAR under Aussom Server

ParallelForeach and ScatterGather are backed by a Java engine in the module's bundled JAR (aussom-integration_core.jar). On first include, the module loader calls app.loadJar("integration_core/aussom-integration_core.jar") to put it on the classpath. Aussom Server blocks dynamic JAR loading by default, so the server's security manager must allow it, or the app fails at startup.

Add a securityManager block to the server section of config.yaml that turns loading on and allowlists this module's JAR:

local:
  server:
    # ...

    # Allow the integration-core module to load its bundled JAR.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
        "aussom-integration_core.jar"
      ]

The Aussom CLI does not restrict app.loadJar, so no configuration is needed there.

Building and testing

The aunit suite (integration_core-test.aus) runs through Tests.java under the Aussom CLI test runner. The Maven build copies the .aus sources and the bundled JAR into the integration_core/ package directory, which the test include resolves against. That copy happens in the package phase, which Maven runs after the test phase, so on a fresh checkout the package directory does not exist yet when the tests run. Build it once before testing:

mvn package -DskipTests   # builds integration_core/ (sources + aussom-integration_core.jar)
mvn test                  # runs the aunit suite

After editing any .aus source, re-run mvn package -DskipTests so the package directory picks up the change, then mvn test.