
Read Existing Data

Use Curator’s JsonlReader and ParquetReader to read existing datasets into a pipeline, then optionally add processing stages.

JSONL Reader

Example: Read JSONL and Filter

from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.filters import ScoreFilter
from nemo_curator.stages.text.filters.heuristic import WordCountFilter

# Initialize Ray client
ray_client = RayClient()
ray_client.start()

# Create pipeline for processing existing JSONL files
pipeline = Pipeline(name="jsonl_data_processing")

# Read JSONL files
reader = JsonlReader(
    file_paths="/path/to/data",
    files_per_partition=4,
    fields=["text", "url"]  # Only read specific columns
)
pipeline.add_stage(reader)

# Add filtering stage
word_filter = ScoreFilter(
    filter_obj=WordCountFilter(min_words=50, max_words=1000),
    text_field="text"
)
pipeline.add_stage(word_filter)

# Add more stages to pipeline...

# Execute pipeline
results = pipeline.run()

# Stop Ray client
ray_client.stop()
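
To make the filtering step concrete, the following plain-Python sketch shows the kind of decision a word-count filter with min_words=50 and max_words=1000 makes for each document. It does not use Curator at all: the helper names (word_count, keep_document), the whitespace tokenization, and the inclusive bounds are assumptions made for illustration, not a description of how WordCountFilter is implemented.

```python
def word_count(text: str) -> int:
    """Count whitespace-separated tokens (an assumed tokenization)."""
    return len(text.split())


def keep_document(text: str, min_words: int = 50, max_words: int = 1000) -> bool:
    """Keep a document when its word count lies in [min_words, max_words].

    Inclusive bounds are an assumption of this sketch.
    """
    return min_words <= word_count(text) <= max_words


# Three toy records with the same fields the reader example selects.
docs = [
    {"text": "too short", "url": "https://example.com/a"},
    {"text": "word " * 200, "url": "https://example.com/b"},
    {"text": "word " * 5000, "url": "https://example.com/c"},
]

kept = [doc for doc in docs if keep_document(doc["text"])]
print([doc["url"] for doc in kept])
```

Only the 200-word record survives; the 2-word and 5000-word records fall outside the configured range.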

Parquet Reader

Example: Read Parquet and Filter

from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import ParquetReader
from nemo_curator.stages.text.filters import ScoreFilter
from nemo_curator.stages.text.filters.heuristic import WordCountFilter

# Initialize Ray client
ray_client = RayClient()
ray_client.start()

# Create pipeline for processing existing Parquet files
pipeline = Pipeline(name="parquet_data_processing")

# Read Parquet files with PyArrow engine
reader = ParquetReader(
    file_paths="/path/to/data",
    files_per_partition=4,
    fields=["text", "metadata"]  # Only read specific columns
)
pipeline.add_stage(reader)

# Add filtering stage
word_filter = ScoreFilter(
    filter_obj=WordCountFilter(min_words=50, max_words=1000),
    text_field="text"
)
pipeline.add_stage(word_filter)

# Add more stages to pipeline...

# Execute pipeline
results = pipeline.run()

# Stop Ray client
ray_client.stop()

Reader Configuration

Common Parameters

Both JsonlReader and ParquetReader support these configuration options:

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| file_paths | str \| list[str] | File paths or glob patterns to read | Required |
| files_per_partition | int \| None | Number of files per partition. Overrides blocksize if both are provided. | None |
| blocksize | int \| str \| None | Target partition size (e.g., “128MB”). Ignored if files_per_partition is provided. | None |
| fields | list[str] \| None | Column names to read (column selection) | None (all columns) |
| read_kwargs | dict[str, Any] \| None | Extra arguments for the underlying reader | None |
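
The precedence between files_per_partition and blocksize can be sketched in a few lines of plain Python. This is an illustration of the documented rule only, not Curator's code: parse_blocksize and resolve_partitioning are hypothetical helpers, and treating "MB" as 10**6 bytes is an assumption (the reader may interpret units differently).

```python
from __future__ import annotations

import re

# Decimal units are an assumption of this sketch (1 MB = 10**6 bytes).
_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9}


def parse_blocksize(blocksize: int | str) -> int:
    """Convert a value such as "128MB" or 1024 into a byte count."""
    if isinstance(blocksize, int):
        return blocksize
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([KMG]?B)\s*", blocksize.upper())
    if match is None:
        raise ValueError(f"unrecognized blocksize: {blocksize!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit])


def resolve_partitioning(
    files_per_partition: int | None = None,
    blocksize: int | str | None = None,
) -> tuple[str, int | None]:
    """Report which setting controls partitioning.

    files_per_partition wins whenever it is set; blocksize is used only
    when files_per_partition is None.
    """
    if files_per_partition is not None:
        return ("files_per_partition", files_per_partition)
    if blocksize is not None:
        return ("blocksize", parse_blocksize(blocksize))
    # Neither was given: the reader's own default applies (not modeled here).
    return ("default", None)


print(resolve_partitioning(files_per_partition=4, blocksize="128MB"))
print(resolve_partitioning(blocksize="128MB"))
```

The first call reports files_per_partition even though a blocksize was also passed, which mirrors the "Overrides blocksize if both are provided" rule in the table.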

Parquet-Specific Features

ParquetReader provides these optimizations:

  • PyArrow Engine: Uses the pyarrow engine by default for better performance
  • Storage Options: Supports cloud storage via storage_options in read_kwargs
  • Schema Handling: Automatic schema inference and validation
  • Columnar Efficiency: Optimized for reading specific columns
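
As a rough sketch of the storage_options point above, the reader arguments for a cloud-hosted dataset might be assembled as follows. The bucket path is hypothetical, and the "anon" key is an s3fs-style option chosen for illustration; which keys are accepted depends on the storage backend in use, so treat the contents of storage_options as an assumption.

```python
# Hypothetical bucket; replace with a real location.
file_paths = "s3://my-bucket/curated/"

# storage_options is passed through read_kwargs, as described above.
# "anon": False asks an s3fs-style backend to use configured credentials
# rather than anonymous access (backend-specific, assumed here).
read_kwargs = {
    "storage_options": {
        "anon": False,
    }
}

reader_config = {
    "file_paths": file_paths,
    "files_per_partition": 4,
    "fields": ["text", "metadata"],  # read only the needed columns
    "read_kwargs": read_kwargs,
}

# In a pipeline this would be unpacked into the reader, for example:
#   reader = ParquetReader(**reader_config)
print(sorted(reader_config))
```

Keeping the arguments in a dictionary makes it easy to swap credentials or paths per environment without touching the pipeline definition.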

Performance Tips

  • Use the fields parameter to read only the columns you need; skipping unused columns reduces I/O and memory use
  • Set files_per_partition based on your cluster size and memory constraints
  • Use blocksize for fine-grained control over partition sizes
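
To get a feel for how the two partitioning knobs differ, the sketch below groups the same ten files first by count and then by a byte budget. It is a simplified model written for this page, not Curator's partitioning logic: group_by_count and group_by_size are hypothetical helpers, the file names and sizes are made up, and the greedy packing strategy is an assumption.

```python
from __future__ import annotations


def group_by_count(files: list[str], files_per_partition: int) -> list[list[str]]:
    """Split files into consecutive chunks of at most files_per_partition."""
    return [
        files[i : i + files_per_partition]
        for i in range(0, len(files), files_per_partition)
    ]


def group_by_size(sized_files: list[tuple[str, int]], blocksize: int) -> list[list[str]]:
    """Greedily pack files so each partition stays within blocksize bytes.

    A single file larger than blocksize still gets its own partition.
    """
    partitions: list[list[str]] = []
    current: list[str] = []
    current_bytes = 0
    for path, size in sized_files:
        if current and current_bytes + size > blocksize:
            partitions.append(current)
            current, current_bytes = [], 0
        current.append(path)
        current_bytes += size
    if current:
        partitions.append(current)
    return partitions


files = [f"part-{i:02d}.jsonl" for i in range(10)]

by_count = group_by_count(files, files_per_partition=4)
print([len(p) for p in by_count])

# Pretend every file is 50 MB and the budget is 128 MB per partition.
sized = [(name, 50_000_000) for name in files]
by_size = group_by_size(sized, blocksize=128_000_000)
print([len(p) for p in by_size])
```

Count-based grouping gives predictable task counts when files are similar in size, while size-based grouping keeps memory per task steadier when file sizes vary.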

Output Integration

Both readers produce DocumentBatch tasks that integrate seamlessly with:

  • Processing Stages: Apply filters, transformations, and quality checks
  • Writer Stages: Export to JSONL, Parquet, or other formats
  • Analysis Tools: Convert to Pandas/PyArrow for inspection and debugging