VideoTask is the task type for video processing in NeMo Curator.
Import
from nemo_curator.tasks import VideoTask
Class Definition
from dataclasses import dataclass
from nemo_curator.tasks.video import Video
@dataclass
class VideoTask(Task[Video]):
    """A task whose payload is exactly one video.

    Attributes:
        task_id: Identifier that uniquely names this task.
        dataset_name: Name of the dataset the video came from.
        data: The ``Video`` object (path plus metadata) to process.
    """

    task_id: str
    dataset_name: str
    data: Video
Video Class
The video data is represented by a Video object:
@dataclass
class Video:
"""Represents a video file with metadata.
Attributes:
path: Path to the video file.
start_time: Start time in seconds (for clips).
end_time: End time in seconds (for clips).
metadata: Additional metadata dictionary.
embeddings: Optional embedding vector.
"""
path: str
start_time: float | None = None
end_time: float | None = None
metadata: dict[str, Any] = field(default_factory=dict)
embeddings: np.ndarray | None = None
Properties
num_items
Get the number of items (always 1 for VideoTask).
@property
def num_items(self) -> int:
    """Return the number of items held by this task.

    Returns:
        Always ``1``, because a VideoTask represents a single video.
    """
    return 1
Creating VideoTask
from nemo_curator.tasks import VideoTask
from nemo_curator.tasks.video import Video
# Build the Video payload: file path, clip window, and free-form metadata.
video = Video(
    path="/data/videos/video1.mp4",
    start_time=0.0,
    end_time=30.0,
    metadata=dict(duration=120.5, fps=30, resolution="1920x1080"),
)

# Wrap the payload in a task.
task = VideoTask(task_id="video_001", dataset_name="video_dataset", data=video)
Usage in Stages
from dataclasses import dataclass
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.tasks import VideoTask
@dataclass
class VideoFilterStage(ProcessingStage[VideoTask, VideoTask]):
    """Stage that drops videos whose duration is outside a configured range.

    The duration is read from the ``"duration"`` key of the video's metadata;
    a video without that key is treated as having a duration of ``0``.

    Attributes:
        name: Name of the stage.
        min_duration: Smallest accepted duration (inclusive).
        max_duration: Largest accepted duration (inclusive).
    """

    name: str = "VideoFilter"
    min_duration: float = 5.0
    max_duration: float = 300.0

    def inputs(self) -> tuple[list[str], list[str]]:
        """Return the input declaration: ``["data"]`` and an empty list."""
        return (["data"], [])

    def outputs(self) -> tuple[list[str], list[str]]:
        """Return the output declaration: ``["data"]`` and an empty list."""
        return (["data"], [])

    def process(self, task: VideoTask) -> VideoTask | None:
        """Pass the video through if its duration is within range.

        Args:
            task: Task whose video metadata is inspected.

        Returns:
            A new VideoTask (id suffixed with ``_filtered``) carrying the same
            video, or ``None`` when the duration is out of range.
        """
        candidate = task.data
        duration = candidate.metadata.get("duration", 0)
        within_range = self.min_duration <= duration <= self.max_duration
        if within_range:
            return VideoTask(
                task_id=f"{task.task_id}_filtered",
                dataset_name=task.dataset_name,
                data=candidate,
                _metadata=task._metadata,
                _stage_perf=task._stage_perf,
            )
        return None
Common Operations
Splitting Videos into Clips
The video splitting stages return multiple VideoTask objects:
def process(self, task: VideoTask) -> list[VideoTask]:
    """Split the task's video into clips, one new VideoTask per clip.

    Clip boundaries come from ``self._compute_splits``. Every clip points at
    the same file path as the source video and receives a shallow copy of its
    metadata dictionary.

    Args:
        task: Task holding the video to split.

    Returns:
        One VideoTask per (start, end) pair, with ids of the form
        ``<task_id>_clip_<index>``.
    """
    source = task.data
    return [
        VideoTask(
            task_id=f"{task.task_id}_clip_{index}",
            dataset_name=task.dataset_name,
            data=Video(
                path=source.path,
                start_time=clip_start,
                end_time=clip_end,
                metadata=source.metadata.copy(),
            ),
            _metadata=task._metadata,
            _stage_perf=task._stage_perf,
        )
        for index, (clip_start, clip_end) in enumerate(self._compute_splits(source))
    ]
Adding Embeddings
def process(self, task: VideoTask) -> VideoTask:
    """Attach an embedding to the task's video and return a new task.

    The embedding is obtained from ``self.encoder.encode`` using the video's
    path and is stored on the existing Video object, which is mutated in
    place and then carried by the returned task.

    Args:
        task: Task holding the video to embed.

    Returns:
        A new VideoTask whose id is ``<task_id>_<stage name>``.
    """
    target = task.data
    embedding = self.encoder.encode(target.path)
    target.embeddings = embedding

    derived_id = f"{task.task_id}_{self.name}"
    return VideoTask(
        task_id=derived_id,
        dataset_name=task.dataset_name,
        data=target,
        _metadata=task._metadata,
        _stage_perf=task._stage_perf,
    )