NVIDIANeMo Curator
Menu

Image Duplicate Removal Workflow

Learn how to run a complete image duplicate removal workflow that generates embeddings, identifies semantic duplicates, and removes similar images from your dataset.

Before You Start


1. Generate Image Embeddings

Create CLIP embeddings for all images in your dataset. This pipeline reads images, generates embeddings, and saves them to Parquet format for duplicate removal processing.

Define the Embedding Pipeline

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter

def create_image_embedding_pipeline(input_dir, embeddings_dir, model_dir):
    """Create pipeline to generate embeddings for duplicate removal."""

    pipeline = Pipeline(
        name="image_embedding",
        description="Generate CLIP embeddings for image duplicate removal"
    )

    # Partition tar files for parallel processing
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))

    # Read images from tar archives
    pipeline.add_stage(ImageReaderStage(
        dali_batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))

    # Generate CLIP embeddings
    pipeline.add_stage(ImageEmbeddingStage(
        model_dir=model_dir,
        num_gpus_per_worker=0.25,
        model_inference_batch_size=32,
        verbose=True,
    ))

    # Convert to document format for deduplication
    pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))

    # Save embeddings to Parquet
    pipeline.add_stage(ParquetWriter(path=embeddings_dir))

    return pipeline

Run Embedding Generation

# Set your paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
EMBEDDINGS_DIR = "/path/to/embeddings"
MODEL_DIR = "/path/to/models"

# Create and run pipeline
embedding_pipeline = create_image_embedding_pipeline(
    INPUT_TAR_DIR, EMBEDDINGS_DIR, MODEL_DIR
)
embedding_pipeline.run()  # Uses XennaExecutor by default

Embedding Format Example

The pipeline writes embeddings to Parquet with two columns:

  • image_id: String identifier for the image
  • embedding: List of float values with length 768 (CLIP ViT-L/14 dimension)

Directory layout

/path/to/embeddings/
  part-00000-....parquet
  part-00001-....parquet
  part-00002-....parquet

Schema

image_id: string
embedding: list<float32>  # length = 768 for CLIP ViT-L/14

Sample row

{"image_id": "00001234", "embedding": [0.0123, -0.0456, 0.0031, 0.1279, ...]}

Read example

import pyarrow.parquet as pq

table = pq.read_table("/path/to/embeddings")
df = table.to_pandas()
print(df.head())  # columns: image_id, embedding (list[float])
print(f"Embedding dimension: {len(df.iloc[0]['embedding'])}")

2. Run Semantic Duplicate Removal

Use the semantic duplicate removal workflow to identify and mark duplicate images based on embedding similarity.

from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow

def create_deduplication_workflow(embeddings_dir, removal_dir):
    """Create semantic deduplication workflow."""

    return SemanticDeduplicationWorkflow(
        input_path=embeddings_dir,
        output_path=removal_dir,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,          # Number of clusters for grouping
        eps=0.01,               # Similarity threshold (lower = more strict)
        verbose=True,
    )

# Set paths
EMBEDDINGS_DIR = "/path/to/embeddings"
REMOVAL_DIR = "/path/to/removal_ids"

# Run deduplication
dedup_workflow = create_deduplication_workflow(EMBEDDINGS_DIR, REMOVAL_DIR)
dedup_workflow.run()

3. Remove Duplicate Images

After identifying duplicates, use ImageDuplicatesRemovalStage to filter them from your dataset.

Filter the original dataset to remove identified duplicates and create the final deduplicated dataset.

from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage

def create_image_removal_pipeline(input_dir, removal_dir, output_dir):
    """Create pipeline to remove duplicate images."""

    pipeline = Pipeline(
        name="image_deduplication",
        description="Remove duplicate images from dataset"
    )

    # Partition input files
    pipeline.add_stage(FilePartitioningStage(
        file_paths=input_dir,
        files_per_partition=1,
        file_extensions=[".tar"],
    ))

    # Read original images
    pipeline.add_stage(ImageReaderStage(
        dali_batch_size=100,
        verbose=True,
        num_threads=16,
        num_gpus_per_worker=0.25,
    ))

    # Remove duplicates based on removal list
    pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=removal_dir + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))

    # Write deduplicated dataset
    pipeline.add_stage(ImageWriterStage(
        output_dir=output_dir,
        remove_image_data=True,
        verbose=True,
    ))

    return pipeline

Run the Removal Pipeline

# Set paths
INPUT_TAR_DIR = "/path/to/input/tar_dataset"
REMOVAL_DIR = "/path/to/removal_ids"
OUTPUT_DIR = "/path/to/deduplicated/dataset"

# Run removal pipeline
removal_pipeline = create_image_removal_pipeline(
    INPUT_TAR_DIR, REMOVAL_DIR, OUTPUT_DIR
)
removal_pipeline.run()  # Uses XennaExecutor by default

4. Inspect Results

After deduplication, examine the results to understand what was removed:

Check Removal Statistics

import pandas as pd
from glob import glob

# Read removal results
removal_files = glob(f"{REMOVAL_DIR}/duplicates/*.parquet")
removal_dfs = [pd.read_parquet(f) for f in removal_files]
all_removals = pd.concat(removal_dfs, ignore_index=True)

print(f"Total images marked for removal: {len(all_removals)}")
print(f"Unique images marked for removal: {all_removals['id'].nunique()}")

# Show sample of removed images
print("\nSample removed image IDs:")
print(all_removals['id'].head(10).tolist())

Compare Dataset Sizes

import os

def count_tar_files(directory):
    """Count tar files in a directory."""
    tar_files = glob(os.path.join(directory, "*.tar"))
    return len(tar_files)

original_count = count_tar_files(INPUT_TAR_DIR)
deduplicated_count = count_tar_files(OUTPUT_DIR)

print(f"Original dataset: {original_count} tar files")
print(f"Deduplicated dataset: {deduplicated_count} tar files")
print(f"Reduction: {original_count - deduplicated_count} files ({(1 - deduplicated_count/original_count)*100:.1f}%)")

5. Complete Workflow Script

Here’s the complete workflow that combines all steps:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage
from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage
from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage
from nemo_curator.stages.image.io.image_reader import ImageReaderStage
from nemo_curator.stages.image.io.image_writer import ImageWriterStage
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter

def run_image_deduplication_workflow():
    """Run complete image deduplication workflow."""

    # Define paths
    INPUT_TAR_DIR = "/path/to/input/tar_dataset"
    EMBEDDINGS_DIR = "/path/to/embeddings"
    REMOVAL_DIR = "/path/to/removal_ids"
    OUTPUT_DIR = "/path/to/deduplicated/dataset"
    MODEL_DIR = "/path/to/models"

    print("Step 1: Generating embeddings...")

    # Step 1: Generate embeddings
    embedding_pipeline = Pipeline(name="embedding", description="Generate embeddings")

    embedding_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    embedding_pipeline.add_stage(ImageReaderStage(
        dali_batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    embedding_pipeline.add_stage(ImageEmbeddingStage(
        model_dir=MODEL_DIR, num_gpus_per_worker=0.25,
        model_inference_batch_size=32, verbose=True
    ))
    embedding_pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(
        fields=["image_id", "embedding"]
    ))
    embedding_pipeline.add_stage(ParquetWriter(path=EMBEDDINGS_DIR))

    embedding_pipeline.run()  # Uses XennaExecutor by default

    print("Step 2: Running semantic deduplication...")

    # Step 2: Semantic deduplication
    dedup_workflow = SemanticDeduplicationWorkflow(
        input_path=EMBEDDINGS_DIR,
        output_path=REMOVAL_DIR,
        id_field="image_id",
        embedding_field="embedding",
        n_clusters=100,
        eps=0.01,
        verbose=True,
    )
    dedup_workflow.run()

    print("Step 3: Removing duplicate images...")

    # Step 3: Remove duplicates
    removal_pipeline = Pipeline(name="removal", description="Remove duplicates")

    removal_pipeline.add_stage(FilePartitioningStage(
        file_paths=INPUT_TAR_DIR, files_per_partition=1, file_extensions=[".tar"]
    ))
    removal_pipeline.add_stage(ImageReaderStage(
        dali_batch_size=100, verbose=True, num_threads=16, num_gpus_per_worker=0.25
    ))
    removal_pipeline.add_stage(ImageDuplicatesRemovalStage(
        removal_parquets_dir=REMOVAL_DIR + "/duplicates",
        duplicate_id_field="id",
        verbose=True,
    ))
    removal_pipeline.add_stage(ImageWriterStage(
        output_dir=OUTPUT_DIR, remove_image_data=True, verbose=True
    ))

    removal_pipeline.run()  # Uses XennaExecutor by default

    print(f"Deduplication complete! Results saved to: {OUTPUT_DIR}")

if __name__ == "__main__":
    run_image_deduplication_workflow()

Next Steps

After running image deduplication:

  1. Quality assessment: Manually review a sample of removed duplicates
  2. Combine with filtering: Run aesthetic/NSFW filtering on deduplicated data
  3. Export for training: Prepare final curated dataset for ML training
  4. Monitor metrics: Track deduplication rates across different image types