Interleaved IO
Read and write interleaved image-text data between WebDataset tar shards and Parquet. All four combinations work out of the box:
WDS tar ⇄ InterleavedBatch ⇄ Parquet
Understanding the Round-Trip
Storage Formats
Interleaved samples can live in two on-disk formats; in memory both materialize as InterleavedBatch:
| Format | Layout | When to Use |
|---|---|---|
| WebDataset tar shards | One tar per shard, one file per item; sample_id encoded in member key | Streaming reads, S3-friendly, MINT-1T-compatible |
| Parquet rows | One row per item (text/image/metadata), grouped by sample_id | Random access, column projection, ~5× faster reads in benchmarks |
Choose based on the consumer: training loops that stream large datasets often prefer WDS tars; analytics or sample-level inspection prefers Parquet.
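To make the Parquet layout concrete, here is a minimal sketch of how one-row-per-item data regroups into samples. It uses plain Python dicts rather than Arrow tables. Apart from sample_id, the column names (position, modality, text) are assumptions for illustration and are not the exact INTERLEAVED_SCHEMA.

```python
from collections import defaultdict

# Hypothetical item rows; every column except sample_id is an illustrative assumption.
rows = [
    {"sample_id": "doc-2", "position": 0, "modality": "text", "text": "Intro"},
    {"sample_id": "doc-1", "position": 1, "modality": "image", "text": None},
    {"sample_id": "doc-1", "position": 0, "modality": "text", "text": "Hello"},
]


def rows_to_samples(rows):
    """Group item rows by sample_id, then order each sample's items by position."""
    grouped = defaultdict(list)
    for row in rows:  # single pass over all rows
        grouped[row["sample_id"]].append(row)
    return {
        sample_id: sorted(items, key=lambda r: r["position"])
        for sample_id, items in grouped.items()
    }


samples = rows_to_samples(rows)
print([item["modality"] for item in samples["doc-1"]])  # ['text', 'image']
```

A WDS shard stores the same grouping implicitly, via the sample key in each member name, which is why both formats can materialize as the same in-memory batch.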
When to Convert
A common workflow is “curate once in Parquet, train from tars”:
- Curate in Parquet — fast reads, easy filtering, column projection.
- Convert to WDS — once curation is done, write the final dataset as MINT-1T-style tars for the training loop.
The IO stages on this page support that workflow without intermediate copies.
Readers
InterleavedParquetReader
Reads Parquet files into InterleavedBatch. Each row maps to one item (text, image, or metadata) with a sample_id that groups items into samples.
```python
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    fields=("source_url", "license"),  # passthrough columns kept alongside reserved ones
    max_batch_bytes=512 * 1024 * 1024,  # 512 MiB per output batch
)
```
Key behaviors:
- fields= passthrough: keep additional columns alongside the reserved interleaved schema. Missing columns are null-filled in a single pass.
- Push-down column projection: the reader calls pq.read_schema() per file and asks Parquet for only the columns it needs.
- max_batch_bytes splitting: large file groups are split into multiple batches by accumulated size; per-split lineage is preserved in the output source_files field.
- Schema control: pass schema= for strict alignment to a target Arrow schema, or schema_overrides= for partial type overrides on top of INTERLEAVED_SCHEMA. Specifying both emits a warning, and schema= takes precedence.
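Size-based splitting can be pictured as greedy packing. The sketch below assumes the reader accumulates byte-sized units (for example, row groups) and starts a new batch once the next unit would push the running total past max_batch_bytes. PlannedBatch and plan_batches are hypothetical names, and the real reader's unit of splitting and size accounting may differ.

```python
from dataclasses import dataclass, field


@dataclass
class PlannedBatch:
    source_files: list = field(default_factory=list)  # lineage for this batch
    nbytes: int = 0


def plan_batches(units, max_batch_bytes):
    """Greedily pack (source_file, nbytes) units into batches of at most max_batch_bytes.

    A unit that alone exceeds the budget still becomes its own batch.
    max_batch_bytes=None means no splitting: everything lands in one batch.
    """
    batches, current = [], PlannedBatch()
    for source_file, nbytes in units:
        over_budget = (
            max_batch_bytes is not None
            and current.source_files  # never flush an empty batch
            and current.nbytes + nbytes > max_batch_bytes
        )
        if over_budget:
            batches.append(current)
            current = PlannedBatch()
        if source_file not in current.source_files:
            current.source_files.append(source_file)
        current.nbytes += nbytes
    if current.source_files:
        batches.append(current)
    return batches


# Two row groups from a.parquet and one from b.parquet, with a 512-byte budget.
units = [("a.parquet", 300), ("a.parquet", 300), ("b.parquet", 200)]
for batch in plan_batches(units, 512):
    print(batch.source_files, batch.nbytes)
```

Because a file can contribute to more than one batch, tracking source_files per batch (rather than per input group) is what keeps lineage accurate after splitting.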
InterleavedWebdatasetReader
Reads MINT-1T-style WebDataset tar shards. Same fields= and max_batch_bytes semantics as the Parquet reader.
```python
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader

reader = InterleavedWebdatasetReader(
    file_paths="s3://bucket/mint1t/*.tar",
    fields=("source_url",),
    sample_id_field="sample_id",
    image_extensions=["png", "jpg", "jpeg"],
    json_extensions=["json"],
    texts_field="texts",
    images_field="images",
)
```
Configurable file extensions for image and JSON members (image_extensions, json_extensions) and configurable field names (texts_field, images_field, image_member_field) let the reader handle non-standard tar layouts.
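The sketch below shows one way such a reader could group tar members into samples. It treats the member name up to the first "." as the sample key (the usual WebDataset convention) and uses the last extension to decide whether a member is parsed as JSON or kept as image bytes. group_members, add_member, and the JSON layout with texts/images lists are assumptions for illustration. Only the standard-library tarfile module is used.

```python
import io
import json
import tarfile
from collections import defaultdict


def add_member(tar, name, data):
    """Write an in-memory byte string into an open tar archive."""
    info = tarfile.TarInfo(name)
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))


def group_members(tar_bytes, image_extensions=("png", "jpg", "jpeg"), json_extensions=("json",)):
    """Group members by key (name before the first '.') and classify by last extension."""
    samples = defaultdict(lambda: {"json": {}, "images": {}, "other": {}})
    with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r") as tar:
        for member in tar.getmembers():
            if not member.isfile():
                continue
            key, _, rest = member.name.partition(".")
            ext = rest.rsplit(".", 1)[-1].lower()
            data = tar.extractfile(member).read()
            if ext in json_extensions:
                samples[key]["json"][member.name] = json.loads(data)
            elif ext in image_extensions:
                samples[key]["images"][member.name] = data
            else:
                samples[key]["other"][member.name] = data
    return dict(samples)


# Build a tiny MINT-1T-like shard in memory (the JSON field layout is assumed).
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    meta = {"texts": ["Hello", None], "images": [None, "doc1.0.png"]}
    add_member(tar, "doc1.json", json.dumps(meta).encode())
    add_member(tar, "doc1.0.png", b"not-really-a-png")

samples = group_members(buf.getvalue())
print(sorted(samples["doc1"]["images"]))  # ['doc1.0.png']
```

Passing different extension tuples is the sketch's analogue of the image_extensions and json_extensions parameters; the texts/images keys inside the JSON correspond to what texts_field and images_field would name.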
Reader Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_paths | str or list[str] | Required | Glob, list, or single path. Supports local, S3, and GCS paths via fsspec. |
| files_per_partition | int or None | None | Group input files into partitions of this many files. |
| blocksize | int, str, or None | None | Optional bytes-per-partition target (e.g., "512MiB"). |
| max_batch_bytes | int or None | None | Split large partitions into multiple InterleavedBatch outputs; None disables splitting. |
| fields | tuple[str, …] or None | None | Passthrough columns to keep alongside the reserved INTERLEAVED_SCHEMA columns. |
| read_kwargs | dict | {} | Forwarded to the underlying read call (pyarrow.parquet.read_table, etc.). |
| schema | pa.Schema or None | None | Strict alignment target. Reserved columns get canonical types; passthrough columns surface overflow errors instead of silently corrupting data. |
| schema_overrides | dict[str, pa.DataType] or None | None | Partial type overrides on top of INTERLEAVED_SCHEMA. Emits a warning when both schema= and schema_overrides= are set. |
Writers
InterleavedParquetWriter
Writes InterleavedBatch to Parquet. Accepts the same schema= / schema_overrides= arguments as the readers and supports the materialization-error policy (see below).
InterleavedWebdatasetWriterStage
Writes InterleavedBatch to MINT-1T-style WebDataset tar shards.
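```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.interleaved.io.writers.webdataset import InterleavedWebdatasetWriterStage

pipeline = Pipeline(name="wds_writer")  # usually an existing pipeline with reader/filter stages
writer = InterleavedWebdatasetWriterStage(
    output_dir="./out",
    file_extension="tar",
)
pipeline.add_stage(writer)
```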
Implementation notes:
- urllib.parse.quote(sample_id, safe="") is used as the tar member key. The encoding is injective and round-trip-safe with sample_id_field="sample_id" on the matching reader.
- Single-pass groupby: rows are grouped by sample_id once instead of being filtered per sample, which is O(n) instead of O(n × m) for n rows and m samples.
- Sparse positions: gaps in the position field are preserved as None entries; the WDS reader skips them on the way back.
- Supported modalities: metadata, text, and image. Any other modality raises ValueError at write time.
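The sketch below illustrates three of these notes with plain dicts: percent-encoding a sample_id into a member key and decoding it back, grouping rows in a single pass while rejecting unsupported modalities, and keeping gaps as None when items are laid out by position. The row keys (modality, position, value), the parallel texts/images lists, and the helper names are assumptions about the layout, not the writer's actual code.

```python
from collections import defaultdict
from urllib.parse import quote, unquote

SUPPORTED_MODALITIES = {"metadata", "text", "image"}


def member_key(sample_id):
    # safe="" percent-encodes "/" too, so an id can never create tar subdirectories.
    return quote(sample_id, safe="")


def group_for_write(rows):
    """Single pass: validate each row's modality and bucket it by sample_id."""
    by_sample = defaultdict(list)
    for row in rows:
        if row["modality"] not in SUPPORTED_MODALITIES:
            raise ValueError(f"unsupported modality: {row['modality']!r}")
        by_sample[row["sample_id"]].append(row)
    return dict(by_sample)


def to_parallel_lists(items):
    """Lay text and image items out by position; missing positions stay None."""
    content = [r for r in items if r["modality"] in ("text", "image")]
    size = max((r["position"] for r in content), default=-1) + 1
    texts, images = [None] * size, [None] * size
    for r in content:
        target = texts if r["modality"] == "text" else images
        target[r["position"]] = r["value"]
    return texts, images


rows = [
    {"sample_id": "a/b 1", "modality": "metadata", "position": None, "value": {"url": "x"}},
    {"sample_id": "a/b 1", "modality": "text", "position": 0, "value": "hi"},
    {"sample_id": "a/b 1", "modality": "image", "position": 2, "value": "img.png"},
]
for sample_id, items in group_for_write(rows).items():
    key = member_key(sample_id)
    assert unquote(key) == sample_id  # the reader can recover the original id
    print(key, to_parallel_lists(items))
# a%2Fb%201 (['hi', None, None], [None, None, 'img.png'])
```

Position 1 has no item, so both lists carry None there; a reader walking the lists would skip that slot.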
Schema Utilities
utils/schema.py ships shared helpers used by every Arrow-based reader and writer.
reconcile_schema()
Apply canonical types to reserved columns; preserve passthrough column types as-is; avoid unsafe large↔small downcasts; unwrap Parquet dictionary encoding from passthrough columns.
align_table()
Pad, reorder, and cast an Arrow table to a target schema. Reserved columns use safe=False for predictable casts; passthrough columns use safe=True so overflow surfaces rather than silently corrupting data. Does not re-reconcile the user-provided target.
resolve_schema()
Merge schema= and schema_overrides= arguments with the canonical INTERLEAVED_SCHEMA; warn when both are provided; return None when neither is set. Used internally by readers and writers.
Materialization Error Policy
Writers expose on_materialize_error=, controlling what happens when a fetch step (or upstream stage) sets an error on a row:
| Value | Behavior | Use Case |
|---|---|---|
| "error" | Raise immediately. This is the default. | Production pipelines where an error indicates a real problem. |
| "warn" | Emit a warning and keep the row. | Development, when you want visibility without halting. |
| "drop_row" | Drop the offending row but keep the rest of the sample. | Resilient pipelines where one bad item shouldn't kill a sample. |
| "drop_sample" | Drop the entire sample if any row in it errored. | Strict cleanliness: one bad row taints the whole sample. |
The policy is applied after fetch, so it covers errors raised by the materialization step itself and by upstream stages.
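The four policies can be modeled as a filter over rows that may carry an error message. This is a minimal sketch, assuming each row is a dict with sample_id and an optional "error" value; apply_materialize_policy is a hypothetical name, and RuntimeError is this sketch's choice of exception rather than necessarily what the writers raise.

```python
import warnings

POLICIES = ("error", "warn", "drop_row", "drop_sample")


def apply_materialize_policy(rows, policy="error"):
    """Return the rows to write, given rows that may carry an 'error' message."""
    if policy not in POLICIES:
        raise ValueError(f"unknown policy {policy!r}; expected one of {POLICIES}")
    failed = [r for r in rows if r.get("error")]
    if not failed:
        return list(rows)
    if policy == "error":
        raise RuntimeError(f"{len(failed)} row(s) failed to materialize, first: {failed[0]['error']}")
    if policy == "warn":
        warnings.warn(f"{len(failed)} row(s) failed to materialize", stacklevel=2)
        return list(rows)
    if policy == "drop_row":
        return [r for r in rows if not r.get("error")]
    # drop_sample: one failed row removes every row sharing its sample_id
    tainted = {r["sample_id"] for r in failed}
    return [r for r in rows if r["sample_id"] not in tainted]


rows = [
    {"sample_id": "s1", "position": 0, "error": None},
    {"sample_id": "s1", "position": 1, "error": "fetch timeout"},
    {"sample_id": "s2", "position": 0, "error": None},
]
print(len(apply_materialize_policy(rows, "drop_row")))     # 2
print(len(apply_materialize_policy(rows, "drop_sample")))  # 1
```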
Mixed-Backend Path Handling
The internal helper _build_global_range_index() groups paths by filesystem object, so a single batch that mixes S3 and local paths is handled correctly instead of failing silently: each backend's range queries run against the right filesystem. This happens automatically; no configuration is needed.
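Conceptually, the grouping resembles the sketch below, which buckets paths by URL scheme so each bucket can be handed to its own filesystem. This is a simplification: the helper described above groups by filesystem object, which a real implementation would more likely obtain from fsspec (for example via fsspec.core.url_to_fs) than from urllib.parse.urlparse. group_paths_by_backend is a hypothetical name.

```python
from collections import defaultdict
from urllib.parse import urlparse


def group_paths_by_backend(paths):
    """Bucket paths by URL scheme; bare local paths count as 'file'."""
    groups = defaultdict(list)
    for path in paths:
        # Caveat: a Windows path like C:\data parses with scheme 'c'; real code should use fsspec.
        scheme = urlparse(path).scheme or "file"
        groups[scheme].append(path)
    return dict(groups)


paths = ["s3://bucket/a.tar", "/data/b.tar", "s3://bucket/c.tar", "gs://other/d.tar"]
for backend, group in group_paths_by_backend(paths).items():
    print(backend, group)  # each group would be queried with its own filesystem
```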
Performance
Benchmarks on 80 NVMe shards of MINT-1T PDF data (6,818 samples, 9.9 GB WDS / 6.0 GB Parquet) with an aspect-ratio filter applied:
| Path | Wall-clock | Samples/sec | Notes |
|---|---|---|---|
| WDS → Parquet | 76.8 s | 88.8 | Tar parsing dominates |
| WDS → WDS | 75.4 s | 90.4 | Tar parsing dominates |
| Parquet → Parquet | 15.7 s | 435.0 | Parquet column projection wins |
| Parquet → WDS | 18.5 s | 368.4 | Parquet read + tar write |
Parquet-sourced paths are roughly 4 to 5× faster than WDS-sourced paths (4.9× when writing Parquet, 4.1× when writing WDS) because tar parsing dominates the WDS read cost.
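The speedup figures can be checked directly from the table. This small calculation uses only the sample count and wall-clock times reported above; recomputing samples/sec from the rounded wall-clock times matches the table to within one sample per second.

```python
SAMPLES = 6818
wall_clock_s = {
    "WDS → Parquet": 76.8,
    "WDS → WDS": 75.4,
    "Parquet → Parquet": 15.7,
    "Parquet → WDS": 18.5,
}

throughput = {path: SAMPLES / seconds for path, seconds in wall_clock_s.items()}

# Compare paths that share an output format, so only the input format differs.
speedup_to_parquet = wall_clock_s["WDS → Parquet"] / wall_clock_s["Parquet → Parquet"]
speedup_to_wds = wall_clock_s["WDS → WDS"] / wall_clock_s["Parquet → WDS"]
print(f"{speedup_to_parquet:.1f}x {speedup_to_wds:.1f}x")  # 4.9x 4.1x
```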
Complete IO Pipeline Examples
Curate-Once-in-Parquet, Train-from-Tars
```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader
from nemo_curator.stages.interleaved.io.writers.webdataset import (
    InterleavedWebdatasetWriterStage,
)
from nemo_curator.stages.interleaved.filter.blur_filter import InterleavedBlurFilterStage

pipeline = Pipeline(name="parquet_to_wds")

# 1. Read curated Parquet (faster random access during filtering)
pipeline.add_stage(
    InterleavedParquetReader(file_paths="s3://bucket/curated/*.parquet")
)

# 2. Filter
pipeline.add_stage(InterleavedBlurFilterStage(score_threshold=100.0))

# 3. Write final WDS tars (training-loop friendly)
pipeline.add_stage(InterleavedWebdatasetWriterStage(output_dir="./final_tars"))

executor = XennaExecutor()
pipeline.run(executor)
```
Format Conversion (No Filtering)
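```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader
# InterleavedParquetWriter also needs an import; its module path is not shown on this page.

# WDS → Parquet for analytics / ad-hoc inspection
pipeline = Pipeline(name="wds_to_parquet")
pipeline.add_stage(InterleavedWebdatasetReader(file_paths="s3://bucket/mint1t/*.tar"))
pipeline.add_stage(InterleavedParquetWriter(output_dir="./parquet_copy"))
pipeline.run(XennaExecutor())
```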
Best Practices
- Curate in Parquet, ship in WDS: Parquet-sourced pipelines ran roughly 4 to 5× faster in the benchmarks above; convert to WDS only for the final training-loop format.
- Pass through provenance fields: list source-tracking columns (source_url, license, crawl_date) in fields= so they survive into the output. Columns you don't list are silently dropped.
- Use max_batch_bytes for large shards: without splitting, a single 5 GB Parquet file becomes one giant batch. Set max_batch_bytes=512 * 1024 * 1024 for memory-friendly batches.
- Pick the right on_materialize_error: "error" for production, "warn" for development, "drop_sample" for strict cleanliness. The default raises, so set it explicitly when you want any other behavior.
Related Topics
- Interleaved Filters: sample-level quality filters that operate on InterleavedBatch.
- Nemotron-Parse PDF Pipeline: produces interleaved Parquet output from PDF inputs.