integration_core 1.0.2

integration-core

Core Enterprise Integration Pattern (EIP) tools for Aussom. This module provides a small set of message and concurrency components that you can use in both the Aussom CLI and Aussom Server.

Unlike the server-only connector modules (Kafka, RabbitMQ, ActiveMQ), the tools here are synchronous and run anywhere - in the CLI or the Server. Patterns that move messages across channels or need server infrastructure - Aggregator, Resequencer, Wire Tap - are left to server-specific integration-<name> modules. See design/integration-design.md for the full design.

Components

Class               Purpose
IMessage            Standard message value: attributes, content type, body, correlation id, deep copy
iutils              Build an IMessage from an Aussom Server HttpReq; generate correlation ids
ParallelForeach     Run a callback over each value in a list across a thread pool; gather results
ScatterGather       Send one payload to many recipient callbacks in parallel; gather results
Pipeline            Run a value through an ordered chain of step callbacks; short-circuit, hooks, error handling
IdempotentReceiver  Thread-safe duplicate filter: track seen message ids, in memory or in a JDBC table, so each id is handled once

Include everything with:

include integration_core.integration_core;

IMessage, iutils, Pipeline, and IdempotentReceiver are pure Aussom (IdempotentReceiver uses the stdlib concurrent library for thread safety). ParallelForeach and ScatterGather are backed by a Java thread-pool engine that ships in the module's bundled JAR (loaded automatically on include).

IMessage

IMessage is the unit the other tools pass around. It holds an attribute map (headers, query params, and the like), a content type, a body, and a correlationId. A new message gets a generated correlation id, so every message can be traced. Setters return the message, so calls chain.

include integration_core.integration_core;

msg = new IMessage();
msg.setType("application/json")
   .setBody("{\"id\": 42}")
   .setAttributes({ "source": "orders" });

c.println(msg.getCorrelationId());   // generated unless you set one
c.println(msg.getType());            // application/json

// copy() is a deep, independent copy that keeps the same correlation id.
dup = msg.copy();

The body holds native Aussom values: a string for text, JSON, or XML; a Buffer for binary; or an Aussom object (use type application/aussom).
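To see what a "deep, independent copy" means in practice, the sketch below changes the attributes on a copy and then reads both messages back. It uses only the calls shown above; the attribute values are illustrative, and the comments describe what the documented behavior implies rather than captured output.

```
include integration_core.integration_core;

orig = new IMessage();
orig.setType("application/json")
    .setBody("{\"id\": 42}")
    .setAttributes({ "source": "orders" });

// Change the copy only.
dup = orig.copy();
dup.setAttributes({ "source": "billing" });

// With a deep copy, the original should still read "orders"
// and the copy should read "billing".
c.println(orig.getAttributes()["source"]);
c.println(dup.getAttributes()["source"]);

// copy() keeps the correlation id, so both lines should print the same id.
c.println(orig.getCorrelationId());
c.println(dup.getCorrelationId());
```

Keeping the correlation id on the copy lets you trace both messages back to the same originating request.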

iutils

iutils is a static helper class.

iutils.newCorrelationId() returns a fresh unique id string.

iutils.message(Req) converts an Aussom Server HttpReq into an IMessage, using only the request's accessor methods. The headers, method, path, and query parameters go into the attributes. The correlation id comes from the X-Correlation-ID header when present; otherwise it is generated. The body and type depend on the request's content type.

// Inside an Aussom Server route handler that receives an HttpReq:
msg = iutils.message(req);
c.println(msg.getCorrelationId());
c.println(msg.getAttributes()["method"]);   // for example "POST"

The result is pure Aussom and holds no server objects, so it stays usable after the request, including in the CLI.

ParallelForeach

ParallelForeach runs a callback once per value in a list, in parallel, across a bounded thread pool, and blocks until every value is processed. The callback's return value becomes that value's result, and results come back in input order.

class App {
    public run() {
        pfe = new ParallelForeach([ "alpha", "beta", "gamma" ], ::process);
        pfe.setNumThreads(4);        // default: available processors
        pfe.setThreadTimeout(5000);  // per-worker ms; 0 (default) = no timeout
        results = pfe.run();

        // results = [ "ALPHA", "BETA", "GAMMA" ], aligned to input order.
        // A value that throws or times out leaves null in its slot.
        c.println(results[0]);
    }

    public process(Item) { return Item.toUpper(); }
}

Call setReturnValues(false) to skip collecting results (for side-effect-only work); run() then returns null.
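A minimal sketch of the side-effect-only mode, reusing the calls from the example above. The notify callback and its message text are hypothetical; the point is that nothing is gathered and run() comes back null.

```
class App {
    public run() {
        pfe = new ParallelForeach([ "alpha", "beta", "gamma" ], ::notify);
        pfe.setReturnValues(false);   // side effects only; no results are collected

        out = pfe.run();              // still blocks until every value is processed
        if (out == null) { c.println("all values processed, nothing gathered"); }
    }

    // Hypothetical callback: does its work and returns nothing.
    public notify(Item) { c.println("notified " + Item); }
}
```

Because the workers run in parallel, the order of the "notified" lines is not guaranteed.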

ScatterGather

ScatterGather sends the same payload to several recipient callbacks at once, runs them in parallel, and gathers their results in recipient order.

class App {
    public run() {
        order = new IMessage();
        order.setBody("{\"sku\": \"A-1\", \"qty\": 3}");

        sg = new ScatterGather(order, [ ::checkInventory, ::checkCredit, ::checkFraud ]);
        results = sg.run();

        // results in recipient order: [ inventoryResult, creditResult, fraudResult ].
        // A recipient that throws or times out leaves null in its slot.
        c.println(results[0]);
    }

    public checkInventory(Msg) { return "in-stock"; }
    public checkCredit(Msg)    { return "approved"; }
    public checkFraud(Msg)     { return "clear"; }
}

ScatterGather shares the setNumThreads, setThreadTimeout, and setReturnValues settings with ParallelForeach. Recipients share the payload by reference; a recipient that needs to mutate it should copy() first.
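The sketch below puts those two points together: it applies the shared thread settings to a ScatterGather and has one recipient copy() the payload before changing it. The recipient names, the attribute, and the timeout value are illustrative only.

```
class App {
    public run() {
        order = new IMessage();
        order.setBody("{\"sku\": \"A-1\", \"qty\": 3}");

        sg = new ScatterGather(order, [ ::tagAndCheck, ::checkCredit ]);
        sg.setNumThreads(2);          // same setting as ParallelForeach
        sg.setThreadTimeout(2000);    // per-worker ms
        results = sg.run();

        // A null slot means that recipient threw or timed out.
        if (results[0] == null) { c.println("inventory check failed or timed out"); }
        if (results[1] == null) { c.println("credit check failed or timed out"); }
    }

    // Copies the shared payload first, so the other recipient never sees the change.
    public tagAndCheck(Msg) {
        mine = Msg.copy();
        mine.setAttributes({ "checkedBy": "inventory" });
        return "in-stock";
    }

    public checkCredit(Msg) { return "approved"; }
}
```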

Pipeline

Pipeline runs a value through an ordered list of step callbacks, each step's output feeding the next. It is the sequential sibling of ParallelForeach.

A step returns the next value to continue, or null to drop the value and stop the chain. An optional onStep hook fires after each step for tracing or auditing. Error handling is uniform: with no handler, an exception thrown by a step propagates to the caller; with setOnError, it is caught and routed to the handler, and run returns null.

class App {
    public run() {
        msg = new IMessage();
        msg.setBody("{\"id\": 7}");

        p = new Pipeline([ ::validate, ::enrich, ::transform ]);
        p.setOnStep(::trace);          // observe each step
        p.setOnError(::handleError);   // optional error handler

        out = p.run(msg);
        if (out == null) { c.println("dropped or failed"); }
    }

    // A step returning null drops the message and stops the chain.
    public validate(M) { if (M.getBody() == null) { return null; } return M; }
    public enrich(M)   { M.setAttributes({ "checked": true }); return M; }
    public transform(M){ M.setType("application/aussom"); return M; }

    public trace(Idx, M)       { c.println("step " + Idx + " done"); }
    public handleError(Idx, M, Err) { c.println("step " + Idx + " failed: " + Err.getText()); }
}

Build the chain dynamically with add(step). A step is just a callback, so a step can run a ParallelForeach or a nested Pipeline, letting the tools compose without glue.
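As a sketch of that composition, the pipeline below is started with one step, extended with add(step), and its second step fans out over a list with ParallelForeach. The step names are hypothetical, and the example assumes a Pipeline accepts any value, here a list, as its input.

```
class App {
    public run() {
        p = new Pipeline([ ::validate ]);
        p.add(::fanOut);              // appended after validate

        out = p.run([ "alpha", "beta" ]);

        // fanOut returns the ParallelForeach results, aligned to input order.
        if (out != null) { c.println(out[0]); }
    }

    // Drops the value (and stops the chain) when there is nothing to process.
    public validate(Items) { if (Items == null) { return null; } return Items; }

    // A step that runs a parallel stage and passes its results down the chain.
    public fanOut(Items) {
        pfe = new ParallelForeach(Items, ::process);
        return pfe.run();
    }

    public process(Item) { return Item.toUpper(); }
}
```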

IdempotentReceiver

IdempotentReceiver is a duplicate filter. You hand it the id of an incoming message and receive(Id) returns true the first time that id is seen and false for every repeat, so you process a message once and drop redelivered copies. The id is any string you choose - a message id, a business key, a hash of the body - and the message itself is never passed in.

include integration_core.integration_core;

r = new IdempotentReceiver();
c.println(r.receive("evt-1"));   // true  (new, process it)
c.println(r.receive("evt-1"));   // false (duplicate, drop it)
c.println(r.receive("evt-2"));   // true  (new)

receive is safe to call from many threads at once - create one receiver and share it across worker threads. The check ("seen this id?") and the record happen as one atomic step, guarded by a single-permit Semaphore from the stdlib concurrent library, so a duplicate is caught exactly once no matter how many threads race the same id.
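One way to exercise that guarantee is to race several workers on the same id. The sketch below passes the receiver itself as a ScatterGather payload, which assumes that any object can serve as the payload and that the same callback may be listed more than once; neither is confirmed here, so treat it as an illustration rather than a supported pattern.

```
class App {
    public run() {
        r = new IdempotentReceiver();

        // Three recipients share the one receiver by reference and race on "evt-1".
        sg = new ScatterGather(r, [ ::claim, ::claim, ::claim ]);
        results = sg.run();

        // Given the atomic check-and-record, exactly one slot should be true.
        // Which slot wins depends on thread scheduling.
        c.println(results[0]);
        c.println(results[1]);
        c.println(results[2]);
    }

    public claim(R) { return R.receive("evt-1"); }
}
```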

By default the seen ids live in memory and are lost when the process exits. For a store that survives a restart and can be shared across processes, point the receiver at a connected Jdbc object with setJdbc, then call setup() once to create the table:

include integration_core.integration_core;
include jdbc;

db = new Jdbc();
db.setConnectionInfo("org.sqlite.JDBC", "jdbc:sqlite:dedup.db", "", "");
db.connect();

r = new IdempotentReceiver();
r.setJdbc(db).setup();             // setTableName("my_ids") overrides the default table

c.println(r.receive("order-1"));   // true  (new)
c.println(r.receive("order-1"));   // false (duplicate)

The JDBC backend stores each id in a table whose id column is the PRIMARY KEY, so a single INSERT is the duplicate check; the same id from another process is caught by the constraint. The Jdbc class works the same in the CLI and the Server, and both ship with SQLite.

Id expiry and cleanup are out of scope: the set of seen ids grows until you trim it (for the JDBC backend, an operator-run delete; for the in-memory backend, dropping the receiver).

Enabling the bundled JAR under Aussom Server

ParallelForeach and ScatterGather are backed by a Java engine in the module's bundled JAR (aussom-integration_core.jar). On first include, the module loader calls app.loadJar("integration_core/aussom-integration_core.jar") to put it on the classpath. Aussom Server blocks dynamic JAR loading by default, so the server's security manager must allow it, or the app fails at startup.

Add a securityManager block to the server section of config.yaml that turns loading on and allowlists this module's JAR:

local:
  server:
    # ...

    # Allow the integration-core module to load its bundled JAR.
    securityManager:
      - "aussom.app.loadjar": true
      - "aussom.app.loadJarsAllowed": [
        "aussom-integration_core.jar"
      ]

The Aussom CLI does not restrict app.loadJar, so no configuration is needed there.

Building and testing

The aunit suite (integration_core-test.aus) runs through Tests.java under the Aussom CLI test runner. The Maven build copies the .aus sources and the bundled JAR into the integration_core/ package directory, which the test include resolves against. Because that copy happens in the package phase, which Maven runs after the test phase, a fresh checkout must build the package directory once before testing:

mvn package -DskipTests   # builds integration_core/ (sources + aussom-integration_core.jar)
mvn test                  # runs the aunit suite

After editing any .aus source, re-run mvn package -DskipTests so the package directory picks up the change, then mvn test.