Data Acquisition Concepts

This guide covers the core concepts for acquiring and processing text data from remote sources in NeMo Curator. Data acquisition focuses on downloading, extracting, and converting remote data sources into the DocumentBatch format for further processing.

Overview

Data acquisition in NeMo Curator follows a three-stage architecture:

  1. Generate URLs: Discover and generate download URLs from minimal input.
  2. Download: Retrieve raw data files from remote sources.
  3. Iterate and Extract: Extract individual records from downloaded containers and convert raw content to clean, structured text.

This process transforms diverse remote data sources into a standardized DocumentBatch that can be used throughout the text curation pipeline.

Core Components

The data acquisition framework consists of abstract base classes that define the acquisition workflow:

URLGenerator

Generates the download URLs from a minimal input configuration. Override the generate_urls method so that it returns the list of URLs to download.

Example Implementation:

from dataclasses import dataclass
from nemo_curator.stages.text.download import URLGenerator

@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self):
        # Custom URL generation logic
        urls = []
        ...
        return urls
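
As a more concrete illustration, the sketch below expands a base URL and a shard count into a list of shard URLs. ShardURLGenerator, its fields, and the shard naming scheme are hypothetical and exist only for this example. The class is deliberately standalone so it runs without NeMo Curator installed; in a real pipeline it would subclass URLGenerator as shown above.

```python
from dataclasses import dataclass


@dataclass
class ShardURLGenerator:
    """Hypothetical generator: expands a base URL and shard count into shard URLs."""

    base_url: str
    num_shards: int
    extension: str = "jsonl.gz"

    def generate_urls(self) -> list[str]:
        # Zero-pad the shard index so the URLs sort lexicographically.
        return [
            f"{self.base_url.rstrip('/')}/shard-{index:05d}.{self.extension}"
            for index in range(self.num_shards)
        ]


urls = ShardURLGenerator(base_url="https://example.com/corpus/", num_shards=3).generate_urls()
print(urls)
```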

DocumentDownloader

Connects to remote repositories and downloads data from them. Override _get_output_filename and _download_to_path. Both are called by the underlying download function, which is designed to be idempotent.

Example Implementation:

from nemo_curator.stages.text.download import DocumentDownloader

class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        # Custom logic to extract filename from URL
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        # Custom download logic
        # Return (success_bool, error_message)
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)

DocumentIterator

Extracts individual records from downloaded containers. Override iterate to load the file at the given path and yield records, and override output_columns to declare the output schema. Records are passed directly to the extractor (if provided) inline during iteration.

Example Implementation:

from collections.abc import Iterator
from typing import Any
from nemo_curator.stages.text.download import DocumentIterator

class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

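    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        # Custom iteration logic: load the local file and yield one dict per record.
        # load_local_file_fn is a placeholder for your own parsing function,
        # assumed here to yield dicts with "content" and "metadata" keys.
        for record in load_local_file_fn(file_path):
            yield {"content": record["content"], "metadata": record["metadata"]}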

    def output_columns(self) -> list[str]:
        return ["content", "metadata"]
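
For a concrete picture of the iteration step, the following sketch streams a JSON Lines file and yields one record per line. It is a standalone function rather than a DocumentIterator subclass, and the input field names "id" and "body" are hypothetical; adapt them to the container format you actually download.

```python
import json
import os
import tempfile
from collections.abc import Iterator
from typing import Any


def iterate_jsonl(file_path: str) -> Iterator[dict[str, Any]]:
    """Yield one record per non-empty line of a JSON Lines file."""
    with open(file_path, encoding="utf-8") as f:
        for line in f:  # stream line by line instead of loading the whole file
            line = line.strip()
            if not line:
                continue
            raw = json.loads(line)
            yield {
                "content": raw.get("body", ""),
                "metadata": {"id": raw.get("id")},
            }


# Demo: write a two-record file (with one blank line), then iterate over it.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.jsonl")
    with open(path, "w", encoding="utf-8") as f:
        f.write('{"id": 1, "body": "first"}\n\n{"id": 2, "body": "second"}\n')
    records = list(iterate_jsonl(path))
print(records)
```

The yielded keys match what output_columns declares in the example above ("content" and "metadata").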

DocumentExtractor (Optional)

DocumentExtractor transforms individual records and is optional. When provided to DocumentIterateExtractStage, it processes each record inline during iteration rather than as a separate stage.

Example Implementation:

from typing import Any
from nemo_curator.stages.text.download import DocumentExtractor

class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, str]) -> dict[str, Any] | None:
        # Custom extraction logic.
        # clean_content and detect_language are placeholders for your own helpers.
        cleaned_text = clean_content(record["content"])
        detected_lang = detect_language(cleaned_text)
        return {"text": cleaned_text, "language": detected_lang}

    def input_columns(self):
        return ["content", "metadata"]

    def output_columns(self):
        return ["text", "language"]

Supported Data Sources

NeMo Curator provides built-in support for major public text datasets, including Common Crawl, Wikipedia, and arXiv.

Integration with Pipeline Architecture

The data acquisition process integrates with NeMo Curator’s pipeline-based architecture. DocumentDownloadExtractStage is a composite stage that chains the components described above, and the pipeline runs it in parallel on the distributed computing backend (Ray in the example below).

Acquisition Workflow

from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter
from nemo_curator.stages.base import ProcessingStage

# Create composite stage
class CustomDownloadExtractStage(DocumentDownloadExtractStage):
    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        # Create the URL generator
        self.url_generator = CustomURLGenerator()

        # Create the downloader
        self.downloader = CustomDownloader(download_dir=download_dir)

        # Create the iterator
        self.iterator = CustomIterator()

        # Create the extractor
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_pipeline"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom pipeline"

# Initialize Ray client
ray_client = RayClient()
ray_client.start()

# Define acquisition pipeline
pipeline = Pipeline(name="data_acquisition")

# Create download and extract stage with custom components
custom_download_extract_stage = CustomDownloadExtractStage(...)
pipeline.add_stage(custom_download_extract_stage)

# Write the results
pipeline.add_stage(JsonlWriter(...))

# Execute acquisition pipeline
results = pipeline.run()

# Stop Ray client
ray_client.stop()

Performance Optimization

Parallel Processing

Data acquisition leverages distributed computing frameworks for scalable processing:

  • Parallel Downloads: URLs in the generated list are downloaded in parallel by separate workers.
  • Concurrent Extraction: Downloaded files are processed in parallel across workers.
  • Memory Management: Large files are processed as a stream of records instead of being loaded into memory all at once.
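
The fan-out pattern behind these points can be illustrated with a thread pool from the standard library. This is a stand-in only: NeMo Curator distributes the work through its own backend (the workflow above starts a Ray client), and process_url is a hypothetical placeholder for the download, iterate, and extract steps on one URL.

```python
from concurrent.futures import ThreadPoolExecutor


def process_url(url: str) -> dict[str, object]:
    """Stand-in for download + iterate + extract on a single URL."""
    filename = url.split("/")[-1]
    return {"url": url, "file_name": filename, "num_chars": len(filename)}


urls = [f"https://example.com/shard-{i}.jsonl" for i in range(4)]

# Each URL is an independent task, so the work fans out across workers.
# Executor.map returns results in input order regardless of completion order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(process_url, urls))

file_names = [r["file_name"] for r in results]
print(file_names)
```

Because each URL is processed independently, adding workers raises throughput until network bandwidth or the remote server becomes the bottleneck.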

Integration with Data Loading

Data acquisition produces a standardized output that integrates seamlessly with Curator’s Data Loading Concepts:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.writer import ParquetWriter

# Create acquisition pipeline with all stages including writer
acquisition_pipeline = Pipeline(name="data_acquisition")
# ... add acquisition stages ...

# Add writer to save results directly
writer = ParquetWriter(path="acquired_data/")
acquisition_pipeline.add_stage(writer)

# Run pipeline to acquire and save data in one execution
results = acquisition_pipeline.run()

# Later: Load using pipeline-based data loading
from nemo_curator.stages.text.io.reader import ParquetReader

load_pipeline = Pipeline(name="load_acquired_data")
reader = ParquetReader(file_paths="acquired_data/")
load_pipeline.add_stage(reader)

This enables you to:

  • Separate acquisition from processing for better workflow management.
  • Cache acquired data to avoid re-downloading.
  • Mix acquired and local data in the same processing pipeline.
  • Use standard loading patterns regardless of data origin.