NVIDIANeMo Curator
Menu

Nemotron-CC Pipelines

Nemotron-CC provides advanced synthetic data generation workflows for transforming and extracting knowledge from existing text documents. Unlike simple generation, these pipelines use sophisticated preprocessing, LLM-based transformation, and postprocessing to create high-quality training data.

The Composable Pipeline Pattern

Nemotron-CC stages follow a composable pattern with three distinct phases:

  1. Preprocessing: Segment documents, filter by length, and prepare inputs for the LLM
  2. Generation: Apply task-specific prompts to transform text using the LLM
  3. Postprocessing: Clean outputs, remove formatting artifacts, and filter low-quality results

This separation enables fine-grained control over each phase while providing reusable helper functions for common patterns.

Pipeline Architecture

flowchart TB
    subgraph "Preprocessing"
        A[Input Documents] --> B[Token Count Filter]
        B --> C[Document Splitter]
        C --> D[Segment Filter]
        D --> E[Document Joiner]
    end

    subgraph "LLM Generation"
        E --> F[Task-Specific Stage<br/>WikiPara/DiverseQA/Distill/etc.]
    end

    subgraph "Postprocessing"
        F --> G[Token Count Filter]
        G --> H[Markdown Remover]
        H --> I[Task-Specific Cleanup]
        I --> J[Quality Filter]
    end

    J --> K[Output Dataset]

Input Data Requirements

Before running a Nemotron-CC pipeline, prepare your input data as Parquet files with the required schema.

Required Schema

ColumnTypeDescription
idint64Unique document identifier. Required by the preprocessing pipeline to reassemble document segments after splitting.
textstringDocument content to transform. This is the primary input field for all Nemotron-CC stages.
bucketed_resultsint64Quality score used to route documents to appropriate pipelines. Values typically range from 0-20, where higher scores indicate higher quality content.

Quality Score Field

The bucketed_results field contains quality scores that determine which pipeline processes each document:

  • High-quality documents (bucketed_results &gt;11): Process with DiverseQA, Distill, ExtractKnowledge, or KnowledgeList tasks
  • Low-quality documents (bucketed_results &lt;= 11): Process with WikipediaParaphrasing to improve text quality

Generating Quality Scores

Use NeMo Curator’s quality assessment tools to generate quality scores before running SDG pipelines:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import ParquetWriter
from nemo_curator.stages.text.classifiers import FineWebEduClassifier
from nemo_curator.stages.text.modules import AddId

# Create pipeline to score documents
pipeline = Pipeline(name="quality_scoring")

# Read raw documents
pipeline.add_stage(JsonlReader(file_paths="raw_data/*.jsonl", fields=["text"]))

# Add unique document IDs
pipeline.add_stage(AddId(id_field="id"))

# Score document quality (outputs int score 0-5)
pipeline.add_stage(
    FineWebEduClassifier(
        int_score_field="bucketed_results",  # Use this as quality score
    )
)

# Save as Parquet for SDG pipeline
pipeline.add_stage(ParquetWriter(path="scored_data/"))

results = pipeline.run()

Example Data

An example Parquet file with the correct schema is available in the tutorials directory:

tutorials/synthetic/nemotron_cc/example_data/data.parquet

You can inspect its structure:

import pandas as pd

df = pd.read_parquet("tutorials/synthetic/nemotron_cc/example_data/data.parquet")
print(df.columns.tolist())  # ['id', 'text', 'bucketed_results']
print(df.head(2))

Available Tasks

Nemotron-CC provides five specialized generation tasks, each designed for specific data transformation needs:

TaskStage ClassPurposeUse Case
Wikipedia ParaphrasingWikipediaParaphrasingStageRewrite text as Wikipedia-style proseImproving noisy web data
Diverse QADiverseQAStageGenerate diverse Q&A pairsReading comprehension training
DistillDistillStageCreate condensed, informative paraphrasesKnowledge distillation
Extract KnowledgeExtractKnowledgeStageExtract factual content as passagesKnowledge base creation
Knowledge ListKnowledgeListStageExtract structured fact listsFact extraction

Quality-Based Processing Strategy

Nemotron-CC pipelines are designed to process data based on quality scores. The typical approach:

High-Quality Data Pipeline

For documents with high quality scores, use tasks that leverage the existing quality:

  • DiverseQA: Generate Q&A pairs from well-structured content
  • Distill: Create condensed versions preserving key information
  • ExtractKnowledge: Extract factual passages
  • KnowledgeList: Extract structured facts
from nemo_curator.stages.text.filters import Filter

# Filter for high-quality documents (score &gt;11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) &gt;11,
        filter_field="bucketed_results",
    ),
)

Low-Quality Data Pipeline

For documents with lower quality scores, use Wikipedia Paraphrasing to improve text quality:

# Filter for low-quality documents (score &lt;= 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) &lt;= 11,
        filter_field="bucketed_results",
    ),
)

Using Helper Functions

The recommended approach is to use the helper functions in nemotron_cc_pipelines.py:

from nemotron_cc_pipelines import (
    add_preprocessing_pipeline,
    add_diverse_qa_postprocessing_pipeline,
)
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage

pipeline = Pipeline(name="diverse_qa_pipeline")

# Add preprocessing
pipeline = add_preprocessing_pipeline(
    pipeline=pipeline,
    text_field="text",
    system_prompt=SYSTEM_PROMPT,
    user_prompt_template=PROMPT_TEMPLATE,
    min_document_tokens=30,
    min_segment_tokens=30,
    max_input_tokens=1000,
    args=args,  # Contains tokenizer config
)

# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=generation_config,
        input_field="text",
        output_field="diverse_qa",
    )
)

# Add postprocessing
pipeline = add_diverse_qa_postprocessing_pipeline(
    pipeline=pipeline,
    llm_response_field="diverse_qa",
    args=args,
)

Task Configuration

Each task has specific token count and preprocessing requirements:

TaskMin Doc TokensMin Segment TokensMax Input TokensMax Output Tokens
Diverse QA30301000600
Distill301020001600
Extract Knowledge303014001400
Knowledge List30301000600
Wikipedia Paraphrasing55512512

Quick Example

import os
from transformers import AutoTokenizer
from nemo_curator.core.client import RayClient
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.models.client import AsyncOpenAIClient
from nemo_curator.models.client.llm_client import GenerationConfig
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage
from nemo_curator.stages.text.io.reader.parquet import ParquetReader
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter

# Initialize
client = RayClient(include_dashboard=False)
client.start()
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.3-70B-Instruct")

# Create LLM client
llm_client = AsyncOpenAIClient(
    api_key=os.environ["NVIDIA_API_KEY"],
    base_url="https://integrate.api.nvidia.com/v1",
    max_concurrent_requests=5,
)

# Build pipeline (see "Using Helper Functions" section for preprocessing/postprocessing)
pipeline = Pipeline(name="nemotron_cc_diverse_qa")
pipeline.add_stage(ParquetReader(file_paths=["./input_data/*.parquet"]))

# Add preprocessing stages using helper function:
# pipeline = add_preprocessing_pipeline(pipeline, text_field="text", ...)

# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=GenerationConfig(temperature=0.5, top_p=0.9),
        input_field="text",
        output_field="diverse_qa",
    )
)

# Add postprocessing stages using helper function:
# pipeline = add_diverse_qa_postprocessing_pipeline(pipeline, llm_response_field="diverse_qa", ...)

pipeline.add_stage(ParquetWriter(path="./output/"))

# Execute
executor = XennaExecutor()
results = pipeline.run(executor)

client.stop()

NDD-Backed Stages

All five Nemotron-CC tasks have NDD-backed equivalents that replace the AsyncOpenAIClient with NeMo Data Designer execution. These stages share the same input_field, output_field, and prompt interface, but configure the LLM through NDD’s ModelConfig and ModelProvider instead of an AsyncOpenAIClient.

Import the NDD-backed stages from nemo_curator.stages.synthetic.nemotron_cc.nemo_data_designer.nemotron_cc:

import os

import data_designer.config as dd

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemo_data_designer.nemotron_cc import DiverseQAStage

model_config = dd.ModelConfig(
    alias="meta/llama-3.3-70b-instruct",
    model="meta/llama-3.3-70b-instruct",
    provider="nvidia",
    inference_parameters=dd.ChatCompletionInferenceParams(
        temperature=0.5, top_p=0.9, max_tokens=600,
    ),
)

model_provider = dd.ModelProvider(
    name="nvidia",
    endpoint="https://integrate.api.nvidia.com/v1",
    provider_type="openai",
    api_key=os.environ["NVIDIA_API_KEY"],
)

pipeline = Pipeline(name="nemotron_cc_ndd_diverse_qa")
pipeline.add_stage(
    DiverseQAStage(
        input_field="text",
        output_field="diverse_qa",
        model_alias="meta/llama-3.3-70b-instruct",
        model_configs=[model_config],
        model_providers=[model_provider],
    )
)

The NDD backend provides automatic token metric collection and supports both local InferenceServer and remote NVIDIA NIM endpoints. See the NeMo Data Designer guide for full configuration details.


Detailed Reference