Data Acquisition Concepts
This guide covers the core concepts for acquiring and processing text data from remote sources in NeMo Curator. Data acquisition focuses on downloading, extracting, and converting remote data sources into the DocumentBatch format for further processing.
Overview
Data acquisition in NeMo Curator follows a three-stage architecture:
- Generate URLs: Discover and generate download URLs from minimal input.
- Download: Retrieve raw data files from remote sources.
- Iterate and Extract: Extract individual records from downloaded containers and convert raw content to clean, structured text.
This process transforms diverse remote data sources into a standardized DocumentBatch that can be used throughout the text curation pipeline.
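The three stages can be pictured as a chain of plain Python functions. The sketch below uses only the standard library and does not use NeMo Curator's classes. The names `generate_urls`, `download`, `iterate_and_extract`, and `acquire`, the `example.com` URLs, and the in-memory `FAKE_REMOTE` store are all hypothetical stand-ins so that the example runs without network access.

```python
import json
import tempfile
from collections.abc import Iterator
from pathlib import Path

# Hypothetical in-memory "remote" so the sketch runs without network access.
FAKE_REMOTE = {
    "https://example.com/data/part-0.jsonl": '{"content": " Hello "}\n{"content": "World"}\n',
    "https://example.com/data/part-1.jsonl": '{"content": "  "}\n{"content": "Again"}\n',
}


def generate_urls(base: str, num_parts: int) -> list[str]:
    """Stage 1: expand minimal input into a full list of URLs."""
    return [f"{base}/part-{i}.jsonl" for i in range(num_parts)]


def download(url: str, download_dir: Path) -> Path:
    """Stage 2: retrieve the raw file and store it locally."""
    path = download_dir / url.rsplit("/", 1)[-1]
    path.write_text(FAKE_REMOTE[url], encoding="utf-8")
    return path


def iterate_and_extract(path: Path) -> Iterator[dict[str, str]]:
    """Stage 3: yield one cleaned record per non-empty entry."""
    with path.open(encoding="utf-8") as f:
        for line in f:
            text = json.loads(line)["content"].strip()
            if text:  # drop records that are empty after cleaning
                yield {"text": text, "file_name": path.name}


def acquire(base: str, num_parts: int) -> list[dict[str, str]]:
    """Run the three stages in order and collect the resulting records."""
    with tempfile.TemporaryDirectory() as tmp:
        records = []
        for url in generate_urls(base, num_parts):
            records.extend(iterate_and_extract(download(url, Path(tmp))))
        return records


documents = acquire("https://example.com/data", 2)
```

In NeMo Curator the same flow is expressed through the component classes described in the following sections, and the resulting records are delivered as a DocumentBatch rather than a plain list.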
Core Components
The data acquisition framework consists of abstract base classes that define the acquisition workflow:
URLGenerator
Generates URLs for downloading from minimal input configuration. Override the generate_urls method to return the list of URLs to download.
Example Implementation:
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator


@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self) -> list[str]:
        # Custom URL generation logic
        urls = []
        ...
        return urls
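To make "minimal input" concrete, the sketch below expands a date range into one URL per day. It is a standalone dataclass that mirrors the shape of a generate_urls override but deliberately does not import NeMo Curator. The class name `DailyDumpURLGenerator`, the base URL, and the `.jsonl.gz` naming pattern are hypothetical.

```python
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass
class DailyDumpURLGenerator:
    """Standalone stand-in for a URLGenerator subclass (hypothetical)."""

    base_url: str  # hypothetical, e.g. "https://example.com/dumps"
    start: date    # first day to include
    end: date      # last day to include (inclusive)

    def generate_urls(self) -> list[str]:
        # One URL per calendar day; an inverted range yields an empty list.
        num_days = (self.end - self.start).days + 1
        return [
            f"{self.base_url}/{(self.start + timedelta(days=i)).isoformat()}.jsonl.gz"
            for i in range(max(num_days, 0))
        ]


generator = DailyDumpURLGenerator(
    base_url="https://example.com/dumps",
    start=date(2024, 1, 30),
    end=date(2024, 2, 1),
)
urls = generator.generate_urls()
```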
DocumentDownloader
Connects to and downloads data from remote repositories. Override _get_output_filename and _download_to_path. Both are called by the underlying download function, which is designed to be idempotent.
Example Implementation:
from nemo_curator.stages.text.download import DocumentDownloader


class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        # Custom logic to extract filename from URL
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        # Custom download logic
        # Return (success_bool, error_message)
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)
DocumentIterator
Extracts individual records from downloaded containers. Override iterate to load the file at the given path and yield records, and override output_columns to declare the output schema. Records are passed directly to the extractor (if provided) inline during iteration.
Example Implementation:
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator


class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        # Custom iteration logic to load the local file and yield documents.
        # load_local_file_fn is a placeholder for your own loader; it is assumed
        # here to yield dict-like records with "content" and "metadata" keys.
        for record in load_local_file_fn(file_path):
            yield {"content": record["content"], "metadata": record["metadata"]}

    def output_columns(self) -> list[str]:
        return ["content", "metadata"]
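As a concrete illustration of the iterate and output_columns pair, the sketch below reads a JSON Lines file and yields one record per line. It is a standalone class that does not inherit from DocumentIterator, so it runs without NeMo Curator installed. The class name `JsonlRecordIterator` and the input field names `id` and `body` are hypothetical.

```python
import json
import os
import tempfile
from collections.abc import Iterator
from typing import Any


class JsonlRecordIterator:
    """Standalone sketch with the same two methods as a DocumentIterator."""

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        # Read line by line so the whole file is never held in memory.
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # skip blank lines
                raw = json.loads(line)
                yield {
                    "content": raw.get("body", ""),
                    "metadata": {"id": raw.get("id")},
                }

    def output_columns(self) -> list[str]:
        # Must match the keys of the dictionaries yielded by iterate().
        return ["content", "metadata"]


with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "sample.jsonl")
    with open(sample, "w", encoding="utf-8") as f:
        f.write('{"id": 1, "body": "first"}\n\n{"id": 2, "body": "second"}\n')
    records = list(JsonlRecordIterator().iterate(sample))
```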
DocumentExtractor (Optional)
DocumentExtractor transforms individual records and is optional. When provided to DocumentIterateExtractStage, it processes each record inline during iteration rather than as a separate stage.
Example Implementation:
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor


class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, str]) -> dict[str, Any] | None:
        # Custom extraction logic. clean_content and detect_language are
        # placeholders for your own helper functions.
        cleaned_text = clean_content(record["content"])
        detected_lang = detect_language(cleaned_text)
        return {"text": cleaned_text, "language": detected_lang}

    def input_columns(self) -> list[str]:
        return ["content", "metadata"]

    def output_columns(self) -> list[str]:
        return ["text", "language"]
Supported Data Sources
NeMo Curator provides built-in support for major public text datasets:
- Common Crawl: Download and extract web archive data from Common Crawl (web-scale, multilingual).
- ArXiv: Download and extract scientific papers from arXiv (academic, scientific).
- Wikipedia: Download and extract Wikipedia articles from Wikipedia dumps (encyclopedic, structured).
- Custom Data Sources: Implement a download and extract pipeline for a custom data source (extensible, specialized).
Integration with Pipeline Architecture
The data acquisition process seamlessly integrates with NeMo Curator’s pipeline-based architecture. The DocumentDownloadExtractStage handles parallel processing through the distributed computing framework.
Acquisition Workflow
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter


# Create composite stage
class CustomDownloadExtractStage(DocumentDownloadExtractStage):
    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        # Create the URL generator
        self.url_generator = CustomURLGenerator()
        # Create the downloader
        self.downloader = CustomDownloader(download_dir=download_dir)
        # Create the iterator
        self.iterator = CustomIterator()
        # Create the extractor
        self.extractor = CustomExtractor()
        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_pipeline"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom pipeline"


# Initialize Ray client
ray_client = RayClient()
ray_client.start()

# Define acquisition pipeline
pipeline = Pipeline(name="data_acquisition")

# Create download and extract stage with custom components
custom_download_extract_stage = CustomDownloadExtractStage(...)
pipeline.add_stage(custom_download_extract_stage)

# Write the results
pipeline.add_stage(JsonlWriter(...))

# Execute acquisition pipeline
results = pipeline.run()

# Stop Ray client
ray_client.stop()
Performance Optimization
Parallel Processing
Data acquisition leverages distributed computing frameworks for scalable processing:
- Parallel Downloads: URLs in the generated list are downloaded by separate workers.
- Concurrent Extraction: Downloaded files are processed in parallel across workers.
- Memory Management: Large files are processed as streams rather than loaded all at once.
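The one-task-per-URL pattern behind parallel downloads can be sketched with the standard library's thread pool. NeMo Curator distributes this work through its own execution backend, so the code below is only an analogy. The `download` function, the `example.com` URLs, and the simulated failure for a URL ending in `/missing` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def download(url: str) -> tuple[str, bool]:
    """Hypothetical download task; a real one would fetch and store the file."""
    return url, not url.endswith("/missing")


urls = [f"https://example.com/part-{i}" for i in range(4)]
urls.append("https://example.com/missing")

# Each URL becomes an independent task; map() returns results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    outcomes = list(pool.map(download, urls))

succeeded = [url for url, ok in outcomes if ok]
failed = [url for url, ok in outcomes if not ok]
```

Because each task depends only on its own URL, a failure on one URL does not block the others, and failed URLs can be collected and retried separately.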
Integration with Data Loading
Data acquisition produces a standardized output that integrates seamlessly with Curator’s Data Loading Concepts:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.writer import ParquetWriter

# Create acquisition pipeline with all stages including writer
acquisition_pipeline = Pipeline(name="data_acquisition")
# ... add acquisition stages ...

# Add writer to save results directly
writer = ParquetWriter(path="acquired_data/")
acquisition_pipeline.add_stage(writer)

# Run pipeline to acquire and save data in one execution
results = acquisition_pipeline.run()

# Later: Load using pipeline-based data loading
from nemo_curator.stages.text.io.reader import ParquetReader

load_pipeline = Pipeline(name="load_acquired_data")
reader = ParquetReader(file_paths="acquired_data/")
load_pipeline.add_stage(reader)
This enables you to:
- Separate acquisition from processing for better workflow management.
- Cache acquired data to avoid re-downloading.
- Mix acquired and local data in the same processing pipeline.
- Use standard loading patterns regardless of data origin.