Files
2026-02-04 15:29:36 +07:00

203 lines
6.3 KiB
Python

"""
Video processing utilities for frame extraction.
"""
import os
import cv2
import numpy as np
from pathlib import Path
from typing import Generator, Tuple, Optional, List
from tqdm import tqdm
class VideoProcessor:
    """Extract frames from video files for annotation.

    The processor owns an OpenCV ``VideoCapture`` handle. It is released when
    ``release()`` is called, when the instance is used as a context manager
    and the ``with`` block exits, or (as a fallback) when the instance is
    garbage collected. Frame-reading methods must not be used after release.
    """

    def __init__(self, video_path: str):
        """
        Initialize video processor.

        Args:
            video_path: Path to the video file

        Raises:
            FileNotFoundError: If ``video_path`` does not exist
            ValueError: If OpenCV cannot open the file
        """
        # Define the attribute first so release()/__del__ are safe even when
        # one of the checks below raises.
        self.cap = None
        self.video_path = Path(video_path)
        if not self.video_path.exists():
            raise FileNotFoundError(f"Video not found: {video_path}")

        cap = cv2.VideoCapture(str(self.video_path))
        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Cannot open video: {video_path}")
        self.cap = cap

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Guard against containers that report an FPS of 0.
        self.duration = self.frame_count / self.fps if self.fps > 0 else 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()

    def __del__(self):
        self.release()

    def release(self) -> None:
        """Release the underlying capture handle. Safe to call repeatedly."""
        cap = getattr(self, 'cap', None)
        if cap is not None:
            cap.release()
            self.cap = None

    def get_info(self) -> dict:
        """Get video information."""
        return {
            'path': str(self.video_path),
            'fps': self.fps,
            'frame_count': self.frame_count,
            'width': self.width,
            'height': self.height,
            'duration_seconds': self.duration
        }

    def _sampling_interval(self, sample_fps: Optional[float]) -> int:
        """Return how many source frames to advance per sampled frame (>= 1)."""
        if sample_fps and 0 < sample_fps < self.fps:
            # Truncation means the effective rate may be slightly above
            # sample_fps; max() keeps the modulo below well-defined.
            return max(1, int(self.fps / sample_fps))
        return 1

    def _frame_window(
        self,
        start_time: float,
        end_time: Optional[float]
    ) -> Tuple[int, int]:
        """Convert a time range in seconds to a ``(start, end)`` frame range.

        ``end`` is exclusive and never exceeds ``frame_count``; ``start`` is
        never negative. ``end_time=None`` means "until the end of the video";
        an explicit ``0.0`` is honoured as a real end time.
        """
        start_frame = max(0, int(start_time * self.fps))
        if end_time is None:
            end_frame = self.frame_count
        else:
            end_frame = min(int(end_time * self.fps), self.frame_count)
        return start_frame, end_frame

    def _sampled_frames(
        self,
        start_frame: int,
        end_frame: int,
        frame_interval: int
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """Seek to ``start_frame`` and yield every ``frame_interval``-th frame."""
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        for frame_idx in range(start_frame, end_frame):
            # grab() advances the stream without decoding, so frames that are
            # skipped by the sampling interval cost far less than read().
            if not self.cap.grab():
                break
            if (frame_idx - start_frame) % frame_interval != 0:
                continue
            ret, frame = self.cap.retrieve()
            if not ret:
                break
            yield frame_idx, frame

    def extract_frames(
        self,
        output_dir: str,
        sample_fps: Optional[float] = None,
        max_frames: Optional[int] = None,
        start_time: float = 0.0,
        end_time: Optional[float] = None,
        resize: Optional[Tuple[int, int]] = None
    ) -> List[str]:
        """
        Extract frames from video and save to directory.

        Frames are written as ``frame_<index>.jpg`` where ``<index>`` is the
        zero-padded frame index in the source video.

        Args:
            output_dir: Directory to save extracted frames (created if missing)
            sample_fps: Target FPS for sampling (None, 0, or a value at or
                above the source FPS = use all frames)
            max_frames: Maximum number of frames to extract (None or 0 = no
                limit)
            start_time: Start time in seconds
            end_time: End time in seconds (None = until end)
            resize: Resize frames to (width, height)

        Returns:
            List of saved frame paths

        Raises:
            OSError: If a frame cannot be written to ``output_dir``
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        frame_interval = self._sampling_interval(sample_fps)
        start_frame, end_frame = self._frame_window(start_time, end_time)

        # Ceiling division: a window of 10 frames sampled every 3 yields 4.
        target_count = max(0, -(-(end_frame - start_frame) // frame_interval))
        if max_frames:
            target_count = max(0, min(target_count, max_frames))

        saved_paths = []
        # The context manager closes the bar even if saving a frame fails.
        with tqdm(total=target_count, desc="Extracting frames") as pbar:
            if target_count > 0:
                frames = self._sampled_frames(start_frame, end_frame, frame_interval)
                for frame_idx, frame in frames:
                    if resize:
                        frame = cv2.resize(frame, resize)

                    # Save frame with zero-padded index
                    frame_path = output_path / f"frame_{frame_idx:06d}.jpg"
                    # imwrite reports failure through its return value; do not
                    # hand back a path for a file that was never written.
                    if not cv2.imwrite(str(frame_path), frame):
                        raise OSError(f"Failed to write frame: {frame_path}")
                    saved_paths.append(str(frame_path))
                    pbar.update(1)

                    # Stop before the next frame is grabbed/decoded.
                    if len(saved_paths) >= target_count:
                        break

        print(f"Extracted {len(saved_paths)} frames to {output_dir}")
        return saved_paths

    def iterate_frames(
        self,
        sample_fps: Optional[float] = None,
        start_time: float = 0.0,
        end_time: Optional[float] = None
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """
        Iterate through video frames as a generator.

        Uses the same sampling and time-window rules as ``extract_frames``.
        The capture is only repositioned once iteration actually starts.

        Args:
            sample_fps: Target FPS for sampling (None or 0 = use all frames)
            start_time: Start time in seconds
            end_time: End time in seconds (None = until end)

        Yields:
            Tuple of (frame_index, frame_array)
        """
        frame_interval = self._sampling_interval(sample_fps)
        start_frame, end_frame = self._frame_window(start_time, end_time)
        yield from self._sampled_frames(start_frame, end_frame, frame_interval)
def frames_to_video(
    frames_dir: str,
    output_path: str,
    fps: float = 30.0,
    codec: str = 'mp4v'
) -> str:
    """
    Convert frames directory back to video.

    All ``*.jpg`` and ``*.png`` files directly inside ``frames_dir`` are
    written in filename order. The first frame fixes the video dimensions;
    any later frame of a different size is resized to match.

    Args:
        frames_dir: Directory containing frame images
        output_path: Output video path (parent directory is created if missing)
        fps: Frames per second
        codec: Video codec as a four-character code

    Returns:
        Path to created video

    Raises:
        ValueError: If no frames are found, a frame cannot be decoded, or
            the video writer cannot be opened
    """
    frames_path = Path(frames_dir)
    # Sort the two extensions together so a mixed directory keeps the
    # order implied by the file names.
    frame_files = sorted(
        [*frames_path.glob("*.jpg"), *frames_path.glob("*.png")]
    )
    if not frame_files:
        raise ValueError(f"No frames found in {frames_dir}")

    # Read first frame to get dimensions; imread returns None on failure.
    first_frame = cv2.imread(str(frame_files[0]))
    if first_frame is None:
        raise ValueError(f"Cannot read frame: {frame_files[0]}")
    height, width = first_frame.shape[:2]

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*codec)
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
    try:
        # A writer that failed to open would otherwise discard every frame
        # without reporting anything.
        if not out.isOpened():
            raise ValueError(f"Cannot open video writer: {output_path}")

        for frame_file in tqdm(frame_files, desc="Creating video"):
            frame = cv2.imread(str(frame_file))
            if frame is None:
                raise ValueError(f"Cannot read frame: {frame_file}")
            # Normalise size so every frame matches the writer's dimensions.
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            out.write(frame)
    finally:
        out.release()

    print(f"Video saved to {output_path}")
    return output_path