NVIDIANeMo Curator
Menu

Custom Data Loading

Create custom data loading pipelines using Curator. This guide shows how to build modular stages that run on Curator’s distributed processing.

How It Works

Curator uses the same 3-step pipeline pattern described in Data Acquisition Concepts for custom data loading. Each step maps to one or more abstract base classes with corresponding processing stages that compose into pipelines.

Architecture Overview

For detailed information about the core components and data flow, see Data Acquisition Concepts and Data Loading Concepts.

Implementation Guide

1. Create Directory Structure

your_data_source/
├── __init__.py
├── stage.py           # Main composite stage
├── url_generation.py  # URL generation logic
├── download.py        # Download implementation
├── iterator.py        # File iteration logic
└── extract.py         # Data extraction logic (optional)

2. Build Core Components

URL Generator (url_generation.py)

from dataclasses import dataclass
from nemo_curator.stages.text.download import URLGenerator

@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self) -> list[str]:
        """Generate list of URLs to download."""
        # Your URL generation logic here
        return [
            "https://example.com/dataset1.zip",
            "https://example.com/dataset2.zip",
        ]

Document Downloader (download.py)

from nemo_curator.stages.text.download import DocumentDownloader

class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        """Extract filename from URL."""
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        """Download file from URL to local path."""
        # Custom download logic
        # Return (success_bool, error_message)
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)

Document Iterator (iterator.py)

import json
from collections.abc import Iterator
from typing import Any
from nemo_curator.stages.text.download import DocumentIterator

class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Iterate over records in a file."""
        # Custom iteration logic to load local file and return documents
        for record in load_local_file_fn(file_path):
            yield {"content": record_content, "metadata": record_metadata, "id": record_id}

    def output_columns(self) -> list[str]:
        """Define output columns."""
        return ["content", "metadata", "id"]

Document Extractor (extract.py)

from typing import Any
from nemo_curator.stages.text.download import DocumentExtractor

class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, str]) -> dict[str, Any] | None:
        """Transform raw record to final format."""
        # Skip invalid records
        if not record.get("content"):
            return None

        # Extract and clean text
        cleaned_text = self._clean_text(record["content"])

        # Generate unique ID if not present
        doc_id = record.get("id", self._generate_id(cleaned_text))

        return {
            "text": cleaned_text,
            "id": doc_id,
            "source": record.get("metadata", {}).get("source", "unknown")
        }

    def input_columns(self) -> list[str]:
        return ["content", "metadata", "id"]

    def output_columns(self) -> list[str]:
        return ["text", "id", "source"]

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Your text cleaning logic here
        return text.strip()

    def _generate_id(self, text: str) -> str:
        """Generate unique ID for text."""
        import hashlib
        return hashlib.md5(text.encode()).hexdigest()[:16]

3. Create Composite Stage (stage.py)

from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.base import ProcessingStage
from .url_generation import CustomURLGenerator
from .download import CustomDownloader
from .iterator import CustomIterator
from .extract import CustomExtractor

class CustomDataStage(DocumentDownloadExtractStage):
    """Custom data loading stage combining all components."""

    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        self.url_generator = CustomURLGenerator()
        self.downloader = CustomDownloader(download_dir=download_dir)
        self.iterator = CustomIterator()
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,  # Optional - remove if not needed
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_data"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom data"

Usage Examples

Basic Pipeline

from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from your_data_source.stage import CustomDataStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter

def main():
    # Initialize Ray client
    ray_client = RayClient()
    ray_client.start()

    # Create pipeline
    pipeline = Pipeline(
        name="custom_data_pipeline",
        description="Load and process custom dataset"
    )

    # Create custom data loading stage
    data_stage = CustomDataStage(...)

    pipeline.add_stage(data_stage)

    # Save the results to JSONL
    pipeline.add_stage(JsonlWriter(...))

    # Run pipeline
    print("Starting pipeline...")
    results = pipeline.run()

    # Stop Ray client
    ray_client.stop()

if __name__ == "__main__":
    main()

For executor options and configuration, refer to Execution Backends.

Parameters Reference

ParameterTypeDescriptionDefault
url_generatorURLGeneratorCustom URL generation implementationRequired
downloaderDocumentDownloaderCustom download implementationRequired
iteratorDocumentIteratorCustom file iteration implementationRequired
extractorDocumentExtractor | NoneOptional extraction/transformation stepNone
url_limitint | NoneMaximum number of URLs to processNone
record_limitint | NoneMaximum records per fileNone
add_filename_columnbool | strAdd filename column to output; if str, uses it as the column name (default name: “file_name”)True

Output Format

Processed data flows through the pipeline as DocumentBatch tasks containing Pandas DataFrames or PyArrow Tables:

Example Output Schema

{
    "text": "This is the processed document text",
    "id": "unique-document-id",
    "source": "example.com",
    "file_name": "dataset1.jsonl"  # If add_filename_column=True (default column name)
}