The Pipeline class is the main orchestrator for executing sequences of processing stages in NeMo Curator.
Import
from nemo_curator.pipeline import Pipeline
Class Definition
class Pipeline:
"""User-facing pipeline definition for composing processing stages."""
def __init__(
self,
name: str,
description: str | None = None,
stages: list[ProcessingStage] | None = None,
config: dict[str, Any] | None = None,
) -> None:
"""Initialize a pipeline.
Args:
name: Name identifier for the pipeline.
description: Optional description of the pipeline's purpose.
stages: Optional list of processing stages to include.
config: Optional pipeline-level configuration that applies regardless of which executor is used.
"""
Methods
add_stage()
Add a stage to the pipeline.
def add_stage(self, stage: ProcessingStage) -> "Pipeline":
"""Add a processing stage to the pipeline.
Args:
stage: The ProcessingStage to add.
Returns:
Self for method chaining.
"""
build()
Build an execution plan from the pipeline by decomposing composite stages into execution stages. The pipeline is updated in place.
def build(self) -> None:
"""Build the execution plan by decomposing composite stages.
Raises:
ValueError: If the pipeline has no stages.
"""
run()
Execute the pipeline. run() calls build() itself before execution starts.
def run(
self,
executor: BaseExecutor | None = None,
initial_tasks: list[Task] | None = None,
) -> list[Task] | None:
"""Execute the pipeline.
Args:
executor: Executor to use. Defaults to XennaExecutor.
initial_tasks: Initial tasks to start the pipeline.
Returns:
List of output tasks from the final stage, or None.
"""
describe()
Get a detailed description of the pipeline.
def describe(self) -> str:
"""Get detailed description of pipeline stages and requirements.
Returns:
Human-readable description of the pipeline.
"""
Usage Examples
Basic Pipeline
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io import ParquetReader, ParquetWriter
from nemo_curator.stages.text.filters import LengthFilter
# Create pipeline with stages
pipeline = Pipeline(
name="text_curation",
description="Basic text curation pipeline",
stages=[
ParquetReader(input_path="/data/input"),
LengthFilter(min_length=100, max_length=10000),
ParquetWriter(output_path="/data/output"),
],
)
# Run the pipeline
results = pipeline.run()
Method Chaining
pipeline = Pipeline(name="my_pipeline")
# stage1, stage2, and stage3 are ProcessingStage instances created beforehand
pipeline.add_stage(stage1).add_stage(stage2).add_stage(stage3)
results = pipeline.run()
Custom Executor
from nemo_curator.backends.xenna import XennaExecutor
executor = XennaExecutor(config={"execution_mode": "streaming"})
results = pipeline.run(executor=executor)
Pipeline Configuration
pipeline = Pipeline(
name="my_pipeline",
config={"custom_key": "value"},
)