203 lines
6.3 KiB
Python
203 lines
6.3 KiB
Python
"""
Video processing utilities for frame extraction.
"""
|
|
|
|
import os
|
|
import cv2
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import Generator, Tuple, Optional, List
|
|
from tqdm import tqdm
|
|
|
|
|
|
class VideoProcessor:
    """Extract frames from video files for annotation.

    The video is opened with ``cv2.VideoCapture`` when the object is created
    and its basic properties are cached as attributes. Every method reads
    from the same capture handle and seeks it before reading, so only one
    extraction or iteration should be in progress at a time.

    The handle is released by :meth:`release`, on leaving a ``with`` block,
    or when the object is garbage collected.

    Attributes:
        video_path: Path of the opened video file.
        cap: The underlying ``cv2.VideoCapture`` (``None`` once released).
        fps: Frame rate reported by ``CAP_PROP_FPS``.
        frame_count: Frame count reported by ``CAP_PROP_FRAME_COUNT``.
        width: Frame width reported by ``CAP_PROP_FRAME_WIDTH``.
        height: Frame height reported by ``CAP_PROP_FRAME_HEIGHT``.
        duration: ``frame_count / fps`` in seconds, or 0 when ``fps`` is not
            positive.
    """

    def __init__(self, video_path: str):
        """
        Initialize video processor.

        Args:
            video_path: Path to the video file

        Raises:
            FileNotFoundError: If ``video_path`` does not exist.
            ValueError: If OpenCV cannot open the file.
        """
        # Set first so release()/__del__ are safe even when opening fails.
        self.cap = None

        self.video_path = Path(video_path)
        if not self.video_path.exists():
            raise FileNotFoundError(f"Video not found: {video_path}")

        cap = cv2.VideoCapture(str(self.video_path))
        if not cap.isOpened():
            # Do not leave the failed handle waiting for garbage collection.
            cap.release()
            raise ValueError(f"Cannot open video: {video_path}")
        self.cap = cap

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.duration = self.frame_count / self.fps if self.fps > 0 else 0

    def release(self) -> None:
        """Release the capture handle. Safe to call more than once."""
        cap = getattr(self, 'cap', None)
        if cap is not None:
            cap.release()
            self.cap = None

    def __enter__(self) -> "VideoProcessor":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.release()

    def __del__(self):
        self.release()

    def get_info(self) -> dict:
        """Get video information.

        Returns:
            Dict with the keys ``path``, ``fps``, ``frame_count``, ``width``,
            ``height`` and ``duration_seconds``.
        """
        return {
            'path': str(self.video_path),
            'fps': self.fps,
            'frame_count': self.frame_count,
            'width': self.width,
            'height': self.height,
            'duration_seconds': self.duration
        }

    def _sampling_interval(self, sample_fps: Optional[float]) -> int:
        """Return the source-frame step between kept frames (always >= 1)."""
        if sample_fps is not None and sample_fps < 0:
            raise ValueError(f"sample_fps must be positive, got {sample_fps}")
        # None, 0, or a rate at/above the source rate all mean "keep every frame".
        if sample_fps and sample_fps < self.fps:
            return int(self.fps / sample_fps)
        return 1

    def _frame_bounds(
        self,
        start_time: float,
        end_time: Optional[float]
    ) -> Tuple[int, int]:
        """Convert a time window in seconds to ``(first, last)`` frame indices.

        ``last`` is exclusive. A negative ``start_time`` is clamped to frame 0
        so indices (and file names derived from them) never go negative.
        """
        first = max(0, int(start_time * self.fps))
        # A falsy end_time (None or 0) means "until the end of the video".
        last = int(end_time * self.fps) if end_time else self.frame_count
        return first, last

    def _read_sampled(
        self,
        first: int,
        last: int,
        step: int
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """Seek to ``first`` and yield every ``step``-th frame before ``last``.

        Stops early when the capture reports that no frame could be read.
        """
        if self.cap is None:
            raise RuntimeError("Video capture has been released")

        self.cap.set(cv2.CAP_PROP_POS_FRAMES, first)
        for index in range(first, last):
            ok, frame = self.cap.read()
            if not ok:
                break
            if (index - first) % step == 0:
                yield index, frame

    def extract_frames(
        self,
        output_dir: str,
        sample_fps: Optional[float] = None,
        max_frames: Optional[int] = None,
        start_time: float = 0.0,
        end_time: Optional[float] = None,
        resize: Optional[Tuple[int, int]] = None
    ) -> List[str]:
        """
        Extract frames from video and save to directory.

        Frames are written as ``frame_<index>.jpg`` where ``<index>`` is the
        zero-padded index of the frame in the source video.

        Args:
            output_dir: Directory to save extracted frames (created if missing)
            sample_fps: Target FPS for sampling (None or 0 = use all frames)
            max_frames: Maximum number of frames to extract (None or 0 = no limit)
            start_time: Start time in seconds (negative values start at frame 0)
            end_time: End time in seconds (None or 0 = until end)
            resize: Resize frames to (width, height)

        Returns:
            List of saved frame paths

        Raises:
            ValueError: If ``sample_fps`` is negative.
            OSError: If a frame cannot be written to ``output_dir``.
        """
        # Validate arguments before touching the filesystem.
        step = self._sampling_interval(sample_fps)
        first, last = self._frame_bounds(start_time, end_time)
        last = min(last, self.frame_count)

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Ceiling division: frames first, first+step, ... strictly below last.
        expected = max(0, (last - first + step - 1) // step)
        if max_frames:
            expected = min(expected, max_frames)

        saved_paths: List[str] = []
        # The context manager closes the bar even if a write fails.
        with tqdm(total=expected, desc="Extracting frames") as pbar:
            for frame_idx, frame in self._read_sampled(first, last, step):
                if resize:
                    frame = cv2.resize(frame, resize)

                # Save frame with zero-padded index
                frame_name = f"frame_{frame_idx:06d}.jpg"
                frame_path = output_path / frame_name
                # imwrite signals failure through its return value; never
                # report a path that was not actually written.
                if not cv2.imwrite(str(frame_path), frame):
                    raise OSError(f"Failed to write frame: {frame_path}")

                saved_paths.append(str(frame_path))
                pbar.update(1)

                if max_frames and len(saved_paths) >= max_frames:
                    break

        print(f"Extracted {len(saved_paths)} frames to {output_dir}")
        return saved_paths

    def iterate_frames(
        self,
        sample_fps: Optional[float] = None,
        start_time: float = 0.0,
        end_time: Optional[float] = None
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """
        Iterate through video frames as a generator.

        Args:
            sample_fps: Target FPS for sampling (None or 0 = use all frames)
            start_time: Start time in seconds (negative values start at frame 0)
            end_time: End time in seconds (None or 0 = until end)

        Yields:
            Tuple of (frame_index, frame_array)

        Raises:
            ValueError: If ``sample_fps`` is negative (raised on first iteration).
        """
        step = self._sampling_interval(sample_fps)
        first, last = self._frame_bounds(start_time, end_time)
        yield from self._read_sampled(first, last, step)
|
|
|
|
|
|
def frames_to_video(
    frames_dir: str,
    output_path: str,
    fps: float = 30.0,
    codec: str = 'mp4v'
) -> str:
    """
    Convert frames directory back to video.

    All ``*.jpg`` files (sorted by name) are written first, followed by all
    ``*.png`` files (sorted by name). The video size is taken from the first
    frame; any later frame with a different size is resized to match.

    Args:
        frames_dir: Directory containing frame images
        output_path: Output video path (parent directory is created if missing)
        fps: Frames per second
        codec: Video codec as a four-character code (e.g. ``'mp4v'``)

    Returns:
        Path to created video

    Raises:
        ValueError: If ``codec`` is not four characters long, no frames are
            found, a frame image cannot be read, or the video writer cannot
            be opened.
    """
    # VideoWriter_fourcc takes exactly four characters; fail with a clear message.
    if len(codec) != 4:
        raise ValueError(f"Codec must be a four-character code, got {codec!r}")

    frames_path = Path(frames_dir)
    frame_files = sorted(frames_path.glob("*.jpg")) + sorted(frames_path.glob("*.png"))

    if not frame_files:
        raise ValueError(f"No frames found in {frames_dir}")

    # Read first frame to get dimensions
    first_frame = cv2.imread(str(frame_files[0]))
    if first_frame is None:
        # imread returns None instead of raising on unreadable files.
        raise ValueError(f"Cannot read frame: {frame_files[0]}")
    height, width = first_frame.shape[:2]

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Create video writer
    fourcc = cv2.VideoWriter_fourcc(*codec)
    out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))

    try:
        # Without this check an unopened writer drops every frame silently.
        if not out.isOpened():
            raise ValueError(f"Cannot open video writer: {output_path}")

        for frame_file in tqdm(frame_files, desc="Creating video"):
            frame = cv2.imread(str(frame_file))
            if frame is None:
                raise ValueError(f"Cannot read frame: {frame_file}")
            # The writer was created for (width, height); keep every frame at that size.
            if frame.shape[:2] != (height, width):
                frame = cv2.resize(frame, (width, height))
            out.write(frame)
    finally:
        # Always finalize/close the output, even when a frame fails.
        out.release()

    print(f"Video saved to {output_path}")
    return output_path
|