NVIDIANeMo Curator
Menu

The Pipeline class is the main orchestrator for executing sequences of processing stages in NeMo Curator.

Import

from nemo_curator.pipeline import Pipeline

Class Definition

class Pipeline:
    """User-facing pipeline definition for composing processing stages."""

    def __init__(
        self,
        name: str,
        description: str | None = None,
        stages: list[ProcessingStage] | None = None,
        config: dict[str, Any] | None = None,
    ) -> None:
        """Initialize a pipeline.

        Args:
            name: Name identifier for the pipeline.
            description: Optional description of the pipeline's purpose.
            stages: Optional list of processing stages to include.
            config: Optional pipeline configuration valid across all executors.
        """

Methods

add_stage()

Add a stage to the pipeline.

def add_stage(self, stage: ProcessingStage) -> "Pipeline":
    """Add a processing stage to the pipeline.

    Args:
        stage: The ProcessingStage to add.

    Returns:
        Self for method chaining.
    """

build()

Build an execution plan from the pipeline. Decomposes composite stages into execution stages and updates the pipeline in place.

def build(self) -> None:
    """Build the execution plan by decomposing composite stages.

    Raises:
        ValueError: If the pipeline has no stages.
    """

run()

Execute the pipeline. Calls build() before execution.

def run(
    self,
    executor: BaseExecutor | None = None,
    initial_tasks: list[Task] | None = None,
) -> list[Task] | None:
    """Execute the pipeline.

    Args:
        executor: Executor to use. Defaults to XennaExecutor.
        initial_tasks: Initial tasks to start the pipeline.

    Returns:
        List of output tasks from the final stage, or None.
    """

describe()

Get a detailed description of the pipeline.

def describe(self) -> str:
    """Get detailed description of pipeline stages and requirements.

    Returns:
        Human-readable description of the pipeline.
    """

Usage Examples

Basic Pipeline

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io import ParquetReader, ParquetWriter
from nemo_curator.stages.text.filters import LengthFilter

# Create pipeline with stages
pipeline = Pipeline(
    name="text_curation",
    description="Basic text curation pipeline",
    stages=[
        ParquetReader(input_path="/data/input"),
        LengthFilter(min_length=100, max_length=10000),
        ParquetWriter(output_path="/data/output"),
    ],
)

# Run the pipeline
results = pipeline.run()

Method Chaining

pipeline = Pipeline(name="my_pipeline")
pipeline.add_stage(stage1).add_stage(stage2).add_stage(stage3)
results = pipeline.run()

Custom Executor

from nemo_curator.backends.xenna import XennaExecutor

executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor=executor)

Pipeline Configuration

pipeline = Pipeline(
    name="my_pipeline",
    config={"custom_key": "value"},
)

Source Code