NeMo Curator provides experimental executors for alternative execution backends. They live in the `nemo_curator.backends.experimental` module.
RayActorPoolExecutor
Runs pipeline stages on a pool of Ray actors for distributed execution, with built-in progress tracking.
Import
```python
from nemo_curator.backends.experimental import RayActorPoolExecutor
```
Usage
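```python
# Assumes `pipeline` is a Pipeline you have already built
executor = RayActorPoolExecutor(
    config={
        "pool_size": 8,
    },
    ignore_head_node=True,   # keep the Ray head node free of pipeline work
    show_progress=True,      # show tqdm progress bars
    progress_interval=10.0,  # seconds between progress bar updates
)
results = pipeline.run(executor=executor)
```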
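
To illustrate the actor-pool pattern this executor is built on, here is a minimal local sketch using only the Python standard library: a fixed number of stateful workers, each set up once and then reused across many tasks. `Worker` and `LocalActorPool` are hypothetical names, and threads stand in for Ray actors; this is not how `RayActorPoolExecutor` is implemented, and the `pool_size` argument only mirrors the config key in the example above.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable


class Worker:
    """Hypothetical stateful worker; stands in for one actor in the pool."""

    def __init__(self, worker_id: int) -> None:
        # Expensive setup (e.g. loading a model) would happen here, once per worker
        self.worker_id = worker_id
        self.tasks_handled = 0  # state that persists across tasks

    def process(self, item: int) -> int:
        self.tasks_handled += 1
        return item * item


class LocalActorPool:
    """Thread-based stand-in for an actor pool: a fixed set of reusable workers."""

    def __init__(self, pool_size: int) -> None:
        self.workers = [Worker(i) for i in range(pool_size)]
        self._idle = queue.Queue()  # workers currently free to take a task
        for worker in self.workers:
            self._idle.put(worker)

    def _run(self, item: int) -> int:
        worker = self._idle.get()  # block until some worker is idle
        try:
            return worker.process(item)
        finally:
            self._idle.put(worker)  # hand the worker back for reuse

    def map(self, items: Iterable[int]) -> list[int]:
        # One thread per worker; results come back in input order
        with ThreadPoolExecutor(max_workers=len(self.workers)) as threads:
            return list(threads.map(self._run, items))


pool = LocalActorPool(pool_size=3)
print(pool.map([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

The point of the pattern is that per-worker setup cost is paid `pool_size` times rather than once per task; Ray's `ray.util.ActorPool` applies the same idea to remote actors.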
Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| `config` | `dict \| None` | `None` | Executor-specific configuration dictionary |
| `ignore_head_node` | `bool` | `False` | Exclude the head node from execution |
| `show_progress` | `bool` | `True` | Display tqdm progress bars during stage execution and shuffle inserts |
| `progress_interval` | `float` | `10.0` | Minimum interval, in seconds, between progress bar updates |
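
`progress_interval` sets a minimum spacing between progress-bar refreshes. The throttling idea can be sketched as follows, assuming a monotonic clock; `ProgressThrottle` is a hypothetical helper, not part of NeMo Curator, and the clock is injectable only so the logic can be checked without sleeping.

```python
from __future__ import annotations

import time
from typing import Callable


class ProgressThrottle:
    """Hypothetical helper: permit an update at most once every `interval` seconds."""

    def __init__(
        self,
        interval: float = 10.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.interval = interval
        self.clock = clock  # injectable time source
        self._last: float | None = None  # time of the last permitted update

    def should_update(self) -> bool:
        now = self.clock()
        # The first call always updates; later calls wait out the interval
        if self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

A progress loop would call `should_update()` after each completed task and redraw the bar only when it returns `True`. tqdm itself offers a comparable `mininterval` option.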
BaseExecutor Interface
All executors inherit from BaseExecutor:
```python
from abc import ABC, abstractmethod
from typing import Any

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import Task


class BaseExecutor(ABC):
    """Base class for all executors."""

    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize executor.

        Args:
            config: Executor-specific configuration.
            ignore_head_node: Exclude head node from execution.
        """
        self.config = config or {}
        self.ignore_head_node = ignore_head_node

    @abstractmethod
    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        """Execute pipeline stages.

        Args:
            stages: Processing stages to execute.
            initial_tasks: Initial tasks (defaults to EmptyTask).

        Returns:
            Output tasks from final stage.
        """
```
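
The `ignore_head_node` flag stored by `BaseExecutor.__init__` can be pictured as a filter over the cluster's node list. The sketch below assumes node records shaped like the dictionaries returned by `ray.nodes()` (`NodeID`, `Alive`, `Resources`) and assumes the head node is recognizable by the `node:__internal_head__` resource that recent Ray versions attach to it; `select_worker_nodes` is a hypothetical helper, not NeMo Curator's implementation.

```python
from typing import Any

# Assumption: Ray marks the head node with this custom resource
HEAD_NODE_RESOURCE = "node:__internal_head__"


def is_head_node(node: dict[str, Any]) -> bool:
    return HEAD_NODE_RESOURCE in node.get("Resources", {})


def select_worker_nodes(
    nodes: list[dict[str, Any]],
    ignore_head_node: bool = False,
) -> list[dict[str, Any]]:
    """Return the live nodes an executor may schedule work on."""
    alive = [n for n in nodes if n.get("Alive", False)]
    if not ignore_head_node:
        return alive
    # Drop the head node so its resources stay free for the driver
    return [n for n in alive if not is_head_node(n)]
```

Enabling the flag is typically done to keep the head node's CPUs and memory available for the Ray driver and cluster services.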
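Creating Custom Executors
To add a backend, subclass `BaseExecutor` and implement `execute()`. This example runs each stage sequentially in a single process: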
```python
from nemo_curator.backends.base import BaseExecutor
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import EmptyTask, Task


class MyCustomExecutor(BaseExecutor):
    """Custom executor that runs every stage sequentially in the current process."""

    def execute(
        self,
        stages: list[ProcessingStage],
        initial_tasks: list[Task] | None = None,
    ) -> list[Task]:
        # Start from the caller's tasks, or the shared EmptyTask instance
        tasks = initial_tasks or [EmptyTask]
        for stage in stages:
            stage.setup({})
            try:
                new_tasks = []
                for task in tasks:
                    result = stage.process(task)
                    if result is None:
                        continue  # the stage filtered this task out
                    if isinstance(result, list):
                        new_tasks.extend(result)  # fan-out: one task became many
                    else:
                        new_tasks.append(result)
            finally:
                stage.teardown()  # release resources even if process() raises
            tasks = new_tasks
        return tasks
```
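
The executor above needs NeMo Curator installed. To experiment with its control flow in isolation, the following dependency-free sketch reproduces the same loop using hypothetical stand-ins (`Task`, `Stage`, `SplitWords`, `DropShort`, `Upper`, `run_sequential`). They only mimic the `setup`/`process`/`teardown` calls used above and are not NeMo Curator classes. Returning a list fans one task out into many, and returning `None` drops it.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in for a NeMo Curator task."""
    data: str


class Stage:
    """Hypothetical stand-in for ProcessingStage's lifecycle hooks."""

    def setup(self, worker_metadata: dict | None = None) -> None:
        pass

    def process(self, task: Task) -> Task | list[Task] | None:
        raise NotImplementedError

    def teardown(self) -> None:
        pass


class SplitWords(Stage):
    def process(self, task: Task) -> list[Task]:
        return [Task(word) for word in task.data.split()]  # fan-out


class DropShort(Stage):
    def __init__(self, min_len: int) -> None:
        self.min_len = min_len

    def process(self, task: Task) -> Task | None:
        return task if len(task.data) >= self.min_len else None  # filter


class Upper(Stage):
    def process(self, task: Task) -> Task:
        return Task(task.data.upper())  # one-to-one transform


def run_sequential(stages: list[Stage], initial_tasks: list[Task]) -> list[Task]:
    tasks = list(initial_tasks)
    for stage in stages:
        stage.setup({})
        try:
            new_tasks: list[Task] = []
            for task in tasks:
                result = stage.process(task)
                if result is None:
                    continue
                if isinstance(result, list):
                    new_tasks.extend(result)
                else:
                    new_tasks.append(result)
        finally:
            stage.teardown()
        tasks = new_tasks
    return tasks


out = run_sequential(
    [SplitWords(), DropShort(3), Upper()],
    [Task("a big red fox"), Task("hi there")],
)
print([t.data for t in out])  # ['BIG', 'RED', 'FOX', 'THERE']
```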
Choosing an Executor
| Executor | Best For | Considerations |
|---|---|---|
| `XennaExecutor` | Production workloads | Default choice; most stable |
| `RayDataExecutor` | Ray-native environments | Promoted from experimental in 26.04 |
| `RayActorPoolExecutor` | Fine-grained actor control | Experimental |