Source code for sign_language_translator.vision.utils

"""This module provides utility functions for video processing.
"""

from mimetypes import guess_type
from typing import Generator, List, Optional, Sequence, Tuple, Union

import cv2
from numpy import uint8
from numpy.typing import NDArray

__all__ = [
    "read_frames_with_opencv",
    "iter_frames_with_opencv",
]


[docs] def read_frames_with_opencv(path: str) -> List[NDArray[uint8]]: """ Extracts individual frames from a video file or an image file. This function reads a video file using opencv and extracts its frames as numpy arrays. It can also read an image file and treat it as a single frame video. Args: path (str): The path to the video or image file. Returns: List[NDArray]: A list of numpy arrays, each representing a frame from the video. Raises: FileNotFoundError: If the video file is not found or cannot be opened. """ return list(iter_frames_with_opencv(path))
[docs] def iter_frames_with_opencv(path: str) -> Generator[NDArray[uint8], None, None]: """ Extracts individual frames from a video file or an image file. This function reads a video file using opencv and extracts its frames as numpy arrays. It can also read an image file and treat it as a single frame video. Args: path (str): The path to the video or image file. Yields: NDArray[np.uint8]: numpy arrays representing frames from the video with shape: (height, width, color_channels). Raises: FileNotFoundError: If the video file is not found or cannot be opened. """ file_type = str(guess_type(path)[0]) if not file_type.startswith(("image", "video")): raise ValueError(f"unknown file type: {file_type}") if file_type.startswith("image"): frame = cv2.imread(path) yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # type: ignore elif file_type.startswith("video"): capture = cv2.VideoCapture(path) for _ in range(int(capture.get(cv2.CAP_PROP_FRAME_COUNT))): ret, frame = capture.read() if not ret: break yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # type: ignore capture.release()
def _normalize_args_index_and_timestamp( timestamp: Optional[float], index: Optional[int], max_duration: float, max_index: int, ) -> Tuple[float, int]: if (timestamp is not None) and index is None: if not 0 <= timestamp <= max_duration: raise ValueError(f"'{timestamp=}' is not between 0 and {max_duration}") return timestamp, round( (timestamp / max_duration if max_duration else 1) * max_index ) elif (index is not None) and timestamp is None: if not 0 <= index <= max_index: raise ValueError(f"'{index=}' is not between 0 and {max_index}") return index / (max_index or 1) * max_duration, index else: raise ValueError("provide either timestamp or index.") def _validate_and_normalize_slices( keys: Union[int, slice, Sequence[Union[int, slice]]], max_n_dims: int = 4 ) -> Tuple[slice, ...]: if not isinstance(keys, Sequence): keys = [keys] slices = [] for i, key in enumerate(keys): if key is Ellipsis: slices += [slice(None)] * (max_n_dims - len(keys) + 1) elif isinstance(key, int): slices.append(slice(key, (key + 1) or None)) elif isinstance(key, slice): slices.append(key) else: raise TypeError( f"Invalid argument: {key} at index {i}. Provide either an integer, slice or ellipsis." ) return tuple(slices)