Files
2026-02-04 15:29:36 +07:00

258 lines
7.4 KiB
Python

"""
Video source handler for MP4 and RTSP streams.
"""
import time
from dataclasses import dataclass
from pathlib import Path
from queue import Empty, Queue
from threading import Lock, Thread
from typing import Any, Dict, Generator, Optional, Tuple

import cv2
import numpy as np
@dataclass
class VideoInfo:
    """
    Video metadata.

    Plain record describing one video; instances compare field by field and
    only ``codec`` is optional.
    """

    # Location of the video, stored as a string.
    path: str
    # Frame rate in frames per second.
    fps: float
    # Total number of frames.
    frame_count: int
    # Frame size in pixels.
    width: int
    height: int
    # Playback length in seconds.
    duration: float
    # Codec identifier; empty string when none was supplied.
    codec: str = ""
class VideoSource:
    """
    Video source handler supporting MP4 files.

    Wraps a ``cv2.VideoCapture`` opened on a file that must exist on disk.
    Frames are pulled one at a time with ``read()`` or streamed over a time
    window with ``iterate()``. The object is also a context manager: it opens
    on entry and releases the capture on exit.
    """

    def __init__(
        self,
        source: str,
        fps_limit: Optional[float] = None,
        loop: bool = False,
        resize: Optional[Tuple[int, int]] = None,
    ):
        """
        Initialize video source.

        Nothing is opened here; call ``open()`` (or use ``with``).

        Args:
            source: Path to MP4 file
            fps_limit: Maximum FPS to process. Applied by ``iterate()`` only,
                which then yields every N-th frame.
            loop: Loop video when finished
            resize: Resize frames to (width, height)
        """
        self.source = source
        self.fps_limit = fps_limit
        self.loop = loop
        self.resize = resize
        self.cap: Optional[cv2.VideoCapture] = None
        self.info: Optional[VideoInfo] = None
        # 1 means "keep every frame"; never 0, because iterate() uses it as a
        # modulus.
        self._frame_interval = 1
        self._is_running = False

    @staticmethod
    def _decode_fourcc(value: float) -> str:
        """Turn OpenCV's packed FOURCC number into its character code ("" if unknown)."""
        try:
            packed = int(value)
        except (TypeError, ValueError, OverflowError):
            # NaN / inf / None: the backend could not report a codec.
            return ""
        if packed <= 0:
            return ""
        # The four characters are stored one per byte, lowest byte first.
        chars = [chr((packed >> shift) & 0xFF) for shift in (0, 8, 16, 24)]
        return "".join(c for c in chars if c.isprintable()).strip()

    def _seek(self, frame_index: int) -> None:
        """Position the capture so the next decoded frame is ``frame_index``."""
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)

    def open(self) -> "VideoInfo":
        """
        Open video source and return info.

        Calling it on an already open source closes the previous capture
        first.

        Returns:
            Metadata reported by OpenCV for the file (also stored on
            ``self.info``).

        Raises:
            FileNotFoundError: If ``source`` does not exist on disk.
            ValueError: If OpenCV cannot open the file.
        """
        path = Path(self.source)
        if not path.exists():
            raise FileNotFoundError(f"Video not found: {self.source}")

        # Go through close() so the old handle is released rather than leaked.
        if self.cap is not None:
            self.close()

        cap = cv2.VideoCapture(str(path))
        if not cap.isOpened():
            # Do not keep a dead handle on self.cap: read()/iterate() would
            # then skip their auto-open and fail silently.
            cap.release()
            raise ValueError(f"Cannot open video: {self.source}")
        self.cap = cap

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Calculate frame interval for FPS limiting. A non-positive limit is
        # treated as "no limit" so the interval can never drop below 1.
        if self.fps_limit and 0 < self.fps_limit < fps:
            self._frame_interval = max(1, int(fps / self.fps_limit))
        else:
            self._frame_interval = 1

        self.info = VideoInfo(
            path=str(path),
            fps=fps,
            frame_count=frame_count,
            width=width,
            height=height,
            duration=frame_count / fps if fps > 0 else 0,
            codec=self._decode_fourcc(cap.get(cv2.CAP_PROP_FOURCC)),
        )
        self._is_running = True
        return self.info

    def close(self) -> None:
        """Close video source. Safe to call when nothing is open."""
        self._is_running = False
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def read(self) -> Tuple[bool, Optional[np.ndarray], int]:
        """
        Read next frame, opening the source first if necessary.

        ``fps_limit`` is not applied here; every decoded frame is returned.
        With ``loop`` enabled, a failed read rewinds to frame 0 and retries
        once.

        Returns:
            Tuple of (success, frame, frame_index); ``frame`` is None when
            ``success`` is False.
        """
        if self.cap is None:
            self.open()

        ret, frame = self.cap.read()
        # POS_FRAMES is the index of the frame to be decoded next, hence -1.
        frame_idx = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1

        if not ret and self.loop:
            self._seek(0)
            ret, frame = self.cap.read()
            frame_idx = 0

        if not ret:
            return False, None, frame_idx
        if self.resize:
            frame = cv2.resize(frame, self.resize)
        return True, frame, frame_idx

    def iterate(
        self,
        start_time: float = 0,
        end_time: Optional[float] = None,
        max_frames: Optional[int] = None,
    ) -> Generator[Tuple[int, np.ndarray, float], None, None]:
        """
        Iterate through video frames, opening the source first if necessary.

        When ``fps_limit`` lowered the rate, only every N-th frame of the
        window is yielded. With ``loop`` enabled the window is replayed from
        ``start_time`` each time its end is reached, so the generator is
        endless unless ``max_frames`` is given or ``close()`` is called.

        Args:
            start_time: Start time in seconds (negative values start at 0)
            end_time: End time in seconds, exclusive; None means the end of
                the video. ``0`` is a real bound and yields nothing.
            max_frames: Maximum frames to yield; None means no limit and
                ``0`` yields nothing.

        Yields:
            Tuple of (frame_index, frame, timestamp). If the file reports no
            usable FPS, the time bounds are ignored and ``timestamp`` is 0.0.
        """
        if self.cap is None:
            self.open()

        if max_frames is not None and max_frames <= 0:
            return

        fps = self.info.fps
        # A 0 or NaN FPS cannot convert seconds to frame numbers.
        fps_known = fps > 0
        start_frame = max(0, int(start_time * fps)) if fps_known else 0

        end_frame: Optional[int]
        if end_time is not None and fps_known:
            end_frame = int(end_time * fps)
        elif self.info.frame_count > 0:
            end_frame = self.info.frame_count
        else:
            # Container did not report a length: read until decoding fails.
            end_frame = None

        if end_frame is not None and end_frame <= start_frame:
            return

        step = max(1, self._frame_interval)
        self._seek(start_frame)
        frame_idx = start_frame
        yielded = 0
        # Frames decoded since the last seek; a pass that decodes nothing must
        # end the loop, otherwise loop=True would rewind forever.
        frames_this_pass = 0

        while self._is_running:
            if max_frames is not None and yielded >= max_frames:
                break

            ret, frame = False, None
            if end_frame is None or frame_idx < end_frame:
                ret, frame = self.cap.read()

            if not ret:
                # End of the requested window or of the readable data.
                if not self.loop or frames_this_pass == 0:
                    break
                self._seek(start_frame)
                frame_idx = start_frame
                frames_this_pass = 0
                continue

            frames_this_pass += 1
            if (frame_idx - start_frame) % step == 0:
                if self.resize:
                    frame = cv2.resize(frame, self.resize)
                timestamp = frame_idx / fps if fps_known else 0.0
                yield frame_idx, frame, timestamp
                yielded += 1
            frame_idx += 1

    def __enter__(self):
        """Open the source and return this object."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the source; exceptions from the ``with`` body propagate."""
        self.close()
class BufferedVideoSource(VideoSource):
    """
    Video source with frame buffering for smoother processing.

    ``open()`` starts a daemon thread that decodes frames ahead of the
    consumer into a queue; ``read()`` pops from that queue.

    NOTE(review): the inherited ``iterate()`` reads ``self.cap`` directly and
    does not take ``self._lock``, so it would compete with the reader thread
    for frames. Consume a buffered source through ``read()``.
    """

    def __init__(
        self,
        source: str,
        buffer_size: int = 10,
        **kwargs,
    ):
        """
        Initialize buffered video source.

        Args:
            source: Path to video
            buffer_size: Number of frames to buffer. It is passed to
                ``queue.Queue`` as ``maxsize``, so a value <= 0 makes the
                buffer unbounded.
            **kwargs: Additional VideoSource arguments
        """
        super().__init__(source, **kwargs)
        self.buffer_size = buffer_size
        self._buffer: Queue = Queue(maxsize=buffer_size)
        self._thread: Optional[Thread] = None
        # Serialises access to self.cap between the reader thread and close().
        self._lock = Lock()
        # True once read() has handed the end-of-stream marker to the caller.
        self._eof = False

    def _drain_buffer(self) -> None:
        """Discard every queued item without blocking."""
        while True:
            try:
                self._buffer.get_nowait()
            except Empty:
                return

    def _reader_thread(self) -> None:
        """
        Background thread to read frames into buffer.

        Runs until the source is closed or the video ends. At the end of a
        non-looping video it queues ``(False, None, -1)`` as an end marker.
        """
        # Frames decoded since the last rewind. A rewind that produces nothing
        # means the capture is unreadable, so looping must stop instead of
        # spinning.
        frames_this_pass = 0
        while self._is_running:
            if self._buffer.full():
                time.sleep(0.001)
                continue

            with self._lock:
                cap = self.cap
                if cap is None:
                    break
                ret, frame = cap.read()
                frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1
                if not ret and self.loop and frames_this_pass > 0:
                    # Rewind while still holding the lock, so close() cannot
                    # clear self.cap between the failed read and the seek.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    frames_this_pass = 0
                    continue

            if not ret:
                self._buffer.put((False, None, -1))
                break

            frames_this_pass += 1
            if self.resize:
                frame = cv2.resize(frame, self.resize)
            self._buffer.put((True, frame, frame_idx))

    def open(self) -> VideoInfo:
        """
        Open video and start buffer thread.

        A source that is already open is closed first, so only one reader
        thread runs at a time and no frames from the earlier session remain
        in the buffer.

        Returns:
            The metadata returned by ``VideoSource.open()``.
        """
        if self._thread is not None:
            self.close()
        self._drain_buffer()
        self._eof = False

        info = super().open()
        # Start reader thread
        self._thread = Thread(target=self._reader_thread, daemon=True)
        self._thread.start()
        return info

    def close(self) -> None:
        """Stop buffer thread, close video and drop buffered frames."""
        self._is_running = False
        thread, self._thread = self._thread, None
        if thread is not None:
            thread.join(timeout=1.0)

        # Release under the lock so a read still in flight finishes first.
        # The wait is bounded so a stuck decoder cannot hang shutdown; after
        # the timeout the capture is released anyway, as it was before.
        locked = self._lock.acquire(timeout=1.0)
        try:
            super().close()
        finally:
            if locked:
                self._lock.release()

        self._drain_buffer()

    def read(self) -> Tuple[bool, Optional[np.ndarray], int]:
        """
        Read frame from buffer.

        Unlike ``VideoSource.read()`` this never opens the source by itself.

        Returns:
            Tuple of (success, frame, frame_index). ``(False, None, -1)`` is
            returned when the source is not running, when the video has
            ended, or when no frame arrived within one second.
        """
        if not self._is_running or self._eof:
            return False, None, -1
        try:
            item = self._buffer.get(timeout=1.0)
        except Empty:
            return False, None, -1
        if not item[0]:
            # Remember the end marker so later calls return at once instead
            # of waiting out the timeout on an empty queue.
            self._eof = True
        return item