""" MP4 video file source. """ import cv2 import time import logging import numpy as np from pathlib import Path from typing import Tuple, Optional from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class VideoInfo: """Video metadata.""" path: str fps: float frame_count: int width: int height: int duration: float class MP4Source: """ MP4 video file source that acts as a camera feed. Features: - FPS limiting - Loop playback - Frame resize """ def __init__( self, source: str, fps_limit: Optional[float] = None, loop: bool = True, resize: Optional[Tuple[int, int]] = None, ): """ Initialize MP4 source. Args: source: Path to MP4 file fps_limit: Maximum FPS to output loop: Loop video when finished resize: Resize frames to (width, height) """ self.source = source self.fps_limit = fps_limit self.loop = loop self.resize = resize self.cap: Optional[cv2.VideoCapture] = None self.info: Optional[VideoInfo] = None self._frame_time = 0 self._last_frame_time = 0 def open(self) -> VideoInfo: """Open video file and return info.""" path = Path(self.source) if not path.exists(): raise FileNotFoundError(f"Video file not found: {self.source}") self.cap = cv2.VideoCapture(str(path)) if not self.cap.isOpened(): raise ValueError(f"Cannot open video: {self.source}") fps = self.cap.get(cv2.CAP_PROP_FPS) frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.info = VideoInfo( path=str(path), fps=fps, frame_count=frame_count, width=width, height=height, duration=frame_count / fps if fps > 0 else 0, ) # Calculate frame time for FPS limiting if self.fps_limit: self._frame_time = 1.0 / self.fps_limit else: self._frame_time = 1.0 / fps if fps > 0 else 0 logger.info(f"Opened video: {path.name} ({width}x{height} @ {fps:.1f}fps)") return self.info def read(self) -> Tuple[bool, Optional[np.ndarray], int]: """ Read next frame with FPS limiting. Returns: Tuple of (success, frame, frame_index) """ if self.cap is None: self.open() # FPS limiting if self._frame_time > 0: elapsed = time.time() - self._last_frame_time if elapsed < self._frame_time: time.sleep(self._frame_time - elapsed) self._last_frame_time = time.time() # Read frame ret, frame = self.cap.read() frame_idx = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1 if not ret: if self.loop: self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) ret, frame = self.cap.read() frame_idx = 0 logger.debug("Video looped") else: return False, None, frame_idx # Resize if needed if ret and self.resize: frame = cv2.resize(frame, self.resize) return ret, frame, frame_idx def close(self) -> None: """Close video file.""" if self.cap is not None: self.cap.release() self.cap = None logger.debug(f"Closed video: {self.source}") def seek(self, frame_idx: int) -> bool: """Seek to specific frame.""" if self.cap is None: return False self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) return True def seek_time(self, seconds: float) -> bool: """Seek to specific time in seconds.""" if self.cap is None or self.info is None: return False frame_idx = int(seconds * self.info.fps) return self.seek(frame_idx) @property def current_frame(self) -> int: """Get current frame index.""" if self.cap is None: return 0 return int(self.cap.get(cv2.CAP_PROP_POS_FRAMES)) @property def current_time(self) -> float: """Get current time in seconds.""" if self.cap is None or self.info is None: return 0.0 return self.current_frame / self.info.fps def __enter__(self): self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()