
XennaExecutor is the production executor. It uses Cosmos-Xenna for distributed execution and is the default: pipeline.run() uses it when no executor is passed.

Import

from nemo_curator.backends.xenna import XennaExecutor

Class Definition

class XennaExecutor(BaseExecutor):
    """Production executor using Cosmos-Xenna for distributed execution.

    Provides:
    - Distributed task orchestration
    - Resource allocation and management
    - Batch processing optimization
    - Performance metrics collection
    """

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize the executor.

        Args:
            config: Executor configuration dictionary.
            ignore_head_node: Not supported. Raises ValueError if True.
        """

Configuration Options

Option                     Type        Default      Description
logging_interval           int         60           Seconds between progress logs
ignore_failures            bool        False        Continue on task failures
max_workers_per_stage      int | None  None         Max workers per stage
execution_mode             str         "streaming"  "streaming" or "batch"
cpu_allocation_percentage  float       0.95         CPU allocation fraction
autoscale_interval_s       int         180          Autoscaling check interval
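
A typo in an option name is easy to miss in a plain dictionary. The sketch below merges overrides into the defaults listed in the table and rejects unknown keys. `build_xenna_config` and `XENNA_DEFAULTS` are hypothetical names, not part of NeMo Curator, and the range check on `cpu_allocation_percentage` is an assumption, since the table gives only the default.

```python
from typing import Any

# Defaults copied from the options table above.
XENNA_DEFAULTS: dict[str, Any] = {
    "logging_interval": 60,
    "ignore_failures": False,
    "max_workers_per_stage": None,
    "execution_mode": "streaming",
    "cpu_allocation_percentage": 0.95,
    "autoscale_interval_s": 180,
}


def build_xenna_config(**overrides: Any) -> dict[str, Any]:
    """Hypothetical helper: merge overrides into the documented defaults."""
    unknown = set(overrides) - set(XENNA_DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown option(s): {sorted(unknown)}")

    config = {**XENNA_DEFAULTS, **overrides}

    if config["execution_mode"] not in ("streaming", "batch"):
        raise ValueError('execution_mode must be "streaming" or "batch"')
    # Assumption: the value is a fraction in (0, 1]; the table does not state a range.
    if not 0 < config["cpu_allocation_percentage"] <= 1:
        raise ValueError("cpu_allocation_percentage must be in (0, 1]")
    return config


config = build_xenna_config(execution_mode="batch", logging_interval=30)
print(config)
```

The resulting dictionary can then be passed as XennaExecutor(config=config).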

Usage Examples

Default Configuration

from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor

pipeline = Pipeline(name="my_pipeline", stages=[...])

# Default executor
results = pipeline.run()

# Equivalent to:
executor = XennaExecutor()
results = pipeline.run(executor=executor)

Custom Configuration

executor = XennaExecutor(config={
    "logging_interval": 30,
    "ignore_failures": True,
    "execution_mode": "batch",
    "cpu_allocation_percentage": 0.9,
})

results = pipeline.run(executor=executor)

Streaming vs Batch Mode

Streaming Mode

Processes tasks as they become available:

executor = XennaExecutor(config={
    "execution_mode": "streaming",
})

Best for:

  • Large datasets
  • Memory-constrained environments
  • Real-time processing

Batch Mode

Each stage waits for all of its input tasks before it starts processing:

executor = XennaExecutor(config={
    "execution_mode": "batch",
})

Best for:

  • Small to medium datasets
  • Operations requiring global ordering
  • Task dependencies that span batches
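
The difference between the two modes can be illustrated with a single-process toy in plain Python. It is a conceptual sketch only and does not reflect how Cosmos-Xenna schedules work across workers. `make_stage`, `run_batch`, and `run_streaming` are hypothetical names introduced for this illustration.

```python
from collections.abc import Callable, Iterable, Iterator

events: list[str] = []  # records the order in which work happens


def make_stage(name: str) -> Callable[[int], int]:
    """Build a toy stage that logs each task it handles and passes it on."""
    def stage(task: int) -> int:
        events.append(f"{name}:{task}")
        return task
    return stage


def run_batch(tasks: Iterable[int], stages: list[Callable[[int], int]]) -> list[int]:
    """Each stage finishes every task before the next stage starts."""
    current = list(tasks)  # the full intermediate result is held in memory
    for stage in stages:
        current = [stage(t) for t in current]
    return current


def run_streaming(tasks: Iterable[int], stages: list[Callable[[int], int]]) -> Iterator[int]:
    """Each task flows through all stages as soon as it is available."""
    for task in tasks:
        for stage in stages:
            task = stage(task)
        yield task  # only one task is in flight at a time


stages = [make_stage("read"), make_stage("filter")]

events.clear()
run_batch([1, 2], stages)
batch_order = list(events)      # ['read:1', 'read:2', 'filter:1', 'filter:2']

events.clear()
list(run_streaming([1, 2], stages))
streaming_order = list(events)  # ['read:1', 'filter:1', 'read:2', 'filter:2']
```

In the toy, batch execution keeps every intermediate result between stages, which is why it suits operations that need to see all tasks at once, while streaming keeps only the task in flight, which is why it suits large datasets and limited memory.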

Methods

execute()

Execute the pipeline stages.

def execute(
    self,
    stages: list[ProcessingStage],
    initial_tasks: list[Task] | None = None,
) -> list[Task]:
    """Execute the pipeline stages.

    Args:
        stages: List of processing stages to execute.
        initial_tasks: Initial tasks (defaults to EmptyTask).

    Returns:
        List of output tasks from the final stage.
    """

Error Handling

executor = XennaExecutor(config={
    "ignore_failures": True,  # Continue when individual tasks fail
})

try:
    results = pipeline.run(executor=executor)
except Exception as e:
    # Handle pipeline-level failures
    print(f"Pipeline failed: {e}")

Performance Monitoring

The executor automatically collects performance metrics:

results = pipeline.run(executor=executor)

# Each task contains performance data
for task in results:
    for perf in task._stage_perf:
        print(f"Stage: {perf.stage_name}")
        print(f"  Duration: {perf.process_time}s")
        print(f"  Items processed: {perf.num_items_processed}")
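
Per-task records are often easier to read once they are totalled per stage. The sketch below aggregates the three attributes used in the loop above. `StagePerf` and `FakeTask` are stand-in classes with dummy values so the example runs without a cluster; they are not NeMo Curator types, and `summarize` is a hypothetical helper.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class StagePerf:
    """Stand-in with only the three attributes read in the loop above."""
    stage_name: str
    process_time: float
    num_items_processed: int


@dataclass
class FakeTask:
    """Stand-in for a result task; real tasks come from pipeline.run()."""
    _stage_perf: list[StagePerf] = field(default_factory=list)


def summarize(tasks):
    """Sum process time and item counts per stage across all tasks."""
    totals = defaultdict(lambda: {"seconds": 0.0, "items": 0})
    for task in tasks:
        for perf in task._stage_perf:
            totals[perf.stage_name]["seconds"] += perf.process_time
            totals[perf.stage_name]["items"] += perf.num_items_processed
    return dict(totals)


# Dummy values for illustration only, not measurements.
results = [
    FakeTask([StagePerf("read", 0.5, 100), StagePerf("filter", 1.5, 100)]),
    FakeTask([StagePerf("read", 0.25, 50), StagePerf("filter", 0.75, 50)]),
]
summary = summarize(results)
for name, total in summary.items():
    rate = total["items"] / total["seconds"] if total["seconds"] else float("nan")
    print(f"{name}: {total['items']} items in {total['seconds']:.2f}s ({rate:.1f} items/s)")
```

With real results, summarize(results) can be called directly on the list returned by pipeline.run(), assuming each task exposes the same attributes as in the loop above.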
