NVIDIANeMo Curator
Menu

NeMo Curator provides experimental executors for alternative execution backends. These are located in nemo_curator.backends.experimental.

RayActorPoolExecutor

Uses Ray Actor Pool for distributed execution with built-in progress tracking.

Import

from nemo_curator.backends.experimental import RayActorPoolExecutor

Usage

executor = RayActorPoolExecutor(
    config={
        "pool_size": 8,
    },
    ignore_head_node=True,
    show_progress=True,
    progress_interval=10.0,
)

results = pipeline.run(executor=executor)

Configuration

OptionTypeDefaultDescription
configdict | NoneNoneExecutor-specific configuration dictionary
ignore_head_nodeboolFalseExclude head node from execution
show_progressboolTrueDisplay tqdm progress bars during stage execution and shuffle inserts
progress_intervalfloat10.0Minimum interval in seconds between progress bar updates

BaseExecutor Interface

All executors inherit from BaseExecutor:

from abc import ABC, abstractmethod
from typing import Any

class BaseExecutor(ABC):
    """Base class for all executors."""

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize executor.

        Args:
            config: Executor-specific configuration.
            ignore_head_node: Exclude head node from execution.
        """
        self.config = config or {}
        self.ignore_head_node = ignore_head_node

    @abstractmethod
    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        """Execute pipeline stages.

        Args:
            stages: Processing stages to execute.
            initial_tasks: Initial tasks (defaults to EmptyTask).

        Returns:
            Output tasks from final stage.
        """

Creating Custom Executors

from nemo_curator.backends.base import BaseExecutor
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import Task

class MyCustomExecutor(BaseExecutor):
    """Custom executor implementation."""

    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        tasks = initial_tasks or [EmptyTask()]

        for stage in stages:
            stage.setup({})
            new_tasks = []
            for task in tasks:
                result = stage.process(task)
                if result is not None:
                    if isinstance(result, list):
                        new_tasks.extend(result)
                    else:
                        new_tasks.append(result)
            stage.teardown()
            tasks = new_tasks

        return tasks

Choosing an Executor

ExecutorBest ForConsiderations
XennaExecutorProduction workloadsDefault choice, most stable
RayDataExecutorRay-native environmentsPromoted from experimental in 26.04
RayActorPoolExecutorFine-grained actor controlExperimental

Source Code

View source on GitHub