Read Existing Data
Use Curator’s JsonlReader and ParquetReader to read existing datasets into a pipeline, then optionally add processing stages.
JSONL Reader
Example: Read JSONL and Filter
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.filters import ScoreFilter
from nemo_curator.stages.text.filters.heuristic import WordCountFilter
# Initialize Ray client
ray_client = RayClient()
ray_client.start()
# Create pipeline for processing existing JSONL files
pipeline = Pipeline(name="jsonl_data_processing")
# Read JSONL files
reader = JsonlReader(
    file_paths="/path/to/data",
    files_per_partition=4,
    fields=["text", "url"],  # Only read specific columns
)
pipeline.add_stage(reader)
# Add filtering stage
word_filter = ScoreFilter(
    filter_obj=WordCountFilter(min_words=50, max_words=1000),
    text_field="text",
)
pipeline.add_stage(word_filter)
# Add more stages to pipeline...
# Execute pipeline
results = pipeline.run()
# Stop Ray client
ray_client.stop()
Parquet Reader
Example: Read Parquet and Filter
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import ParquetReader
from nemo_curator.stages.text.filters import ScoreFilter
from nemo_curator.stages.text.filters.heuristic import WordCountFilter
# Initialize Ray client
ray_client = RayClient()
ray_client.start()
# Create pipeline for processing existing Parquet files
pipeline = Pipeline(name="parquet_data_processing")
# Read Parquet files with PyArrow engine
reader = ParquetReader(
    file_paths="/path/to/data",
    files_per_partition=4,
    fields=["text", "metadata"],  # Only read specific columns
)
pipeline.add_stage(reader)
# Add filtering stage
word_filter = ScoreFilter(
    filter_obj=WordCountFilter(min_words=50, max_words=1000),
    text_field="text",
)
pipeline.add_stage(word_filter)
# Add more stages to pipeline...
# Execute pipeline
results = pipeline.run()
# Stop Ray client
ray_client.stop()
Reader Configuration
Common Parameters
Both JsonlReader and ParquetReader support these configuration options:
| Parameter | Type | Description | Default |
|---|---|---|---|
| file_paths | str \| list[str] | File paths or glob patterns to read | Required |
| files_per_partition | int \| None | Number of files per partition. Overrides blocksize if both are provided. | None |
| blocksize | int \| str \| None | Target partition size (e.g., "128MB"). Ignored if files_per_partition is provided. | None |
| fields | list[str] \| None | Column names to read (column selection) | None (all columns) |
| read_kwargs | dict[str, Any] \| None | Extra arguments for the underlying reader | None |
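The precedence between files_per_partition and blocksize can be sketched in plain Python. This only illustrates the rule stated in the table and is not Curator's implementation: plan_partitions and parse_blocksize are hypothetical helpers, "MB" is assumed to mean 1024**2 bytes, the size-based branch is assumed to pack files greedily, and one file per partition is assumed when neither option is set.

```python
from __future__ import annotations


def parse_blocksize(blocksize: int | str) -> int:
    """Convert a size such as "128MB" to bytes (assumes 1 MB = 1024**2 bytes)."""
    if isinstance(blocksize, int):
        return blocksize
    units = {"KB": 1024, "MB": 1024**2, "GB": 1024**3}
    text = blocksize.strip().upper()
    for suffix, factor in units.items():
        if text.endswith(suffix):
            return int(float(text[: -len(suffix)]) * factor)
    return int(text)  # bare number of bytes, e.g. "4096"


def plan_partitions(
    file_sizes: list[tuple[str, int]],
    files_per_partition: int | None = None,
    blocksize: int | str | None = None,
) -> list[list[str]]:
    """Group (path, size_in_bytes) pairs into partitions of file paths."""
    paths = [path for path, _ in file_sizes]
    if files_per_partition is not None:
        # Count-based grouping takes precedence; blocksize is ignored.
        return [
            paths[i : i + files_per_partition]
            for i in range(0, len(paths), files_per_partition)
        ]
    if blocksize is None:
        # Assumption for this sketch: fall back to one file per partition.
        return [[path] for path in paths]
    limit = parse_blocksize(blocksize)
    partitions: list[list[str]] = []
    current: list[str] = []
    current_size = 0
    for path, size in file_sizes:
        # Start a new partition once adding this file would exceed the target.
        if current and current_size + size > limit:
            partitions.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        partitions.append(current)
    return partitions


# Five hypothetical 60 MB files.
files = [(f"{name}.jsonl", 60 * 1024**2) for name in "abcde"]
by_size = plan_partitions(files, blocksize="128MB")
by_count = plan_partitions(files, files_per_partition=4, blocksize="128MB")
```

Here by_size groups the files two at a time because a third 60 MB file would exceed the 128 MB target, while by_count ignores blocksize and groups four files together.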
Parquet-Specific Features
ParquetReader provides these optimizations:
- PyArrow Engine: Uses the pyarrow engine by default for better performance
- Storage Options: Supports cloud storage via storage_options in read_kwargs
- Schema Handling: Automatic schema inference and validation
- Columnar Efficiency: Optimized for reading specific columns
Performance Tips
- Use the fields parameter to read only the required columns for better performance
- Set files_per_partition based on your cluster size and memory constraints
- Use blocksize for fine-grained control over partition sizes
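To make the effect of fields concrete, the following standard-library sketch reads JSONL records and keeps only the selected columns. The read_jsonl helper is hypothetical and stands in for the reader; it is not how Curator implements column selection.

```python
import io
import json


def read_jsonl(stream, fields=None):
    """Yield one dict per JSONL line, optionally keeping only the given fields."""
    for line in stream:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        if fields is not None:
            # Drop every column that was not requested.
            record = {name: record.get(name) for name in fields}
        yield record


sample = io.StringIO(
    '{"text": "first document", "url": "https://example.com/a", "lang": "en"}\n'
    '{"text": "second document", "url": "https://example.com/b", "lang": "de"}\n'
)
rows = list(read_jsonl(sample, fields=["text", "url"]))
```

Each entry in rows carries only text and url, so later stages hold less data in memory. With Parquet the saving is usually larger, because a columnar file lets the reader skip unselected columns on disk rather than parse and discard them.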
Output Integration
Both readers produce DocumentBatch tasks that integrate seamlessly with:
- Processing Stages: Apply filters, transformations, and quality checks
- Writer Stages: Export to JSONL, Parquet, or other formats
- Analysis Tools: Convert to Pandas/PyArrow for inspection and debugging
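The filtering step used in both examples can be approximated in a few lines, which is handy for sanity-checking thresholds on a small sample before running a full pipeline. This is a plain-Python stand-in, assuming whitespace tokenization and inclusive bounds; Curator's WordCountFilter may count words differently, and keep_document is a hypothetical helper.

```python
def word_count(text: str) -> int:
    """Count whitespace-separated tokens."""
    return len(text.split())


def keep_document(text: str, min_words: int = 50, max_words: int = 1000) -> bool:
    """Return True when the word count lies within [min_words, max_words]."""
    return min_words <= word_count(text) <= max_words


docs = [
    {"text": "too short"},                   # 2 words
    {"text": " ".join(["word"] * 120)},      # 120 words
    {"text": " ".join(["word"] * 1500)},     # 1500 words
]
kept = [doc for doc in docs if keep_document(doc["text"])]
```

Only the 120-word document passes with the thresholds from the examples (min_words=50, max_words=1000).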