Nemotron-CC Pipelines
Nemotron-CC provides synthetic data generation workflows that transform existing text documents and extract knowledge from them. Unlike single-step generation, these pipelines combine preprocessing, LLM-based transformation, and postprocessing to produce high-quality training data.
The Composable Pipeline Pattern
Nemotron-CC stages follow a composable pattern with three distinct phases:
- Preprocessing: Segment documents, filter by length, and prepare inputs for the LLM
- Generation: Apply task-specific prompts to transform text using the LLM
- Postprocessing: Clean outputs, remove formatting artifacts, and filter low-quality results
This separation enables fine-grained control over each phase while providing reusable helper functions for common patterns.
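As a rough illustration of the three-phase pattern, the sketch below chains plain Python functions. The function names, the whitespace-based token count, and the `fake_llm` placeholder are all assumptions made for this example. None of them are NeMo Curator APIs.

```python
from typing import Callable, Iterable

# Hypothetical stand-ins for the three phases; not NeMo Curator APIs.
Doc = dict  # each document is a dict with at least "id" and "text"


def preprocess(docs: Iterable[Doc], min_tokens: int) -> list[Doc]:
    """Keep documents whose whitespace token count meets the minimum."""
    return [d for d in docs if len(d["text"].split()) >= min_tokens]


def generate(docs: list[Doc], llm: Callable[[str], str], prompt: str) -> list[Doc]:
    """Apply a task prompt to each document and store the response."""
    return [{**d, "response": llm(prompt.format(text=d["text"]))} for d in docs]


def postprocess(docs: list[Doc], min_tokens: int) -> list[Doc]:
    """Strip surrounding whitespace and drop responses that are too short."""
    cleaned = [{**d, "response": d["response"].strip()} for d in docs]
    return [d for d in cleaned if len(d["response"].split()) >= min_tokens]


def fake_llm(prompt: str) -> str:
    """Placeholder model: echoes the prompt in upper case with padding."""
    return "  " + prompt.upper() + "  "


docs = [
    {"id": 1, "text": "short"},
    {"id": 2, "text": "a document with enough tokens to pass the filter"},
]
result = postprocess(
    generate(preprocess(docs, min_tokens=3), fake_llm, "Rewrite: {text}"),
    min_tokens=3,
)
print([d["id"] for d in result])
```

Because each phase only consumes and returns a list of documents, any one of them can be swapped or tuned without touching the others.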
Pipeline Architecture
flowchart TB
subgraph "Preprocessing"
A[Input Documents] --> B[Token Count Filter]
B --> C[Document Splitter]
C --> D[Segment Filter]
D --> E[Document Joiner]
end
subgraph "LLM Generation"
E --> F[Task-Specific Stage<br/>WikiPara/DiverseQA/Distill/etc.]
end
subgraph "Postprocessing"
F --> G[Token Count Filter]
G --> H[Markdown Remover]
H --> I[Task-Specific Cleanup]
I --> J[Quality Filter]
end
J --> K[Output Dataset]
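The preprocessing half of the diagram can be sketched in plain Python. This is a simplified illustration, not the library's implementation: it assumes newline-based splitting, whitespace token counts in place of a real tokenizer, and a greedy joiner, and the function names are hypothetical.

```python
def count_tokens(text: str) -> int:
    """Whitespace token count; a stand-in for a real tokenizer."""
    return len(text.split())


def split_filter_join(doc, min_doc_tokens, min_segment_tokens, max_input_tokens):
    """Return LLM-sized chunks for one document, or [] if it is filtered out."""
    # Token count filter: drop documents that are too short overall.
    if count_tokens(doc["text"]) < min_doc_tokens:
        return []

    # Document splitter + segment filter: split on newlines, drop short segments.
    segments = [s.strip() for s in doc["text"].split("\n")]
    segments = [s for s in segments if count_tokens(s) >= min_segment_tokens]

    # Document joiner: greedily merge consecutive segments within the token budget.
    # A single segment longer than the budget is kept as its own chunk.
    chunks, current, current_tokens = [], [], 0
    for seg in segments:
        n = count_tokens(seg)
        if current and current_tokens + n > max_input_tokens:
            chunks.append({"id": doc["id"], "text": "\n".join(current)})
            current, current_tokens = [], 0
        current.append(seg)
        current_tokens += n
    if current:
        chunks.append({"id": doc["id"], "text": "\n".join(current)})
    return chunks


doc = {"id": 7, "text": "one two three\nx\nfour five six seven\neight nine ten"}
chunks = split_filter_join(doc, min_doc_tokens=5, min_segment_tokens=2, max_input_tokens=7)
for c in chunks:
    print(c)
```

Every chunk carries the source document's `id`, which lets segments be traced back to the document they came from after splitting.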
Input Data Requirements
Before running a Nemotron-CC pipeline, prepare your input data as Parquet files with the required schema.
Required Schema
| Column | Type | Description |
|---|---|---|
| id | int64 | Unique document identifier. Required by the preprocessing pipeline to reassemble document segments after splitting. |
| text | string | Document content to transform. This is the primary input field for all Nemotron-CC stages. |
| bucketed_results | int64 | Quality score used to route documents to appropriate pipelines. Values typically range from 0-20, where higher scores indicate higher quality content. |
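A quick sanity check before running a pipeline might look like the sketch below. It operates on plain dictionaries as a stand-in for Parquet rows (for example, rows obtained with pandas `df.to_dict("records")`). The `schema_errors` helper is hypothetical and not part of NeMo Curator.

```python
# Expected columns and Python types, mirroring the schema table above.
REQUIRED_COLUMNS = {"id": int, "text": str, "bucketed_results": int}


def schema_errors(records):
    """Return a list of human-readable problems found in the records."""
    errors = []
    seen_ids = set()
    for i, rec in enumerate(records):
        for column, expected in REQUIRED_COLUMNS.items():
            if column not in rec:
                errors.append(f"row {i}: missing column '{column}'")
            # bool is a subclass of int, so reject it explicitly.
            elif isinstance(rec[column], bool) or not isinstance(rec[column], expected):
                errors.append(f"row {i}: '{column}' should be {expected.__name__}")
        # Document ids must be unique so segments can be matched to their source.
        if "id" in rec:
            if rec["id"] in seen_ids:
                errors.append(f"row {i}: duplicate id {rec['id']}")
            seen_ids.add(rec["id"])
    return errors


rows = [
    {"id": 0, "text": "ok", "bucketed_results": 14},
    {"id": 0, "text": "dup", "bucketed_results": 3},
    {"id": 1, "text": "no score"},
]
errors = schema_errors(rows)
for e in errors:
    print(e)
```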
Quality Score Field
The bucketed_results field contains quality scores that determine which pipeline processes each document:
- High-quality documents (bucketed_results > 11): Process with DiverseQA, Distill, ExtractKnowledge, or KnowledgeList tasks
- Low-quality documents (bucketed_results <= 11): Process with WikipediaParaphrasing to improve text quality
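The routing rule can be written as a small lookup. This is a minimal sketch: the threshold and task names come from the list above, while the function and constant names are illustrative.

```python
HIGH_QUALITY_THRESHOLD = 11  # scores strictly above this count as high quality

HIGH_QUALITY_TASKS = ["DiverseQA", "Distill", "ExtractKnowledge", "KnowledgeList"]
LOW_QUALITY_TASKS = ["WikipediaParaphrasing"]


def tasks_for_score(bucketed_results: int) -> list[str]:
    """Return the task names suggested for a given quality score."""
    if bucketed_results > HIGH_QUALITY_THRESHOLD:
        return HIGH_QUALITY_TASKS
    return LOW_QUALITY_TASKS


for score in (3, 11, 12, 19):
    print(score, tasks_for_score(score))
```

Note that a score of exactly 11 falls on the low-quality side of the boundary.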
Generating Quality Scores
Use NeMo Curator’s quality assessment tools to generate quality scores before running synthetic data generation (SDG) pipelines:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import ParquetWriter
from nemo_curator.stages.text.classifiers import FineWebEduClassifier
from nemo_curator.stages.text.modules import AddId
# Create pipeline to score documents
pipeline = Pipeline(name="quality_scoring")
# Read raw documents
pipeline.add_stage(JsonlReader(file_paths="raw_data/*.jsonl", fields=["text"]))
# Add unique document IDs
pipeline.add_stage(AddId(id_field="id"))
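# Score document quality (FineWebEduClassifier outputs an int score from 0 to 5).
# Note: this scale is narrower than the 0-20 range described for bucketed_results,
# so adjust the > 11 routing threshold to match the scorer you use.
pipeline.add_stage(
    FineWebEduClassifier(
        int_score_field="bucketed_results",  # Use this as quality score
    )
)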
# Save as Parquet for SDG pipeline
pipeline.add_stage(ParquetWriter(path="scored_data/"))
results = pipeline.run()
Example Data
An example Parquet file with the correct schema is available in the tutorials directory:
tutorials/synthetic/nemotron_cc/example_data/data.parquet
You can inspect its structure:
import pandas as pd
df = pd.read_parquet("tutorials/synthetic/nemotron_cc/example_data/data.parquet")
print(df.columns.tolist()) # ['id', 'text', 'bucketed_results']
print(df.head(2))
Available Tasks
Nemotron-CC provides five specialized generation tasks, each designed for specific data transformation needs:
| Task | Stage Class | Purpose | Use Case |
|---|---|---|---|
| Wikipedia Paraphrasing | WikipediaParaphrasingStage | Rewrite text as Wikipedia-style prose | Improving noisy web data |
| Diverse QA | DiverseQAStage | Generate diverse Q&A pairs | Reading comprehension training |
| Distill | DistillStage | Create condensed, informative paraphrases | Knowledge distillation |
| Extract Knowledge | ExtractKnowledgeStage | Extract factual content as passages | Knowledge base creation |
| Knowledge List | KnowledgeListStage | Extract structured fact lists | Fact extraction |
Quality-Based Processing Strategy
Nemotron-CC pipelines route documents by quality score. The typical approach uses two pipelines:
High-Quality Data Pipeline
For documents with high quality scores, use tasks that leverage the existing quality:
- DiverseQA: Generate Q&A pairs from well-structured content
- Distill: Create condensed versions preserving key information
- ExtractKnowledge: Extract factual passages
- KnowledgeList: Extract structured facts
from nemo_curator.stages.text.filters import Filter
# Filter for high-quality documents (score > 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) > 11,
        filter_field="bucketed_results",
    ),
)
Low-Quality Data Pipeline
For documents with lower quality scores, use Wikipedia Paraphrasing to improve text quality:
# Filter for low-quality documents (score <= 11)
pipeline.add_stage(
    Filter(
        filter_fn=lambda x: int(x) <= 11,
        filter_field="bucketed_results",
    ),
)
Using Helper Functions
The recommended approach is to use the helper functions in nemotron_cc_pipelines.py:
from nemotron_cc_pipelines import (
    add_preprocessing_pipeline,
    add_diverse_qa_postprocessing_pipeline,
)
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage
pipeline = Pipeline(name="diverse_qa_pipeline")
# Add preprocessing
# SYSTEM_PROMPT, PROMPT_TEMPLATE, and args are assumed to be defined earlier in your script
pipeline = add_preprocessing_pipeline(
    pipeline=pipeline,
    text_field="text",
    system_prompt=SYSTEM_PROMPT,
    user_prompt_template=PROMPT_TEMPLATE,
    min_document_tokens=30,
    min_segment_tokens=30,
    max_input_tokens=1000,
    args=args,  # Contains tokenizer config
)
# Add generation stage
# llm_client and generation_config are assumed to be created beforehand (see "Quick Example")
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=generation_config,
        input_field="text",
        output_field="diverse_qa",
    )
)
# Add postprocessing
pipeline = add_diverse_qa_postprocessing_pipeline(
    pipeline=pipeline,
    llm_response_field="diverse_qa",
    args=args,
)
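The postprocessing helper bundles several cleanup steps. As a loose illustration of what markdown removal followed by a length filter can look like, the sketch below uses regular expressions and whitespace token counts. It is an assumption-laden stand-in with hypothetical function names, not the code behind `add_diverse_qa_postprocessing_pipeline`.

```python
import re


def strip_markdown(text: str) -> str:
    """Remove a few common markdown markers; not a full markdown parser."""
    text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE)  # headings
    text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)  # bold
    text = re.sub(r"\*(.+?)\*", r"\1", text)  # italics
    return text.strip()


def postprocess(responses, min_tokens: int):
    """Strip markdown, then drop responses below a whitespace-token minimum."""
    cleaned = (strip_markdown(r) for r in responses)
    return [r for r in cleaned if len(r.split()) >= min_tokens]


raw = ["## Answer\n**Question:** What is *X*? **Answer:** A letter.", "**ok**"]
print(postprocess(raw, min_tokens=3))
```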
Task Configuration
Each task has specific token count and preprocessing requirements:
| Task | Min Doc Tokens | Min Segment Tokens | Max Input Tokens | Max Output Tokens |
|---|---|---|---|---|
| Diverse QA | 30 | 30 | 1000 | 600 |
| Distill | 30 | 10 | 2000 | 1600 |
| Extract Knowledge | 30 | 30 | 1400 | 1400 |
| Knowledge List | 30 | 30 | 1000 | 600 |
| Wikipedia Paraphrasing | 5 | 5 | 512 | 512 |
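To keep these limits in one place, the table can be captured in a small lookup structure. The sketch below copies its values from the table above. The `TaskLimits` class and the dictionary keys are illustrative names, not part of the library.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskLimits:
    """Token limits for one task, in the same order as the table columns."""
    min_document_tokens: int
    min_segment_tokens: int
    max_input_tokens: int
    max_output_tokens: int


# Values copied from the table above; the keys are illustrative names.
TASK_LIMITS = {
    "diverse_qa": TaskLimits(30, 30, 1000, 600),
    "distill": TaskLimits(30, 10, 2000, 1600),
    "extract_knowledge": TaskLimits(30, 30, 1400, 1400),
    "knowledge_list": TaskLimits(30, 30, 1000, 600),
    "wikipedia_paraphrasing": TaskLimits(5, 5, 512, 512),
}

limits = TASK_LIMITS["distill"]
print(limits.max_input_tokens, limits.max_output_tokens)
```

The first three fields line up with the `min_document_tokens`, `min_segment_tokens`, and `max_input_tokens` arguments shown for `add_preprocessing_pipeline`. The output limit would typically be applied through the generation settings.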
Quick Example
import os
from transformers import AutoTokenizer
from nemo_curator.core.client import RayClient
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.models.client import AsyncOpenAIClient
from nemo_curator.models.client.llm_client import GenerationConfig
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemotron_cc import DiverseQAStage
from nemo_curator.stages.text.io.reader.parquet import ParquetReader
from nemo_curator.stages.text.io.writer.parquet import ParquetWriter
# Initialize
client = RayClient(include_dashboard=False)
client.start()
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.3-70B-Instruct")
# Create LLM client
llm_client = AsyncOpenAIClient(
    api_key=os.environ["NVIDIA_API_KEY"],
    base_url="https://integrate.api.nvidia.com/v1",
    max_concurrent_requests=5,
)
# Build pipeline (see "Using Helper Functions" section for preprocessing/postprocessing)
pipeline = Pipeline(name="nemotron_cc_diverse_qa")
pipeline.add_stage(ParquetReader(file_paths=["./input_data/*.parquet"]))
# Add preprocessing stages using helper function:
# pipeline = add_preprocessing_pipeline(pipeline, text_field="text", ...)
# Add generation stage
pipeline.add_stage(
    DiverseQAStage(
        client=llm_client,
        model_name="meta/llama-3.3-70b-instruct",
        generation_config=GenerationConfig(temperature=0.5, top_p=0.9),
        input_field="text",
        output_field="diverse_qa",
    )
)
# Add postprocessing stages using helper function:
# pipeline = add_diverse_qa_postprocessing_pipeline(pipeline, llm_response_field="diverse_qa", ...)
pipeline.add_stage(ParquetWriter(path="./output/"))
# Execute
executor = XennaExecutor()
results = pipeline.run(executor)
client.stop()
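The `max_concurrent_requests` setting in the example caps how many LLM requests are in flight at once. How the client enforces that cap is not shown here. The general technique can be sketched with `asyncio.Semaphore`, where `bounded_gather` and `fake_request` are hypothetical names used only for this illustration.

```python
import asyncio


async def bounded_gather(coros, max_concurrent: int):
    """Run coroutines with at most max_concurrent in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))


# Shared counters used to observe the peak number of concurrent calls.
stats = {"in_flight": 0, "peak": 0}


async def fake_request(i: int) -> int:
    """Placeholder for an LLM call; sleeps briefly and doubles its input."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return i * 2


results = asyncio.run(
    bounded_gather([fake_request(i) for i in range(20)], max_concurrent=5)
)
print(results[:3], "peak concurrency:", stats["peak"])
```

A lower cap reduces the chance of hitting rate limits on a shared endpoint, at the cost of throughput.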
NDD-Backed Stages
All five Nemotron-CC tasks have equivalents backed by NeMo Data Designer (NDD). These stages share the same input_field, output_field, and prompt interface, but configure the LLM through NDD’s ModelConfig and ModelProvider instead of an AsyncOpenAIClient.
Import the NDD-backed stages from nemo_curator.stages.synthetic.nemotron_cc.nemo_data_designer.nemotron_cc:
import os
import data_designer.config as dd
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.synthetic.nemotron_cc.nemo_data_designer.nemotron_cc import DiverseQAStage
model_config = dd.ModelConfig(
    alias="meta/llama-3.3-70b-instruct",
    model="meta/llama-3.3-70b-instruct",
    provider="nvidia",
    inference_parameters=dd.ChatCompletionInferenceParams(
        temperature=0.5, top_p=0.9, max_tokens=600,
    ),
)
model_provider = dd.ModelProvider(
    name="nvidia",
    endpoint="https://integrate.api.nvidia.com/v1",
    provider_type="openai",
    api_key=os.environ["NVIDIA_API_KEY"],
)
pipeline = Pipeline(name="nemotron_cc_ndd_diverse_qa")
pipeline.add_stage(
    DiverseQAStage(
        input_field="text",
        output_field="diverse_qa",
        model_alias="meta/llama-3.3-70b-instruct",
        model_configs=[model_config],
        model_providers=[model_provider],
    )
)
The NDD backend provides automatic token metric collection and supports both local InferenceServer and remote NVIDIA NIM endpoints. See the NeMo Data Designer guide for full configuration details.