XennaExecutor is the production executor that uses Cosmos-Xenna for distributed execution. It’s the default executor used when running pipelines.
Import
from nemo_curator.backends.xenna import XennaExecutor
Class Definition
class XennaExecutor(BaseExecutor):
    """Production executor using Cosmos-Xenna for distributed execution.
    Provides:
    - Distributed task orchestration
    - Resource allocation and management
    - Batch processing optimization
    - Performance metrics collection
    """
    def __init__(
        self,
        config: dict[str, Any] | None = None,
        ignore_head_node: bool = False,
    ) -> None:
        """Initialize the executor.
        Args:
            config: Executor configuration dictionary.
            ignore_head_node: Not supported. Raises ValueError if True.
        """
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| logging_interval | int | 60 | Seconds between progress logs |
| ignore_failures | bool | False | Continue on task failures |
| max_workers_per_stage | int \| None | None | Max workers per stage |
| execution_mode | str | "streaming" | "streaming" or "batch" |
| cpu_allocation_percentage | float | 0.95 | CPU allocation fraction |
| autoscale_interval_s | int | 180 | Autoscaling check interval, in seconds |
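A small helper can catch misspelled option names before the dictionary reaches the executor. The sketch below is illustrative only: `XENNA_DEFAULTS` and `build_xenna_config` are hypothetical names that are not part of NeMo Curator, the defaults are copied from the table above, and the range check on `cpu_allocation_percentage` assumes that "fraction" means a value in (0, 1].

```python
from typing import Any

# Defaults copied from the options table above.
XENNA_DEFAULTS: dict[str, Any] = {
    "logging_interval": 60,
    "ignore_failures": False,
    "max_workers_per_stage": None,
    "execution_mode": "streaming",
    "cpu_allocation_percentage": 0.95,
    "autoscale_interval_s": 180,
}


def build_xenna_config(**overrides: Any) -> dict[str, Any]:
    """Hypothetical helper: merge overrides into the documented defaults."""
    # Reject option names that do not appear in the table.
    unknown = set(overrides) - set(XENNA_DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown option(s): {sorted(unknown)}")
    config = {**XENNA_DEFAULTS, **overrides}
    if config["execution_mode"] not in ("streaming", "batch"):
        raise ValueError("execution_mode must be 'streaming' or 'batch'")
    # Assumption: a "fraction" is a value in (0, 1].
    if not 0 < config["cpu_allocation_percentage"] <= 1:
        raise ValueError("cpu_allocation_percentage must be in (0, 1]")
    return config


config = build_xenna_config(logging_interval=30, execution_mode="batch")
```

The resulting dictionary can then be passed as `XennaExecutor(config=config)`.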
Usage Examples
Default Configuration
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
pipeline = Pipeline(name="my_pipeline", stages=[...])
# Default executor
results = pipeline.run()
# Equivalent to:
executor = XennaExecutor()
results = pipeline.run(executor=executor)
Custom Configuration
executor = XennaExecutor(config={
    "logging_interval": 30,
    "ignore_failures": True,
    "execution_mode": "batch",
    "cpu_allocation_percentage": 0.9,
})
results = pipeline.run(executor=executor)
Streaming vs Batch Mode
Streaming Mode
Processes tasks as they become available:
executor = XennaExecutor(config={
    "execution_mode": "streaming",
})
Best for:
- Large datasets
- Memory-constrained environments
- Real-time processing
Batch Mode
Waits for all tasks before processing:
executor = XennaExecutor(config={
    "execution_mode": "batch",
})
Best for:
- Small to medium datasets
- Operations requiring global ordering
- When task dependencies span batches
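The difference between the two modes can be pictured with a toy model in plain Python. This is not how Cosmos-Xenna is implemented: `run_streaming` and `run_batch` are hypothetical functions, tasks are plain integers, and the stage-at-a-time ordering in the batch version is an assumption made for illustration. The logs record the order in which work happens.

```python
from collections.abc import Callable, Iterable, Iterator

Stage = Callable[[int], int]


def run_streaming(tasks: Iterable[int], stages: list[Stage], log: list[str]) -> Iterator[int]:
    """Push each task through every stage as soon as it is available."""
    for task in tasks:
        for i, stage in enumerate(stages):
            task = stage(task)
            log.append(f"stage{i}:{task}")
        yield task


def run_batch(tasks: Iterable[int], stages: list[Stage], log: list[str]) -> list[int]:
    """Collect all tasks first, then finish one stage before starting the next."""
    current = list(tasks)  # wait for every task before processing
    for i, stage in enumerate(stages):
        current = [stage(t) for t in current]
        log.extend(f"stage{i}:{t}" for t in current)
    return current


stages = [lambda x: x + 1, lambda x: x * 10]
streaming_log: list[str] = []
batch_log: list[str] = []
streaming_out = list(run_streaming([1, 2], stages, streaming_log))
batch_out = run_batch([1, 2], stages, batch_log)

# Both modes produce the same results, [20, 30], but in a different order of work:
# streaming_log == ["stage0:2", "stage1:20", "stage0:3", "stage1:30"]
# batch_log     == ["stage0:2", "stage0:3", "stage1:20", "stage1:30"]
```

In the streaming version only one task is in flight at a time, which is why it suits large or memory-constrained workloads. The batch version holds the full task list in memory, which is what makes operations over the whole set possible.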
Methods
execute()
Execute the pipeline stages.
def execute(
    self,
    stages: list[ProcessingStage],
    initial_tasks: list[Task] | None = None,
) -> list[Task]:
    """Execute the pipeline stages.
    Args:
        stages: List of processing stages to execute.
        initial_tasks: Initial tasks (defaults to EmptyTask).
    Returns:
        List of output tasks from the final stage.
    """
Error Handling
executor = XennaExecutor(config={
    "ignore_failures": True,  # Continue despite errors
})
try:
    results = pipeline.run(executor=executor)
except Exception as e:
    # Handle pipeline-level failures
    print(f"Pipeline failed: {e}")
Performance Monitoring
The executor automatically collects performance metrics:
results = pipeline.run(executor=executor)
# Each task contains performance data
for task in results:
    for perf in task._stage_perf:
        print(f"Stage: {perf.stage_name}")
        print(f"  Duration: {perf.process_time}s")
        print(f"  Items processed: {perf.num_items_processed}")
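The per-task loop above can be rolled up into per-stage totals. The following is a minimal sketch: `summarize_stage_perf` is a hypothetical helper, not part of NeMo Curator, and it relies only on the attributes used in the loop (`_stage_perf`, `stage_name`, `process_time`, `num_items_processed`). The `SimpleNamespace` objects are stand-ins with made-up numbers so the example runs without a cluster.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Iterable


def summarize_stage_perf(tasks: Iterable[Any]) -> dict[str, dict[str, float]]:
    """Sum process time and item counts per stage across all tasks."""
    totals: dict[str, dict[str, float]] = defaultdict(
        lambda: {"process_time": 0.0, "num_items_processed": 0}
    )
    for task in tasks:
        for perf in task._stage_perf:
            entry = totals[perf.stage_name]
            entry["process_time"] += perf.process_time
            entry["num_items_processed"] += perf.num_items_processed
    return dict(totals)


# Stand-in results with made-up numbers, used only to exercise the helper.
fake_results = [
    SimpleNamespace(_stage_perf=[
        SimpleNamespace(stage_name="read", process_time=1.5, num_items_processed=100),
        SimpleNamespace(stage_name="filter", process_time=0.5, num_items_processed=100),
    ]),
    SimpleNamespace(_stage_perf=[
        SimpleNamespace(stage_name="read", process_time=2.0, num_items_processed=50),
    ]),
]
summary = summarize_stage_perf(fake_results)
for stage_name, stats in summary.items():
    print(f"{stage_name}: {stats['process_time']}s, {stats['num_items_processed']} items")
```

With real pipeline output, `summarize_stage_perf(results)` would be called on the list returned by `pipeline.run()`. Because `_stage_perf` is an underscore-prefixed attribute, it may change between releases.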