258 lines
7.4 KiB
Python
258 lines
7.4 KiB
Python
"""
|
|
Video source handler for MP4 files. RTSP URLs are not supported yet: a source must be an existing local path to be opened.
|
|
"""
|
|
|
|
import time
from dataclasses import dataclass
from pathlib import Path
from queue import Empty, Queue
from threading import Lock, Thread
from typing import Any, Dict, Generator, Optional, Tuple

import cv2
import numpy as np
|
|
|
|
|
|
@dataclass
class VideoInfo:
    """Metadata describing an opened video.

    Instances are plain value records; field order is part of the
    positional constructor signature.
    """

    # Location of the video, stored as a string.
    path: str
    # Frame rate in frames per second.
    fps: float
    # Total number of frames.
    frame_count: int
    # Frame width in pixels.
    width: int
    # Frame height in pixels.
    height: int
    # Length of the video in seconds.
    duration: float
    # Codec identifier; defaults to an empty string when not supplied.
    codec: str = ""
|
|
|
|
|
|
class VideoSource:
    """Video source handler supporting MP4 files.

    Wraps a ``cv2.VideoCapture`` and offers single-frame reads (``read``),
    ranged iteration with optional frame skipping (``iterate``), and
    context-manager support that opens on entry and closes on exit.
    """

    # Slack used when rounding the frame-skip interval up, so that a limit
    # which differs from the source rate only by float noise does not
    # needlessly halve the output rate.
    _INTERVAL_TOLERANCE = 1e-3

    def __init__(
        self,
        source: str,
        fps_limit: Optional[float] = None,
        loop: bool = False,
        resize: Optional[Tuple[int, int]] = None,
    ):
        """
        Initialize video source.

        Nothing is opened here; call ``open()`` (or use the instance as a
        context manager) to create the underlying capture.

        Args:
            source: Path to MP4 file
            fps_limit: Maximum FPS to process (applied by ``iterate``)
            loop: Loop video when finished
            resize: Resize frames to (width, height)
        """
        self.source = source
        self.fps_limit = fps_limit
        self.loop = loop
        self.resize = resize

        self.cap: Optional[cv2.VideoCapture] = None
        self.info: Optional[VideoInfo] = None
        # 1 means "keep every frame"; recomputed from the real FPS in open().
        self._frame_interval = 1
        self._is_running = False

    @staticmethod
    def _frame_interval_for(source_fps: float, fps_limit: Optional[float]) -> int:
        """Return N such that keeping every N-th frame stays within ``fps_limit``.

        The ratio is rounded *up*: rounding down (e.g. 30 fps with a limit
        of 20 -> interval 1) would deliver more frames per second than the
        requested maximum. Returns 1 when no positive limit is set or the
        source is already at or below the limit.
        """
        if not fps_limit or fps_limit <= 0 or fps_limit >= source_fps:
            return 1
        ratio = source_fps / fps_limit
        interval = int(ratio)
        if ratio - interval > VideoSource._INTERVAL_TOLERANCE:
            interval += 1
        return max(interval, 1)

    def open(self) -> "VideoInfo":
        """Open video source and return info.

        Any capture left over from a previous ``open()`` is released first.

        Raises:
            FileNotFoundError: ``source`` does not exist on disk.
            ValueError: OpenCV could not open the file.
        """
        path = Path(self.source)
        if not path.exists():
            raise FileNotFoundError(f"Video not found: {self.source}")

        # Re-opening must not leak the previous capture handle.
        if self.cap is not None:
            self.cap.release()
            self.cap = None

        cap = cv2.VideoCapture(str(path))
        if not cap.isOpened():
            # Do not keep a dead capture around: read()/iterate() treat a
            # non-None ``self.cap`` as "already open".
            cap.release()
            raise ValueError(f"Cannot open video: {self.source}")
        self.cap = cap

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Calculate frame interval for FPS limiting
        self._frame_interval = self._frame_interval_for(fps, self.fps_limit)

        self.info = VideoInfo(
            path=str(path),
            fps=fps,
            frame_count=frame_count,
            width=width,
            height=height,
            duration=frame_count / fps if fps > 0 else 0,
        )

        self._is_running = True
        return self.info

    def close(self) -> None:
        """Close video source. Safe to call when nothing is open."""
        self._is_running = False
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def read(self) -> Tuple[bool, Optional[np.ndarray], int]:
        """
        Read next frame, opening the source first if necessary.

        When the read fails and ``loop`` is set, the capture is rewound to
        frame 0 and one more read is attempted. ``fps_limit`` is not applied
        here; only ``iterate`` skips frames.

        Returns:
            Tuple of (success, frame, frame_index). ``frame`` is None when
            ``success`` is False.
        """
        if self.cap is None:
            self.open()

        ret, frame = self.cap.read()
        # POS_FRAMES is read after the read() call, hence the -1.
        frame_idx = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1

        if not ret:
            if not self.loop:
                return False, None, frame_idx
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = self.cap.read()
            frame_idx = 0
            if not ret:
                # Still nothing after rewinding.
                return False, None, frame_idx

        if self.resize:
            frame = cv2.resize(frame, self.resize)

        return ret, frame, frame_idx

    def iterate(
        self,
        start_time: float = 0,
        end_time: Optional[float] = None,
        max_frames: Optional[int] = None,
    ) -> Generator[Tuple[int, np.ndarray, float], None, None]:
        """
        Iterate through video frames, opening the source first if necessary.

        Only every ``_frame_interval``-th frame (counted from the start
        frame) is yielded, which is how ``fps_limit`` is enforced. With
        ``loop`` set, the range restarts from ``start_time`` whenever its end
        is reached, so iteration only stops via ``max_frames`` or ``close()``.

        Args:
            start_time: Start time in seconds
            end_time: End time in seconds (None or 0 means the reported
                frame count)
            max_frames: Maximum frames to yield (None or 0 means no limit)

        Yields:
            Tuple of (frame_index, frame, timestamp), where timestamp is
            ``frame_index / fps`` in seconds (0.0 if the FPS is unknown).
        """
        if self.cap is None:
            self.open()

        fps = self.info.fps
        start_frame = int(start_time * fps)
        end_frame = int(end_time * fps) if end_time else self.info.frame_count

        self.cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        frame_idx = start_frame
        yielded = 0
        # Frames decoded since the last (re)start; guards against spinning
        # forever when looping over a range that produces no frames at all.
        frames_this_pass = 0

        while self._is_running:
            if max_frames and yielded >= max_frames:
                break

            frame = None
            range_done = frame_idx >= end_frame
            if not range_done:
                ret, frame = self.cap.read()
                range_done = not ret

            if range_done:
                if not self.loop or frames_this_pass == 0:
                    break
                # Restart the requested range.
                self.cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                frame_idx = start_frame
                frames_this_pass = 0
                continue

            frames_this_pass += 1

            if (frame_idx - start_frame) % self._frame_interval == 0:
                if self.resize:
                    frame = cv2.resize(frame, self.resize)

                # Some containers report an FPS of 0; avoid dividing by it.
                timestamp = frame_idx / fps if fps > 0 else 0.0
                yield frame_idx, frame, timestamp
                yielded += 1

            frame_idx += 1

    def __enter__(self):
        """Open the source and return this instance."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the source; exceptions from the with-body propagate."""
        self.close()
|
|
|
|
|
|
class BufferedVideoSource(VideoSource):
    """Video source with frame buffering for smoother processing.

    ``open()`` starts a daemon thread that decodes frames into a bounded
    queue; ``read()`` pops from that queue instead of touching the capture.

    NOTE(review): the inherited ``iterate()`` reads ``self.cap`` directly and
    does not take ``self._lock``, so it would compete with the reader thread
    for frames - confirm whether it is meant to be used on this class.
    """

    def __init__(
        self,
        source: str,
        buffer_size: int = 10,
        **kwargs,
    ):
        """
        Initialize buffered video source.

        Args:
            source: Path to video
            buffer_size: Number of frames to buffer
            **kwargs: Additional VideoSource arguments
        """
        super().__init__(source, **kwargs)
        self.buffer_size = buffer_size
        self._buffer: Queue = Queue(maxsize=buffer_size)
        self._thread: Optional[Thread] = None
        # Serialises access to ``self.cap`` between the reader thread and close().
        self._lock = Lock()

    def _reader_thread(self) -> None:
        """Background thread to read frames into buffer.

        Runs until the source is closed, the capture disappears, or the
        stream ends. On end of stream a ``(False, None, -1)`` marker is
        queued so consumers can tell the stream is finished.
        """
        # Each open() installs a fresh queue; remembering ours lets a reader
        # left over from a previous session notice it has been superseded.
        buffer = self._buffer
        # Frames decoded since the last rewind; prevents rewinding forever
        # when looping over a source that yields no frames at all.
        frames_this_pass = 0

        while self._is_running and self._buffer is buffer:
            if buffer.full():
                time.sleep(0.001)
                continue

            with self._lock:
                # cap is checked first: a non-None cap after a re-open
                # implies the queue has already been swapped.
                if self.cap is None or self._buffer is not buffer:
                    break
                ret, frame = self.cap.read()
                frame_idx = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1

                if not ret and self.loop and frames_this_pass > 0:
                    # Rewind inside the same locked section so close()
                    # cannot release the capture in between.
                    self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    frames_this_pass = 0
                    continue

            if not ret:
                buffer.put((False, None, -1))
                break

            frames_this_pass += 1

            if self.resize:
                frame = cv2.resize(frame, self.resize)

            buffer.put((True, frame, frame_idx))

    def open(self) -> VideoInfo:
        """Open video and start buffer thread.

        If a reader from an earlier ``open()`` exists it is stopped first,
        and the queue is replaced so no stale frames or end-of-stream marker
        leak into the new session.
        """
        if self._thread is not None:
            self.close()

        self._buffer = Queue(maxsize=self.buffer_size)
        info = super().open()

        # Start reader thread
        self._thread = Thread(target=self._reader_thread, daemon=True)
        self._thread.start()

        return info

    def close(self) -> None:
        """Stop buffer thread and close video."""
        self._is_running = False
        reader = self._thread
        if reader is not None:
            reader.join(timeout=1.0)
            self._thread = None
        # Release the capture under the lock so a reader that outlived the
        # join timeout is never in the middle of cap.read() when it goes away.
        with self._lock:
            super().close()

    def read(self) -> Tuple[bool, Optional[np.ndarray], int]:
        """Read frame from buffer.

        Returns:
            Tuple of (success, frame, frame_index). ``(False, None, -1)`` is
            returned when the source is not open, when the stream has ended,
            or when no frame arrived within one second.
        """
        if not self._is_running:
            return False, None, -1

        reader = self._thread
        if reader is not None and not reader.is_alive():
            # The reader has finished, so nothing more will arrive: drain
            # what is left without waiting for the timeout.
            try:
                return self._buffer.get_nowait()
            except Empty:
                return False, None, -1

        try:
            return self._buffer.get(timeout=1.0)
        except Empty:
            return False, None, -1
|