Pipeline Mechanism

A pipeline is a directed acyclic graph (DAG) of processing nodes that Cortex executes on each media asset. Pipelines are defined in Loom (stored in the database) and loaded by Cortex at startup.

Pipeline Structure

A pipeline has the following properties:

Field        Type                Description
-----------  ------------------  ---------------------------------------------------------------------
name         String              Unique identifier for the pipeline (lowercase, alphanumeric, hyphens)
description  String              Human-readable description
priority     int                 When multiple pipelines match an asset, higher priority wins
enabled      boolean             Whether this pipeline is active
dryRun       boolean             When true, nodes log their actions but do not persist results
sourceNode   PipelineNode        The single entry-point node that yields media
nodes        List<PipelineNode>  All nodes, in topological order

Node Graph

Each node in the graph has:

  • A unique id (e.g. sha256, thumbnail)

  • Zero or more upstream connections it reads outputs from

  • Zero or more downstream connections it writes to

  • An optional filter that controls which assets reach this node (e.g. video-only, image-only)

Connections

Nodes are wired via connectTo():

sourceNode.connectTo(sha256Node);
sha256Node.connectTo(thumbnailNode, FilterBranch.PASS);
sha256Node.connectTo(dedupNode,     FilterBranch.PASS);

Connections carry a branch label (PASS / FAIL) so a filter node can route assets differently depending on the filter result.
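The branch routing can be illustrated with a minimal, self-contained sketch. The `FilterBranch` name comes from the snippet above; the `route` helper and its map-based connection table are stand-ins for illustration, not the actual Cortex classes:

```java
import java.util.*;

public class BranchRoutingSketch {
    enum FilterBranch { PASS, FAIL }

    // Returns the downstream node ids an asset is routed to,
    // given the boolean result of a filter node.
    static List<String> route(boolean filterResult,
                              Map<FilterBranch, List<String>> connections) {
        FilterBranch branch = filterResult ? FilterBranch.PASS : FilterBranch.FAIL;
        return connections.getOrDefault(branch, List.of());
    }

    public static void main(String[] args) {
        Map<FilterBranch, List<String>> conns = Map.of(
            FilterBranch.PASS, List.of("thumbnail", "dedup"),
            FilterBranch.FAIL, List.of("quarantine"));
        System.out.println(route(true, conns));   // PASS branch
        System.out.println(route(false, conns));  // FAIL branch
    }
}
```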

Topological Ordering

DefaultPipeline discovers all nodes reachable from the source node via a BFS walk and sorts them in topological order. This ensures every node’s dependencies are processed before it runs.
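The discover-then-sort step can be sketched as follows. This is a simplified stand-in (a plain id-to-ids adjacency map and Kahn's algorithm), not the actual DefaultPipeline code:

```java
import java.util.*;

public class TopoSortSketch {
    // edges: node id -> downstream node ids
    static List<String> topoOrder(String source, Map<String, List<String>> edges) {
        // 1. BFS from the source to discover all reachable nodes
        Set<String> reachable = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>(List.of(source));
        while (!queue.isEmpty()) {
            String n = queue.poll();
            if (reachable.add(n)) {
                queue.addAll(edges.getOrDefault(n, List.of()));
            }
        }
        // 2. Kahn's algorithm: repeatedly emit nodes whose in-degree is 0,
        //    so every node appears after all of its upstream dependencies
        Map<String, Integer> inDegree = new HashMap<>();
        reachable.forEach(n -> inDegree.putIfAbsent(n, 0));
        for (String n : reachable)
            for (String d : edges.getOrDefault(n, List.of()))
                inDegree.merge(d, 1, Integer::sum);
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((n, deg) -> { if (deg == 0) ready.add(n); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String n = ready.poll();
            order.add(n);
            for (String d : edges.getOrDefault(n, List.of()))
                if (inDegree.merge(d, -1, Integer::sum) == 0) ready.add(d);
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = Map.of(
            "source",    List.of("sha256"),
            "sha256",    List.of("thumbnail", "dedup"),
            "thumbnail", List.of("loom"),
            "dedup",     List.of("loom"));
        // "loom" is emitted last: both of its upstreams come first
        System.out.println(topoOrder("source", edges));
    }
}
```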

Node Lifecycle

For each media asset, every node in the pipeline goes through this sequence:

1. isEnabled()     → skip if the node is turned off in options
2. existsResult()  → skip if a result is already cached (meta-path or Loom)
3. isProcessable() → skip if the media type doesn't match (e.g. a video-only node on an image)
4. compute()       → run the actual analysis

If any of the first three checks signals a skip (the node is disabled, a result already exists, or the media type does not match), the node does not run and the pipeline moves on to the next node.
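The gate sequence above can be sketched as a single guard method. The `Node` interface here is a stand-in carrying only the four lifecycle methods named in the list; the real Cortex node API differs:

```java
public class LifecycleSketch {
    // Stand-in for a pipeline node; method names follow the lifecycle steps.
    interface Node {
        boolean isEnabled();
        boolean existsResult(String assetId);      // cached in meta-path or Loom?
        boolean isProcessable(String mediaType);   // e.g. video-only, image-only
        String compute(String assetId);
    }

    /** Runs one node for one asset, returning its result or a skip reason. */
    static String runNode(Node node, String assetId, String mediaType) {
        if (!node.isEnabled())               return "SKIPPED: disabled";
        if (node.existsResult(assetId))      return "SKIPPED: cached";
        if (!node.isProcessable(mediaType))  return "SKIPPED: media type";
        return node.compute(assetId);
    }

    public static void main(String[] args) {
        Node thumbnail = new Node() {
            public boolean isEnabled() { return true; }
            public boolean existsResult(String assetId) { return false; }
            public boolean isProcessable(String mediaType) { return mediaType.equals("image"); }
            public String compute(String assetId) { return "thumbnail for " + assetId; }
        };
        System.out.println(runNode(thumbnail, "asset-1", "image"));
        System.out.println(runNode(thumbnail, "asset-2", "video"));
    }
}
```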

Node Outputs

Nodes publish typed key-value outputs that downstream nodes can consume:

// Publishing an output
ctx.output(OUTPUT_FILE_SIZE, fileSize);
ctx.output(OUTPUT_SHA256, hashValue);

// Reading an upstream output (from a node named "sha256")
String hash = ctx.upstreamOutput("sha256", "sha256");

Output keys are defined as NodeOutputKey<T> constants on each node class.
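The typed-key idea behind `NodeOutputKey<T>` can be shown with a minimal self-contained sketch; the record, `NodeContext`, and key constants below are illustrative stand-ins, not the actual Cortex types:

```java
import java.util.*;

public class OutputKeySketch {
    // Typed key: the generic parameter records the value type at compile time.
    record NodeOutputKey<T>(String name, Class<T> type) {}

    static class NodeContext {
        private final Map<String, Object> outputs = new HashMap<>();

        // The key's type parameter ties the published value to the key.
        <T> void output(NodeOutputKey<T> key, T value) {
            outputs.put(key.name(), value);
        }

        // Reading back goes through Class.cast, so the static type is safe.
        <T> T get(NodeOutputKey<T> key) {
            return key.type().cast(outputs.get(key.name()));
        }
    }

    static final NodeOutputKey<Long>   OUTPUT_FILE_SIZE = new NodeOutputKey<>("fileSize", Long.class);
    static final NodeOutputKey<String> OUTPUT_SHA256    = new NodeOutputKey<>("sha256", String.class);

    public static void main(String[] args) {
        NodeContext ctx = new NodeContext();
        ctx.output(OUTPUT_FILE_SIZE, 1024L);
        ctx.output(OUTPUT_SHA256, "ab12cd34");
        String hash = ctx.get(OUTPUT_SHA256);  // no unchecked cast at the call site
        System.out.println(hash);
    }
}
```

The payoff of the pattern is that `ctx.get(OUTPUT_SHA256)` is statically a `String` and `ctx.get(OUTPUT_FILE_SIZE)` a `Long`, with no casts in node code.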

Result Origins

Each node result carries an origin that explains how the result was obtained:

Origin    Meaning
--------  -----------------------------------------------------------
COMPUTED  The node performed the analysis now
REMOTE    The result was already stored in Loom and loaded from there
CACHED    The result was read from the local meta-path cache

Pipeline Executor

ReactivePipelineExecutor drives pipeline execution using RxJava. It:

  1. Streams media assets from the source node.

  2. For each asset, runs the topologically ordered nodes in sequence.

  3. Collects NodeResult objects and emits them as PipelineResult events.

  4. Publishes events to the PipelineEventBus, which forwards them to Loom over WebSocket.
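Conceptually, steps 1-3 look like the following sketch, which uses java.util.stream in place of RxJava to stay dependency-free; the records and the always-COMPLETED status are simplifications, not the actual ReactivePipelineExecutor:

```java
import java.util.*;
import java.util.stream.*;

public class ExecutorSketch {
    record NodeResult(String nodeId, String status) {}
    record PipelineResult(String assetId, List<NodeResult> nodeResults) {}

    // For each streamed asset, run every node in topological order
    // and collect one PipelineResult per asset.
    static List<PipelineResult> run(Stream<String> assets, List<String> orderedNodeIds) {
        return assets
            .map(asset -> new PipelineResult(asset,
                orderedNodeIds.stream()
                    .map(id -> new NodeResult(id, "COMPLETED"))  // real nodes may also SKIP or FAIL
                    .toList()))
            .toList();
    }

    public static void main(String[] args) {
        List<PipelineResult> results =
            run(Stream.of("asset-1", "asset-2"), List.of("sha256", "thumbnail"));
        results.forEach(System.out::println);
    }
}
```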

Defining a Pipeline in Code

// Build nodes
LoomSourceNode source    = new LoomSourceNode(loomClient);
SHA256Node    sha256    = new SHA256Node(loomClient, options, hashOptions);
ThumbnailNode thumbnail = new ThumbnailNode(loomClient, options, thumbOptions);
LoomNode      loom      = new LoomNode(loomClient, options, loomNodeOptions);

// Wire the graph
source.connectTo(sha256);
sha256.connectTo(thumbnail);
thumbnail.connectTo(loom);

// Build the pipeline
Pipeline pipeline = DefaultPipeline.builder("ingest")
    .description("Default ingest pipeline")
    .priority(10)
    .source(source)
    .build();

Dry-Run Mode

When dryRun = true, nodes execute their compute() logic and log what they would do, but they do not write to the local cache, xattr, or Loom. This is useful for testing pipeline definitions against real media without side effects.

# Dry-run mode is set via the pipeline definition in Loom (dryRun: true)
# or by passing a flag during process run (TBD in CLI)

Custom Nodes

Any class extending AbstractMediaNode<YourOptions> can be added to a pipeline. See Cortex Examples for a complete working example and Dagger wiring.
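As a rough shape of what a custom node looks like, here is a hedged sketch: the `AbstractMediaNode` base below is a minimal stand-in (the real Cortex base class has more lifecycle hooks), and `CaptionNode`, `CaptionOptions`, and its `compute` signature are hypothetical names invented for illustration:

```java
public class CustomNodeSketch {
    // Stand-in for Cortex's AbstractMediaNode<O>; the real base class differs.
    static abstract class AbstractMediaNode<O> {
        protected final O options;
        protected AbstractMediaNode(O options) { this.options = options; }
        abstract String id();
    }

    // Hypothetical options class for the example node below.
    record CaptionOptions(int maxLength) {}

    // Hypothetical node: truncates a caption to the configured length.
    static class CaptionNode extends AbstractMediaNode<CaptionOptions> {
        CaptionNode(CaptionOptions options) { super(options); }

        String id() { return "caption"; }

        String compute(String caption) {
            int max = options.maxLength();
            return caption.length() <= max ? caption : caption.substring(0, max);
        }
    }

    public static void main(String[] args) {
        CaptionNode node = new CaptionNode(new CaptionOptions(5));
        System.out.println(node.compute("hello world"));  // truncated to 5 chars
    }
}
```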

Pipeline Events (WebSocket)

As the executor processes assets, it emits structured events to the Loom WebSocket endpoint:

ws://<loom-host>:8092/api/v1/pipelines/events/ws

Each event identifies the pipeline, node, asset UUID, status (STARTED, COMPLETED, SKIPPED, FAILED), and duration. The Loom UI subscribes to this stream to display live processing progress.