A pipeline is a directed acyclic graph (DAG) of processing nodes that Cortex executes on each media asset. Pipelines are defined in Loom (stored in the database) and loaded by Cortex at startup.
Pipeline Structure
A pipeline has the following properties:
| Field | Type | Description |
|---|---|---|
| | String | Unique identifier for the pipeline (lowercase, alphanumeric, hyphens) |
| | String | Human-readable description |
| | int | When multiple pipelines match an asset, higher priority wins |
| | boolean | Whether this pipeline is active |
| | boolean | When |
| | PipelineNode | The single entry-point node that yields media |
| | List<PipelineNode> | All nodes, in topological order |
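The priority rule can be illustrated with a minimal sketch. PipelineDef, its field names, and PipelineSelector are hypothetical stand-ins, not Cortex classes, and the sketch assumes that inactive pipelines are ignored during selection. How Cortex breaks ties between equal priorities is not stated here.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a pipeline definition; not the real Cortex class.
final class PipelineDef {
    final String name;
    final int priority;
    final boolean enabled;

    PipelineDef(String name, int priority, boolean enabled) {
        this.name = name;
        this.priority = priority;
        this.enabled = enabled;
    }
}

final class PipelineSelector {
    // Among the pipelines that already matched an asset, pick the active one
    // with the highest priority value (assumption: inactive ones never win).
    static Optional<PipelineDef> select(List<PipelineDef> matching) {
        return matching.stream()
                .filter(p -> p.enabled)
                .max(Comparator.comparingInt(p -> p.priority));
    }
}
```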
Node Graph
Each node in the graph has:
- A unique id (e.g. sha256, thumbnail)
- Zero or more upstream connections it reads outputs from
- Zero or more downstream connections it writes to
- An optional filter that controls which assets reach this node (e.g. video-only, image-only)
Connections
Nodes are wired via connectTo():
sourceNode.connectTo(sha256Node);
sha256Node.connectTo(thumbnailNode, FilterBranch.PASS);
sha256Node.connectTo(dedupNode, FilterBranch.PASS);
Connections carry a branch label (PASS / FAIL) so a filter node can route assets differently depending on the filter result.
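As a rough illustration of branch-labelled connections, the sketch below keeps one target list per branch. SketchNode, Branch, and route() are hypothetical simplifications, not the real Cortex types, and treating an unlabelled connection as PASS is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification; the real node and FilterBranch types may differ.
enum Branch { PASS, FAIL }

final class SketchNode {
    final String id;
    private final List<SketchNode> passTargets = new ArrayList<>();
    private final List<SketchNode> failTargets = new ArrayList<>();

    SketchNode(String id) {
        this.id = id;
    }

    // Assumption: a connection without a label behaves like PASS.
    void connectTo(SketchNode target) {
        connectTo(target, Branch.PASS);
    }

    void connectTo(SketchNode target, Branch branch) {
        (branch == Branch.PASS ? passTargets : failTargets).add(target);
    }

    // Downstream nodes an asset is routed to for a given filter result.
    List<SketchNode> route(boolean filterPassed) {
        return filterPassed ? passTargets : failTargets;
    }
}
```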
Topological Ordering
DefaultPipeline discovers all nodes reachable from the source node via a BFS walk and sorts them in topological order.
This ensures every node’s dependencies are processed before it runs.
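One common way to combine a BFS walk with a topological sort is Kahn's algorithm. The sketch below is an assumption about the general approach, not the actual DefaultPipeline code; TopoSort and the id-based edge map are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopoSort {
    // edges maps a node id to the ids of its direct downstream nodes.
    static List<String> order(String source, Map<String, List<String>> edges) {
        // 1. BFS from the source: find reachable nodes, count incoming edges.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        Deque<String> queue = new ArrayDeque<>();
        inDegree.put(source, 0);
        queue.add(source);
        while (!queue.isEmpty()) {
            String node = queue.remove();
            List<String> targets = edges.getOrDefault(node, Collections.<String>emptyList());
            for (String next : targets) {
                if (!inDegree.containsKey(next)) {
                    inDegree.put(next, 0);
                    queue.add(next);
                }
                inDegree.put(next, inDegree.get(next) + 1);
            }
        }
        // 2. Kahn's algorithm: emit a node once all of its upstream nodes are emitted.
        List<String> sorted = new ArrayList<>();
        queue.add(source);
        while (!queue.isEmpty()) {
            String node = queue.remove();
            sorted.add(node);
            List<String> targets = edges.getOrDefault(node, Collections.<String>emptyList());
            for (String next : targets) {
                int remaining = inDegree.get(next) - 1;
                inDegree.put(next, remaining);
                if (remaining == 0) {
                    queue.add(next);
                }
            }
        }
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("cycle detected; the graph is not a DAG");
        }
        return sorted;
    }
}
```

In a diamond-shaped graph where two branches feed the same node, that node is only emitted after both branches, which is the guarantee described above.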
Node Lifecycle
For each media asset, every node in the pipeline goes through this sequence:
1. isEnabled() → skip if node is turned off in options
2. existsResult() → skip if result is already cached (meta-path or Loom)
3. isProcessable() → skip if media type doesn't match (e.g. video-only node on an image)
4. compute() → run the actual analysis
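If any of the first three checks says to skip (the node is disabled, a result already exists, or the media type does not match), compute() does not run and the pipeline moves on to the next node.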
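The sequence above can be sketched as a chain of guard checks. The LifecycleNode interface, its parameter types, and the Outcome enum are hypothetical, since the real method signatures are not shown here. Note that existsResult() leads to a skip when it returns true, unlike the other two checks.

```java
// Hypothetical outcome labels for this sketch only.
enum Outcome { SKIPPED_DISABLED, SKIPPED_CACHED, SKIPPED_NOT_PROCESSABLE, COMPUTED }

// Hypothetical interface mirroring the four lifecycle steps.
interface LifecycleNode {
    boolean isEnabled();
    boolean existsResult(String assetId);
    boolean isProcessable(String mediaType);
    void compute(String assetId);
}

final class LifecycleRunner {
    // Runs the checks in order and calls compute() only if none of them skips.
    static Outcome run(LifecycleNode node, String assetId, String mediaType) {
        if (!node.isEnabled()) {
            return Outcome.SKIPPED_DISABLED;          // turned off in options
        }
        if (node.existsResult(assetId)) {
            return Outcome.SKIPPED_CACHED;            // result already stored
        }
        if (!node.isProcessable(mediaType)) {
            return Outcome.SKIPPED_NOT_PROCESSABLE;   // media type mismatch
        }
        node.compute(assetId);
        return Outcome.COMPUTED;
    }
}
```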
Node Outputs
Nodes publish typed key-value outputs that downstream nodes can consume:
// Publishing an output
ctx.output(OUTPUT_FILE_SIZE, fileSize);
ctx.output(OUTPUT_SHA256, hashValue);
// Reading an upstream output (from a node named "sha256")
String hash = ctx.upstreamOutput("sha256", "sha256");
Output keys are defined as NodeOutputKey<T> constants on each node class.
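To show why typed keys are useful, here is a minimal sketch of a key that carries its value type. OutputKey and OutputStore are hypothetical; the real NodeOutputKey<T> and context classes may be shaped differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed key: a name plus a Class token for the value type.
final class OutputKey<T> {
    final String name;
    final Class<T> type;

    OutputKey(String name, Class<T> type) {
        this.name = name;
        this.type = type;
    }
}

// Hypothetical per-node output store.
final class OutputStore {
    private final Map<String, Object> values = new HashMap<>();

    <T> void output(OutputKey<T> key, T value) {
        values.put(key.name, value);
    }

    // The Class token makes the read type-safe without an unchecked cast;
    // a missing key yields null.
    <T> T read(OutputKey<T> key) {
        return key.type.cast(values.get(key.name));
    }
}
```

With this shape, a consumer that reads a Long-typed key gets a Long back, and storing a value of the wrong type fails at compile time.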
Result Origins
Each node result carries an origin that explains how the result was obtained:
| Origin | Meaning |
|---|---|
| | The node performed the analysis now |
| | The result was already stored in Loom and loaded from there |
| | The result was read from the local meta-path cache |
Pipeline Executor
ReactivePipelineExecutor drives pipeline execution using RxJava.
It:
- Streams media assets from the source node.
- For each asset, runs the topologically ordered nodes in sequence.
- Collects NodeResult objects and emits them as PipelineResult events.
- Publishes events to the PipelineEventBus, which forwards them to Loom over WebSocket.
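The control flow of these steps can be sketched in plain Java. This is deliberately not RxJava and not the real ReactivePipelineExecutor; SequentialExecutorSketch, the runNode callback, and the use of a Consumer as the event bus are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

final class SequentialExecutorSketch {
    // nodeIds must already be in topological order. runNode is a hypothetical
    // callback (nodeId, assetId) -> result summary. eventBus receives one
    // event per asset, listing that asset's node results in execution order.
    static void execute(List<String> assets,
                        List<String> nodeIds,
                        BiFunction<String, String, String> runNode,
                        Consumer<List<String>> eventBus) {
        for (String asset : assets) {
            List<String> results = new ArrayList<>();
            for (String nodeId : nodeIds) {
                results.add(nodeId + "=" + runNode.apply(nodeId, asset));
            }
            eventBus.accept(results);
        }
    }
}
```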
Defining a Pipeline in Code
// Build nodes
LoomSourceNode source = new LoomSourceNode(loomClient);
SHA256Node sha256 = new SHA256Node(loomClient, options, hashOptions);
ThumbnailNode thumbnail = new ThumbnailNode(loomClient, options, thumbOptions);
LoomNode loom = new LoomNode(loomClient, options, loomNodeOptions);
// Wire the graph
source.connectTo(sha256);
sha256.connectTo(thumbnail);
thumbnail.connectTo(loom);
// Build the pipeline
Pipeline pipeline = DefaultPipeline.builder("ingest")
.description("Default ingest pipeline")
.priority(10)
.source(source)
.build();
Dry-Run Mode
When dryRun = true, nodes execute their compute() logic and log what they would do,
but they do not write to the local cache, xattr, or Loom.
This is useful for testing pipeline definitions against real media without side effects.
# Dry-run mode is set via the pipeline definition in Loom (dryRun: true)
# or by passing a flag during process run (TBD in CLI)
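A node could honor dry-run mode by guarding its write path, as in the sketch below. DryRunWriter is hypothetical, the in-memory lists stand in for the local cache, xattr, and Loom, and the result string is a placeholder for real compute() output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a node that honors dry-run mode.
final class DryRunWriter {
    private final boolean dryRun;
    final List<String> written = new ArrayList<>(); // stands in for cache, xattr, Loom
    final List<String> log = new ArrayList<>();

    DryRunWriter(boolean dryRun) {
        this.dryRun = dryRun;
    }

    // Always runs the computation; persists the result only outside dry-run.
    void process(String assetId) {
        String result = "result-for-" + assetId; // placeholder for compute()
        if (dryRun) {
            log.add("dry-run: would store " + result);
        } else {
            written.add(result);
        }
    }
}
```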
Custom Nodes
Any class extending AbstractMediaNode<YourOptions> can be added to a pipeline.
See Cortex Examples for a complete working example and Dagger wiring.
Pipeline Events (WebSocket)
As the executor processes assets, it emits structured events to the Loom WebSocket endpoint:
ws://<loom-host>:8092/api/v1/pipelines/events/ws
Each event identifies the pipeline, node, asset UUID, status (STARTED, COMPLETED, SKIPPED, FAILED), and duration.
The Loom UI subscribes to this stream to display live processing progress.
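For orientation, an event carrying the fields listed above might be modeled as follows. Only the four status names come from this page; the class name, field names, millisecond unit, and JSON layout are assumptions, not the actual wire format.

```java
// Status values as listed above.
enum EventStatus { STARTED, COMPLETED, SKIPPED, FAILED }

// Hypothetical event shape; field names and JSON layout are assumptions.
final class PipelineEventSketch {
    final String pipeline;
    final String nodeId;
    final String assetUuid;
    final EventStatus status;
    final long durationMs;

    PipelineEventSketch(String pipeline, String nodeId, String assetUuid,
                        EventStatus status, long durationMs) {
        this.pipeline = pipeline;
        this.nodeId = nodeId;
        this.assetUuid = assetUuid;
        this.status = status;
        this.durationMs = durationMs;
    }

    // Renders a compact JSON object. No string escaping is done, so this is
    // only suitable for plain identifiers.
    String toJson() {
        return String.format(
                "{\"pipeline\":\"%s\",\"node\":\"%s\",\"asset\":\"%s\",\"status\":\"%s\",\"durationMs\":%d}",
                pipeline, nodeId, assetUuid, status, durationMs);
    }
}
```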