diff options
Diffstat (limited to 'pixpat-python')
| -rw-r--r-- | pixpat-python/pixpat/__init__.py | 489 | ||||
| -rw-r--r-- | pixpat-python/pixpat/_lib/.gitkeep | 3 | ||||
| -rw-r--r-- | pixpat-python/pixpat/_native.py | 203 | ||||
| -rwxr-xr-x | pixpat-python/scripts/build_wheel.sh | 39 | ||||
| -rwxr-xr-x | pixpat-python/scripts/perf_test.py | 434 | ||||
| -rw-r--r-- | pixpat-python/tests/test_basic.py | 539 | ||||
| -rw-r--r-- | pixpat-python/tests/test_numpy.py | 110 | ||||
| -rw-r--r-- | pixpat-python/tests/test_threading.py | 187 |
8 files changed, 0 insertions, 2004 deletions
diff --git a/pixpat-python/pixpat/__init__.py b/pixpat-python/pixpat/__init__.py deleted file mode 100644 index 6531662..0000000 --- a/pixpat-python/pixpat/__init__.py +++ /dev/null @@ -1,489 +0,0 @@ -"""Python bindings for libpixpat. - -Loads the bundled shared library via ctypes — no CPython extension. Works on -any CPython >= 3.9 for the wheel's target architecture. - -The library draws test patterns and converts pixel data directly into -caller-owned pixel buffers, in a wide range of pixel formats -(planar/semi-planar/packed YUV, RGB, raw, …). The caller is responsible for -allocating each plane with the correct size and stride for the chosen format. - -Pixel data is described by :class:`Buffer`, a passive struct mirroring the -C ``pixpat_buffer``: format name, width, height, one writable buffer per -plane, and one row stride per plane. Use :func:`draw_pattern` to fill a -buffer with a test pattern, and :func:`convert` to convert pixel data -between formats. - -Format names follow the convention used by `kms++` and `pixutils` -(e.g. ``"XRGB8888"``, ``"NV12"``, ``"YUYV"``) — not the DRM/V4L2 -four-character codes (``"XR24"``, etc.). - -Example: - >>> import pixpat - >>> width, height = 1920, 1080 - >>> stride = width * 4 # XRGB8888: 4 bytes per pixel - >>> data = bytearray(stride * height) - >>> buf = pixpat.Buffer(planes=[data], fmt="XRGB8888", - ... width=width, height=height, strides=[stride]) - >>> pixpat.draw_pattern(buf, "smpte") - -Use :func:`supported_formats` to enumerate the format names accepted by -``Buffer.fmt``, and :func:`is_supported` to test a single format. - -Using numpy buffers -------------------- - -pixpat does not import numpy and does not depend on it, but any -C-contiguous ``numpy.ndarray`` works as a plane via Python's buffer -protocol. The caller is responsible for matching the array's dtype and -shape to the pixel format and for passing the correct row stride: - - >>> import numpy as np - >>> arr = np.zeros((height, width, 4), dtype=np.uint8) - >>> stride = arr.strides[0] # bytes per row - >>> buf = pixpat.Buffer(planes=[arr], fmt="XRGB8888", - ... width=width, height=height, strides=[stride]) - >>> pixpat.draw_pattern(buf, "smpte") - -For multi-plane formats like ``"NV12"`` pass one ndarray per plane (e.g. -``(h, w)`` uint8 for Y and ``(h//2, w)`` uint8 for the interleaved UV -plane). - -If you already hold all planes in a single contiguous buffer — the layout -OpenCV uses for NV12, an ``(h * 3 // 2, w)`` uint8 array with Y on top -and the interleaved UV plane below — slice it into per-plane views and -pass those:: - - >>> nv12 = np.zeros((h * 3 // 2, w), dtype=np.uint8) - >>> y, uv = nv12[:h], nv12[h:].reshape(h // 2, w) - >>> pixpat.Buffer([y, uv], "NV12", w, h, - ... [y.strides[0], uv.strides[0]]) - -The source side of :func:`convert` accepts read-only buffers (``bytes``, -``arr.view()`` with ``writeable=False``, mmap'd files, …). The -destination side must always be writable. -""" - -import ctypes -from dataclasses import dataclass, field -from enum import IntEnum -from typing import Mapping, Optional, Sequence, Union - -from ._native import ( - _Buffer, - _ConvertOpts, - _PatternOpts, - _PinnedBuffers, - _fill_buffer, - _lib, -) - -_VALID_PATTERNS = frozenset( - { - 'kmstest', - 'smpte', - 'plain', - 'checker', - 'hramp', - 'vramp', - 'hbar', - 'vbar', - 'dramp', - 'zoneplate', - } -) - - -class Rec(IntEnum): - """YCbCr color encoding standard used for YUV output formats. - - Selects the matrix used to convert from internal RGB to YUV. Has no - effect when drawing into an RGB or raw format. - - Attributes: - BT601: ITU-R BT.601 (standard definition). - BT709: ITU-R BT.709 (HD). - BT2020: ITU-R BT.2020 (UHD / HDR, non-constant luminance). - """ - - BT601 = 0 - BT709 = 1 - BT2020 = 2 - - -class Range(IntEnum): - """Quantization range used for YUV output formats. - - Attributes: - LIMITED: "TV" / studio range — Y in [16, 235], C in [16, 240] - (scaled to bit depth). What most video pipelines expect. - FULL: "PC" / full range — every component uses the full code range - (e.g. [0, 255] for 8-bit). - """ - - LIMITED = 0 - FULL = 1 - - -class PixpatError(RuntimeError): - """Raised when the underlying ``pixpat_*`` call fails. - - Most commonly indicates an unknown format name or buffer dimensions / - plane layout that the format does not allow (e.g. odd width for a - horizontally-subsampled YUV format). - """ - - -@dataclass -class Buffer: - """Plane data + format + dimensions, mirroring the C ``pixpat_buffer``. - - A passive struct: no allocation, no format knowledge, no - numpy/cv2 awareness. The caller decides plane shapes and strides - based on its own format knowledge (e.g. via ``pixutils``). - - Attributes: - planes: One buffer-protocol object per plane (e.g. ``bytearray``, - ``array.array``, ``numpy.ndarray``, ``mmap.mmap``, - ``memoryview``). Up to 4 planes are supported. Each plane - must hold at least ``strides[i] * plane_height`` bytes, - where ``plane_height`` is ``height`` for the main plane and - ``height // vertical_subsampling`` for chroma planes (e.g. - ``height // 2`` for the UV plane of ``NV12``). For - destination buffers each plane must be writable; for the - source side of :func:`convert` read-only objects (such as - ``bytes``) are accepted. - fmt: Pixel-format name; see :func:`supported_formats`. - width: Image width in pixels. Some formats constrain this — for - chroma-subsampled YUV formats it must be a multiple of the - horizontal subsampling factor (e.g. 2 for ``NV12``), and - packed formats add a further multiple from their - pixel-group size (e.g. 6 for ``P030``, 4 for ``SBGGR10P``). - height: Image height in pixels. For vertically-subsampled - formats (e.g. ``NV12``, ``YUV420``) it must be a multiple - of the vertical subsampling factor. - strides: Per-plane row stride in bytes. Must have the same - length as ``planes``. Strides larger than the minimum row - size are allowed. - """ - - planes: Sequence - fmt: str - width: int - height: int - strides: Sequence[int] = field(default_factory=list) - - -def supported_formats() -> list[str]: - """Return the list of pixel-format names accepted by :class:`Buffer`. - - Format names follow the convention used by `kms++` and `pixutils` - (e.g. ``"XRGB8888"``, ``"NV12"``, ``"YUYV"``) — not the DRM/V4L2 - four-character codes. - """ - n = _lib.pixpat_format_count() - return [_lib.pixpat_format_name(i).decode('ascii') for i in range(n)] - - -def is_supported(fmt: str) -> bool: - """Return whether ``fmt`` is a known pixel-format name. - - Equivalent to ``fmt in supported_formats()`` but cheaper — it does not - materialize the full list. - """ - return bool(_lib.pixpat_format_supported(fmt.encode('ascii'))) - - -def _serialize_pattern_params( - params: Optional[Union[str, Mapping[str, object]]], -) -> Optional[bytes]: - """Encode `params` for the C ABI. - - Accepts a ready-made string (passed through verbatim) or a mapping of - string keys to stringifiable values, which is serialized to the - ``"key=val,key=val"`` form pixpat parses on the C side. Returns None - if there is nothing to pass. - """ - if params is None: - return None - if isinstance(params, str): - return params.encode('ascii') - if isinstance(params, Mapping): - items = [] - for k, v in params.items(): - if not isinstance(k, str): - raise TypeError(f'pattern params keys must be str, got {type(k).__name__}') - sv = str(v) - if ',' in k or '=' in k: - raise ValueError(f'pattern params key contains , or =: {k!r}') - if ',' in sv or '=' in sv: - raise ValueError(f'pattern params value contains , or =: {sv!r}') - items.append(f'{k}={sv}') - return ','.join(items).encode('ascii') - raise TypeError(f'pattern params must be None, str, or a Mapping, got {type(params).__name__}') - - -def draw_pattern( - dst: Buffer, - pattern: Optional[str] = None, - *, - rec: Rec = Rec.BT601, - color_range: Range = Range.LIMITED, - num_threads: int = 0, - params: Optional[Union[str, Mapping[str, object]]] = None, -) -> None: - """Draw a test pattern into ``dst``. - - Args: - dst: Destination buffer; all planes must be writable. - pattern: Name of the pattern to draw. ``None`` selects the default - (equivalent to ``"kmstest"``). Recognized values: - - * ``"kmstest"`` — color gradients with ramps, in the style of - the original ``kmstest`` tool. The default. - * ``"smpte"`` — SMPTE RP 219-1 color bars (with PLUGE). - * ``"plain"`` — solid color fill from ``params["color"]``. - See `params` below. - * ``"checker"`` — black/white checkerboard. Optional - ``params["cell"]`` (decimal positive integer, default 8) - sets the cell size in pixels. ``"cell": "1"`` is a - 1-pixel chroma-subsampling stress test. - * ``"hramp"`` — four horizontal stripes (R, G, B, gray), - each a 0..max ramp along x. Combined per-channel and - luma quantization check. - * ``"vramp"`` — same as ``"hramp"`` rotated 90°: four - vertical columns ramping along y. - * ``"hbar"`` — horizontal bar (full image width, narrow - along y) over a black background. Required - ``params["pos"]`` (signed integer, top edge in pixels); - optional ``params["width"]`` (positive integer, default - 32). The bar is split into seven equal-width regions - colored white/red/white/green/white/blue/white. - * ``"vbar"`` — same as ``"hbar"`` rotated 90°: vertical - bar with ``pos`` measured along x. - * ``"dramp"`` — diagonal RGB ramp (R on x, G on y, - B on x+y). - * ``"zoneplate"`` — centered radial cosine pattern, - frequency ramping from DC at the center to Nyquist at - the longer edge. Useful for spotting scaling/aliasing. - - ``"smpte"`` is defined by the spec in BT.709 / Limited. - Pass ``rec=Rec.BT709, color_range=Range.LIMITED`` for - spec-correct output; other settings produce visibly-wrong - colors when drawing into RGB sinks (the caller's matrix is - applied to BT.709-encoded values). Callers are trusted — - pixpat does not silently override the spec. - rec: YCbCr matrix for YUV formats. Ignored for RGB / raw formats. - Defaults to :attr:`Rec.BT601`. - color_range: Quantization range for YUV formats. Ignored for - RGB / raw formats. Defaults to :attr:`Range.LIMITED`. - num_threads: Worker-thread count. ``0`` selects a sensible - default (one per online CPU, capped to a sane maximum); - ``1`` runs single-threaded with no thread-spawn overhead; - ``N > 1`` uses exactly ``N`` workers. Defaults to ``0``. - Output is bit-identical regardless of the chosen count. - params: Optional pattern-specific parameters. Either a mapping - (``{"color": "ff0000"}``) — serialized to ``"color=ff0000"`` - for the C ABI — or a raw ``"key=val,key=val"`` string. - Unknown keys are silently ignored; patterns that don't read - params (``kmstest``, ``smpte``) ignore this entirely. - Per-pattern keys: - - * ``"plain"`` reads ``"color"`` as a hex RGB(A) string with - an optional ``"0x"`` prefix. The hex-digit count selects - the layout: 6 → 8-bit ``RRGGBB``, 8 → 8-bit ``AARRGGBB`` - (alpha first), 12 → 16-bit ``RRRRGGGGBBBB``, 16 → 16-bit - ``AAAARRRRGGGGBBBB``. Missing or malformed ``"color"`` - raises :class:`PixpatError`. - * ``"checker"`` reads optional ``"cell"`` as a positive - decimal integer; default 8. A non-positive or non-numeric - value raises :class:`PixpatError`. - * ``"hbar"`` / ``"vbar"`` read required ``"pos"`` (signed - decimal integer, top/left edge of the bar in pixels; - negative values clip at the edge) and optional ``"width"`` - (positive decimal integer, bar thickness; default 32). - Missing/non-numeric ``pos`` or non-positive ``width`` - raises :class:`PixpatError`. - - Raises: - ValueError: ``dst.planes`` exceeds the maximum plane count, - ``dst.strides`` length does not match ``dst.planes``, - ``pattern`` is not one of the recognized names, or a - ``params`` mapping value contains a forbidden ``,`` or ``=``. - TypeError: A plane is read-only (e.g. ``bytes``, a read-only - ``memoryview``), or ``params`` has an unsupported type. - PixpatError: The underlying C call failed — typically an unknown - ``fmt``, dimensions / strides incompatible with the format, - or pattern parameters that the pattern rejected. - - Example: - >>> w, h = 640, 480 - >>> stride = w * 4 - >>> data = bytearray(stride * h) - >>> dst = Buffer([data], "XRGB8888", w, h, [stride]) - >>> draw_pattern(dst, "smpte") - >>> draw_pattern(dst, "plain", params={"color": "ff0000"}) - """ - if pattern is None: - pattern = 'kmstest' - elif pattern not in _VALID_PATTERNS: - raise ValueError( - f'unknown pattern {pattern!r}; expected one of {sorted(_VALID_PATTERNS)} or None' - ) - if num_threads < 0: - raise ValueError(f'num_threads must be >= 0, got {num_threads}') - - params_bytes = _serialize_pattern_params(params) - - c_buf = _Buffer() - opts = _PatternOpts() - opts.rec = int(rec) - opts.range = int(color_range) - opts.num_threads = num_threads - opts.params = params_bytes - - with _PinnedBuffers() as pins: - _fill_buffer( - c_buf, - pins, - dst.planes, - dst.fmt, - dst.width, - dst.height, - dst.strides, - writable=True, - ) - rc = _lib.pixpat_draw_pattern( - ctypes.byref(c_buf), pattern.encode('ascii'), ctypes.byref(opts) - ) - - if rc != 0: - raise PixpatError(f'pixpat_draw_pattern failed (rc={rc}); check format name and dimensions') - - -def convert( - dst: Buffer, - src: Buffer, - *, - rec: Rec = Rec.BT601, - color_range: Range = Range.LIMITED, - num_threads: int = 0, -) -> None: - """Convert pixel data from ``src`` into ``dst``. - - Both buffers must describe an image of the same width and height; - only the pixel format may differ. Any format accepted by - :func:`supported_formats` works as both source and destination in - the default build (custom build profiles can mark individual formats - read-only or write-only). Conversion is routed internally through a - 16-bit normalized RGB or YUV intermediate, so format-to-format - conversions in either direction (e.g. ``NV12`` -> ``YUV420``, - ``XRGB8888`` -> ``NV12``, ``SRGGB10`` -> ``BGR888``) are a single - call. Bayer sources are decoded with a 3x3 bilinear demosaic. - - ``src.planes`` may hold read-only buffers (e.g. ``bytes``, mmap'd - files, numpy arrays with ``writeable=False``); ``dst.planes`` must - be writable. - - Args: - dst: Destination buffer. - src: Source buffer. - rec: YCbCr matrix used when the conversion crosses the RGB/YUV - boundary. Ignored when ``src.fmt`` and ``dst.fmt`` share - the same color kind. Defaults to :attr:`Rec.BT601`. - color_range: Quantization range used when the conversion crosses - the RGB/YUV boundary. Ignored when ``src.fmt`` and - ``dst.fmt`` share the same color kind. Defaults to - :attr:`Range.LIMITED`. - num_threads: Worker-thread count. ``0`` selects a sensible - default (one per online CPU, capped to a sane maximum); - ``1`` runs single-threaded with no thread-spawn overhead; - ``N > 1`` uses exactly ``N`` workers. Defaults to ``0``. - Output is bit-identical regardless of the chosen count. - - Raises: - ValueError: ``dst`` and ``src`` have mismatched dimensions, a - plane sequence exceeds the maximum plane count, or a - strides length does not match its planes length. - TypeError: A ``dst`` plane is read-only. - PixpatError: The underlying C call failed — typically an - unknown format name, a format disabled in the current build - (or disabled in the requested direction), or dimensions / - strides incompatible with one of the formats. - - Example: - Cross-color-kind, multi-plane on both sides — paint an NV12 - source via :func:`draw_pattern`, then convert it to planar - YUV420: - - >>> w, h = 64, 32 - >>> y_src = bytearray(w * h) - >>> uv_src = bytearray(w * h // 2) - >>> draw_pattern(Buffer([y_src, uv_src], "NV12", w, h, [w, w]), - ... "smpte") - >>> y_dst = bytearray(w * h) - >>> u_dst = bytearray(w * h // 4) - >>> v_dst = bytearray(w * h // 4) - >>> convert( - ... Buffer([y_dst, u_dst, v_dst], "YUV420", w, h, - ... [w, w // 2, w // 2]), - ... Buffer([y_src, uv_src], "NV12", w, h, [w, w]), - ... ) - """ - if dst.width != src.width or dst.height != src.height: - raise ValueError( - f'dst dimensions {dst.width}x{dst.height} do not match ' - f'src dimensions {src.width}x{src.height}' - ) - if num_threads < 0: - raise ValueError(f'num_threads must be >= 0, got {num_threads}') - - c_dst = _Buffer() - c_src = _Buffer() - opts = _ConvertOpts() - opts.rec = int(rec) - opts.range = int(color_range) - opts.num_threads = num_threads - - with _PinnedBuffers() as pins: - _fill_buffer( - c_dst, - pins, - dst.planes, - dst.fmt, - dst.width, - dst.height, - dst.strides, - writable=True, - role='dst', - ) - _fill_buffer( - c_src, - pins, - src.planes, - src.fmt, - src.width, - src.height, - src.strides, - writable=False, - role='src', - ) - rc = _lib.pixpat_convert(ctypes.byref(c_dst), ctypes.byref(c_src), ctypes.byref(opts)) - - if rc != 0: - raise PixpatError( - f'pixpat_convert failed (rc={rc}); check format names, dimensions, ' - f'and that src.fmt is a supported source format' - ) - - -__all__ = [ - 'Buffer', - 'Rec', - 'Range', - 'PixpatError', - 'supported_formats', - 'is_supported', - 'draw_pattern', - 'convert', -] diff --git a/pixpat-python/pixpat/_lib/.gitkeep b/pixpat-python/pixpat/_lib/.gitkeep deleted file mode 100644 index 7319712..0000000 --- a/pixpat-python/pixpat/_lib/.gitkeep +++ /dev/null @@ -1,3 +0,0 @@ -# Placeholder so the _lib/ directory exists in source control. -# `pip install .` and `pip install -e .` populate libpixpat.so.0 here -# (copy and symlink respectively); see setup.py. diff --git a/pixpat-python/pixpat/_native.py b/pixpat-python/pixpat/_native.py deleted file mode 100644 index 46fe452..0000000 --- a/pixpat-python/pixpat/_native.py +++ /dev/null @@ -1,203 +0,0 @@ -"""ctypes plumbing for libpixpat. - -Private module — public API lives in :mod:`pixpat`. Loads the bundled -shared library, declares the C structs and function signatures, and -provides :func:`_fill_buffer` to translate Python plane sequences into -the C ``pixpat_buffer`` layout while pinning the underlying memory via -the buffer protocol. - -The buffer-protocol path uses ``PyObject_GetBuffer`` / ``PyBuffer_Release`` -directly (rather than ctypes ``from_buffer``) so that read-only inputs -work for the ``src`` side of :func:`pixpat.convert`. ``from_buffer`` -unconditionally requires a writable buffer. -""" - -import ctypes -import os -import pathlib -from typing import Sequence - -_PIXPAT_MAX_PLANES = 4 - -_lib_override = os.environ.get('PIXPAT_LIB') -if _lib_override: - if not pathlib.Path(_lib_override).exists(): - raise ImportError(f'pixpat: PIXPAT_LIB={_lib_override} does not exist') - _lib = ctypes.CDLL(_lib_override) -else: - _lib_dir = pathlib.Path(__file__).parent / '_lib' - _so_candidates = sorted(_lib_dir.glob('libpixpat.so*')) - if not _so_candidates: - raise ImportError(f'pixpat: no libpixpat.so* found in {_lib_dir}') - _lib = ctypes.CDLL(str(_so_candidates[0])) - - -class _Buffer(ctypes.Structure): - _fields_ = [ - ('format', ctypes.c_char_p), - ('width', ctypes.c_uint32), - ('height', ctypes.c_uint32), - ('num_planes', ctypes.c_uint32), - ('planes', ctypes.c_void_p * _PIXPAT_MAX_PLANES), - ('strides', ctypes.c_uint32 * _PIXPAT_MAX_PLANES), - ] - - -class _PatternOpts(ctypes.Structure): - _fields_ = [ - ('rec', ctypes.c_int), - ('range', ctypes.c_int), - ('num_threads', ctypes.c_int), - ('params', ctypes.c_char_p), - ] - - -class _ConvertOpts(ctypes.Structure): - _fields_ = [ - ('rec', ctypes.c_int), - ('range', ctypes.c_int), - ('num_threads', ctypes.c_int), - ] - - -_lib.pixpat_draw_pattern.argtypes = [ - ctypes.POINTER(_Buffer), - ctypes.c_char_p, - ctypes.POINTER(_PatternOpts), -] -_lib.pixpat_draw_pattern.restype = ctypes.c_int - -_lib.pixpat_convert.argtypes = [ - ctypes.POINTER(_Buffer), - ctypes.POINTER(_Buffer), - ctypes.POINTER(_ConvertOpts), -] -_lib.pixpat_convert.restype = ctypes.c_int - -_lib.pixpat_format_supported.argtypes = [ctypes.c_char_p] -_lib.pixpat_format_supported.restype = ctypes.c_int - -_lib.pixpat_format_count.argtypes = [] -_lib.pixpat_format_count.restype = ctypes.c_size_t - -_lib.pixpat_format_name.argtypes = [ctypes.c_size_t] -_lib.pixpat_format_name.restype = ctypes.c_char_p - - -class _Py_buffer(ctypes.Structure): - _fields_ = [ - ('buf', ctypes.c_void_p), - ('obj', ctypes.py_object), - ('len', ctypes.c_ssize_t), - ('itemsize', ctypes.c_ssize_t), - ('readonly', ctypes.c_int), - ('ndim', ctypes.c_int), - ('format', ctypes.c_char_p), - ('shape', ctypes.POINTER(ctypes.c_ssize_t)), - ('strides', ctypes.POINTER(ctypes.c_ssize_t)), - ('suboffsets', ctypes.POINTER(ctypes.c_ssize_t)), - ('internal', ctypes.c_void_p), - ] - - -_PyObject_GetBuffer = ctypes.pythonapi.PyObject_GetBuffer -_PyObject_GetBuffer.argtypes = [ - ctypes.py_object, - ctypes.POINTER(_Py_buffer), - ctypes.c_int, -] -_PyObject_GetBuffer.restype = ctypes.c_int - -_PyBuffer_Release = ctypes.pythonapi.PyBuffer_Release -_PyBuffer_Release.argtypes = [ctypes.POINTER(_Py_buffer)] -_PyBuffer_Release.restype = None - -_PyBUF_SIMPLE = 0 -_PyBUF_WRITABLE = 0x0001 - - -class _PinnedBuffers: - """Holds Py_buffer views for the lifetime of a pixpat call. - - Releases each view in ``__exit__`` (or when garbage-collected) so - the underlying objects' buffer-export count drops back to zero. - """ - - def __init__(self) -> None: - self._views: list[_Py_buffer] = [] - - def acquire(self, obj, *, writable: bool) -> int: - view = _Py_buffer() - flags = _PyBUF_WRITABLE if writable else _PyBUF_SIMPLE - # ctypes.pythonapi propagates the set exception (BufferError / - # TypeError) automatically on failure, so no rc check is needed. - _PyObject_GetBuffer(obj, ctypes.byref(view), flags) - self._views.append(view) - return view.buf or 0 - - def release(self) -> None: - while self._views: - view = self._views.pop() - _PyBuffer_Release(ctypes.byref(view)) - - def __enter__(self) -> '_PinnedBuffers': - return self - - def __exit__(self, exc_type, exc, tb) -> None: - self.release() - - def __del__(self) -> None: - self.release() - - -def _fill_buffer( - buf: _Buffer, - pins: _PinnedBuffers, - planes: Sequence, - fmt: str, - width: int, - height: int, - strides: Sequence[int], - *, - writable: bool, - role: str = '', -) -> None: - """Populate ``buf`` from a Python plane sequence, pinning each plane. - - ``writable`` selects whether each plane must be writable: True for - destination buffers (the C library writes into them), False for - source buffers (the C library only reads). - """ - label = f'{role} ' if role else '' - if len(planes) > _PIXPAT_MAX_PLANES: - raise ValueError(f'too many {label}planes: {len(planes)} (max {_PIXPAT_MAX_PLANES})') - if len(strides) != len(planes): - raise ValueError(f'{label}strides has {len(strides)} entries, expected {len(planes)}') - - plane_ptrs = (ctypes.c_void_p * _PIXPAT_MAX_PLANES)() - stride_arr = (ctypes.c_uint32 * _PIXPAT_MAX_PLANES)() - for i, plane in enumerate(planes): - try: - addr = pins.acquire(plane, writable=writable) - except BufferError as e: - kind = 'writable' if writable else 'buffer-protocol' - raise TypeError(f'{label}plane {i} does not support the {kind} interface: {e}') from e - plane_ptrs[i] = addr - stride_arr[i] = strides[i] - - buf.format = fmt.encode('ascii') - buf.width = width - buf.height = height - buf.num_planes = len(planes) - buf.planes = plane_ptrs - buf.strides = stride_arr - - -__all__ = [ - '_Buffer', - '_PatternOpts', - '_ConvertOpts', - '_PinnedBuffers', - '_lib', - '_fill_buffer', -] diff --git a/pixpat-python/scripts/build_wheel.sh b/pixpat-python/scripts/build_wheel.sh deleted file mode 100755 index 1eb2586..0000000 --- a/pixpat-python/scripts/build_wheel.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bash -# Build a pixpat wheel for the given target architecture. -# -# Usage: pixpat-python/scripts/build_wheel.sh <x86_64|aarch64> -# -# Meson is invoked from setup.py during the wheel build; this script just -# selects the target arch and hands off to `python -m build`. The meson -# build dir lands at pixpat-python/build-<arch>/native/, so cross-compiling -# both arches in turn keeps each arch's compile incremental. -# -# Prerequisites: -# - meson, ninja in PATH (e.g. `pip install meson ninja`) -# - For aarch64: gcc-aarch64-linux-gnu, g++-aarch64-linux-gnu -# (sudo apt install gcc-aarch64-linux-gnu g++-aarch64-linux-gnu) -# - Python with `build` package: pip install build - -set -euo pipefail - -ARCH="${1:-}" -case "$ARCH" in - x86_64|aarch64) ;; - *) - echo "usage: $0 <x86_64|aarch64>" >&2 - exit 1 - ;; -esac - -REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)" -cd "$REPO_ROOT" - -# --no-isolation: reuse the host environment so meson's per-arch build dir -# (pixpat-python/build-<arch>/native/) survives across invocations and stays -# incremental. Build isolation copies the source to a temp dir, so the meson -# build dir would be thrown away each run. -PIXPAT_TARGET_ARCH="$ARCH" python -m build --wheel --no-isolation - -echo -echo "Wheel(s) in dist/:" -ls -1 dist/*.whl diff --git a/pixpat-python/scripts/perf_test.py b/pixpat-python/scripts/perf_test.py deleted file mode 100755 index 100c16e..0000000 --- a/pixpat-python/scripts/perf_test.py +++ /dev/null @@ -1,434 +0,0 @@ -#!/usr/bin/env python3 -"""Micro-benchmark pixpat across draw_pattern and convert call paths. - -Each row times one ``<source> -> <destination>`` operation: - -* a pattern draw, where ``source`` is a pattern name (``smpte``, - ``kmstest`` etc.) and the row times ``pixpat.draw_pattern``; -* a format conversion, where both sides are format names and the row - times ``pixpat.convert``. - -Buffer allocation is driven by ``pixutils.formats.PixelFormats`` — -``framesize`` / ``planesize`` / ``stride`` give us per-plane geometry -without per-format hand-coding. - -Filtering is uniform across both kinds because the case name always -has the form ``"<lhs> -> <rhs>"``: ``--ionly kmstest`` keeps that -pattern's cases, ``--ionly NV12`` keeps NV12-source convert cases, -``--oonly BGR888`` keeps any case (pattern or convert) writing -BGR888, and so on. - -Usage: - python perf_test.py [--width 1920] [--height 1080] [--iters 200] - [--warmup 10] [--pp-threads 1] - [--rec BT709] [--range LIMITED] - [--only-pattern | --only-convert] - [--only STR] [--ionly STR] [--oonly STR] - [--cases STR] [--tsv] -""" - -from __future__ import annotations - -import argparse -import gc -import math -import sys -import time -from dataclasses import dataclass -from typing import Callable, Optional - -import numpy as np - -import pixpat -from pixutils.formats import PixelFormat, PixelFormats - - -PATTERNS = ['kmstest', 'smpte', 'plain'] - -PATTERN_SINK_FMTS = [ - 'XRGB8888', - 'BGR888', - 'RGB565', - 'RGBA1010102', - 'ABGR16161616', - 'Y8', - 'YUYV', - 'UYVY', - 'NV12', - 'NV16', - 'YUV420', - 'SRGGB8', - 'SRGGB10P', -] - -CONVERT_PAIRS: list[tuple[str, str]] = [ - # RGB shuffles - ('RGB888', 'BGR888'), - ('BGR888', 'BGRA8888'), - ('BGRA8888', 'BGR888'), - # Grayscale - ('BGR888', 'Y8'), - ('Y8', 'BGR888'), - # Packed YUV decode - ('YUYV', 'BGR888'), - ('UYVY', 'BGR888'), - # Semiplanar YUV decode - ('NV12', 'BGR888'), - ('NV21', 'BGR888'), - # Planar YUV decode - ('YUV420', 'BGR888'), - ('YVU420', 'BGR888'), - # Planar YUV encode - ('BGR888', 'YUV420'), - ('BGR888', 'YVU420'), - # NV12 encode/decode against XRGB8888 - ('XRGB8888', 'NV12'), - ('BGR888', 'NV12'), - ('NV12', 'XRGB8888'), - # Cold path: neither side is BGR888 - ('XRGB8888', 'RGBA8888'), - ('RGBA8888', 'XRGB8888'), - ('NV12', 'YUV420'), - ('YUV420', 'NV12'), - ('NV12', 'YUYV'), - ('NV12', 'RGB565'), - ('Y8', 'NV12'), - # Wider coverage - ('BGR888', 'YUYV'), - ('BGR888', 'NV16'), - ('NV16', 'BGR888'), - ('BGR888', 'YUV422'), - ('YUV422', 'BGR888'), - ('BGR888', 'YUV444'), - ('YUV444', 'BGR888'), - # 16-bit packed RGB - ('BGR888', 'RGB565'), - ('RGB565', 'BGR888'), - # 10-bit packed RGB - ('BGR888', 'RGBA1010102'), - ('RGBA1010102', 'BGR888'), - # 64-bit packed RGB (pixpat's normalized wide form) - ('BGR888', 'ABGR16161616'), - ('ABGR16161616', 'BGR888'), - # Multi-pixel-per-word semiplanar YUV - ('BGR888', 'P030'), - ('P030', 'BGR888'), - ('BGR888', 'P230'), - # Multi-pixel-per-word planar YUV - ('BGR888', 'T430'), - ('T430', 'BGR888'), - # Multi-pixel-per-word grayscale - ('BGR888', 'XYYY2101010'), - # Single-pixel-per-word YUV - ('BGR888', 'XVUY2101010'), - ('BGR888', 'AVUY16161616'), - # Bayer - ('BGR888', 'SRGGB8'), - ('SRGGB8', 'BGR888'), - ('BGR888', 'SRGGB10P'), - ('SRGGB10P', 'BGR888'), - ('BGR888', 'SRGGB12P'), - ('SRGGB12P', 'BGR888'), - # Single-channel RGB - ('BGR888', 'R8'), - ('R8', 'BGR888'), - # MIPI CSI-2 packed grayscale - ('BGR888', 'Y10P'), - ('Y10P', 'BGR888'), - # 4:2:2 packed YUV in 64-bit words - ('BGR888', 'Y210'), - ('Y210', 'BGR888'), -] - - -_PF_BY_NAME = {v.name: v for v in PixelFormats.__dict__.values() if isinstance(v, PixelFormat)} - - -def _required_align(pf: PixelFormat) -> tuple[int, int]: - # pf.pixel_align doesn't always capture the per-plane pixels_per_block / - # vsub requirements (e.g. T430 lists (1,1) but each plane is 3 px wide), - # so combine them to get the effective alignment we need to skip cleanly. - w_align = pf.pixel_align[0] - h_align = pf.pixel_align[1] - for p in pf.planes: - w_align = math.lcm(w_align, p.pixels_per_block * p.hsub) - h_align = math.lcm(h_align, p.vsub) - return w_align, h_align - - -def _alloc_buffer(fmt_name: str, w: int, h: int) -> Optional[tuple[pixpat.Buffer, np.ndarray]]: - """Build a pixpat.Buffer + its 1-D backing array, or None when the - resolution doesn't fit the format's alignment.""" - pf = _PF_BY_NAME[fmt_name] - w_align, h_align = _required_align(pf) - if w % w_align or h % h_align: - return None - - backing = np.zeros(pf.framesize(w, h), dtype=np.uint8) - planes: list[np.ndarray] = [] - strides: list[int] = [] - offset = 0 - for i in range(len(pf.planes)): - s = pf.stride(w, i) - psize = pf.planesize(s, h, i) - view = backing[offset : offset + psize].reshape(psize // s, s) - planes.append(view) - strides.append(s) - offset += psize - return pixpat.Buffer(planes, fmt_name, w, h, strides), backing - - -@dataclass -class Case: - name: str - kind: str # 'pattern' or 'convert' - dst: pixpat.Buffer - src: Optional[pixpat.Buffer] = None - src_arr: Optional[np.ndarray] = None # 1-D backing for random fill - - -def _build_cases(w: int, h: int, kinds: set[str]) -> list[Case]: - cases: list[Case] = [] - - if 'pattern' in kinds: - for fmt in PATTERN_SINK_FMTS: - alloc = _alloc_buffer(fmt, w, h) - if alloc is None: - continue - buf, _ = alloc - for pat in PATTERNS: - cases.append(Case(name=f'{pat} -> {fmt}', kind='pattern', dst=buf)) - - if 'convert' in kinds: - for src_fmt, dst_fmt in CONVERT_PAIRS: - src_alloc = _alloc_buffer(src_fmt, w, h) - dst_alloc = _alloc_buffer(dst_fmt, w, h) - if src_alloc is None or dst_alloc is None: - continue - src_buf, src_backing = src_alloc - dst_buf, _ = dst_alloc - cases.append( - Case( - name=f'{src_fmt} -> {dst_fmt}', - kind='convert', - dst=dst_buf, - src=src_buf, - src_arr=src_backing, - ) - ) - - return cases - - -def _parse_filter(s: Optional[str]) -> Optional[set[str]]: - if s is None: - return None - return {x.strip().upper() for x in s.split(',') if x.strip()} - - -def _norm_case(name: str) -> str: - return ' '.join(name.upper().split()) - - -def _filter_cases( - cases: list[Case], - only: Optional[str], - ionly: Optional[str], - oonly: Optional[str], - cases_filter: Optional[str], -) -> list[Case]: - only_set = _parse_filter(only) - ionly_set = _parse_filter(ionly) - oonly_set = _parse_filter(oonly) - cases_set: Optional[set[str]] = None - if cases_filter is not None: - cases_set = {_norm_case(x) for x in cases_filter.split(',') if x.strip()} - if only_set is None and ionly_set is None and oonly_set is None and cases_set is None: - return cases - - out: list[Case] = [] - for c in cases: - lhs, _, rhs = c.name.partition(' -> ') - lhs, rhs = lhs.upper(), rhs.upper() - if cases_set is not None and _norm_case(c.name) not in cases_set: - continue - if only_set is not None and lhs not in only_set and rhs not in only_set: - continue - if ionly_set is not None and lhs not in ionly_set: - continue - if oonly_set is not None and rhs not in oonly_set: - continue - out.append(c) - return out - - -def _bind( - case: Case, - num_threads: int, - rec: 'pixpat.Rec', - color_range: 'pixpat.Range', -) -> Callable[[], object]: - if case.kind == 'pattern': - fn = pixpat.draw_pattern - dst = case.dst - pat = case.name.partition(' -> ')[0] - # `plain` needs an explicit color; the rest ignore params. - params = {'color': 'ff0000'} if pat == 'plain' else None - return lambda: fn( - dst, - pat, - rec=rec, - color_range=color_range, - num_threads=num_threads, - params=params, - ) - fn = pixpat.convert - assert case.src is not None - src, dst = case.src, case.dst - return lambda: fn(dst, src, rec=rec, color_range=color_range, num_threads=num_threads) - - -def _time_n(fn: Callable[[], object], iters: int) -> float: - """Return min seconds over ``iters`` calls.""" - best = float('inf') - for _ in range(iters): - t0 = time.perf_counter_ns() - fn() - dt = time.perf_counter_ns() - t0 - if dt < best: - best = dt - return best * 1e-9 - - -def main() -> int: - p = argparse.ArgumentParser( - description='Micro-benchmark pixpat across draw_pattern and convert.' - ) - p.add_argument('--width', type=int, default=1920) - p.add_argument('--height', type=int, default=1080) - p.add_argument('--iters', type=int, default=200) - p.add_argument('--warmup', type=int, default=10) - p.add_argument( - '--pp-threads', - type=int, - default=1, - help='pixpat thread count; 0 = sensible default', - ) - p.add_argument( - '--rec', - default='BT709', - choices=[r.name for r in pixpat.Rec], - help='YCbCr matrix for YUV cases (default BT709)', - ) - p.add_argument( - '--range', - dest='color_range', - default='LIMITED', - choices=[r.name for r in pixpat.Range], - help='Quantization range for YUV cases (default LIMITED)', - ) - kind_group = p.add_mutually_exclusive_group() - kind_group.add_argument( - '--only-pattern', - action='store_true', - help='restrict to pattern-draw cases', - ) - kind_group.add_argument( - '--only-convert', - action='store_true', - help='restrict to convert cases', - ) - p.add_argument( - '--only', - default=None, - help='comma-separated names; keep cases whose lhs OR rhs of " -> " matches', - ) - p.add_argument( - '--ionly', - default=None, - help='comma-separated names; keep cases whose lhs (pattern or src fmt) matches', - ) - p.add_argument( - '--oonly', - default=None, - help='comma-separated names; keep cases whose rhs (dst fmt) matches', - ) - p.add_argument( - '--cases', - default=None, - help='comma-separated full case names (e.g. "smpte -> BGR888,RGB888 -> BGR888")', - ) - p.add_argument( - '--tsv', - action='store_true', - help='emit tab-separated rows on stdout; meta and warnings go to stderr', - ) - args = p.parse_args() - - w, h = args.width, args.height - mp = (w * h) / 1e6 - - rec = pixpat.Rec[args.rec] - color_range = pixpat.Range[args.color_range] - - kinds: set[str] = {'pattern', 'convert'} - if args.only_pattern: - kinds = {'pattern'} - elif args.only_convert: - kinds = {'convert'} - - cases = _build_cases(w, h, kinds) - cases = _filter_cases(cases, args.only, args.ionly, args.oonly, args.cases) - if not cases: - print('no cases matched filter') - return 1 - - info: Callable[[str], None] = (lambda m: print(m, file=sys.stderr)) if args.tsv else print - - info( - f'Resolution: {w}x{h} ({mp:.2f} MP/frame), ' - f'iters={args.iters}, warmup={args.warmup}, ' - f'pp threads={args.pp_threads}, ' - f'rec={rec.name}, range={color_range.name}' - ) - - name_w = max((len(c.name) for c in cases), default=20) + 2 - - if args.tsv: - print('\t'.join(['case', 'pp_mps', 'pp_fps'])) - else: - print() - header = f'{"case":<{name_w}} {"pp MP/s":>9} {"pp fps":>8}' - print(header) - print('-' * len(header)) - - rng = np.random.default_rng(0) - - for case in cases: - if case.kind == 'convert' and case.src_arr is not None: - case.src_arr[...] = rng.integers(0, 256, size=case.src_arr.shape, dtype=np.uint8) - - run = _bind(case, args.pp_threads, rec=rec, color_range=color_range) - gc.disable() - try: - for _ in range(args.warmup): - run() - t = _time_n(run, args.iters) - except (pixpat.PixpatError, ValueError, TypeError) as e: - info(f'{case.name:<{name_w}} skipped: {type(e).__name__}: {e}') - continue - finally: - gc.enable() - - fps = 1.0 / t - mps = fps * mp - if args.tsv: - print(f'{case.name}\t{mps:.0f}\t{fps:.0f}') - else: - print(f'{case.name:<{name_w}} {mps:>9.0f} {fps:>8.0f}') - - return 0 - - -if __name__ == '__main__': - raise SystemExit(main()) diff --git a/pixpat-python/tests/test_basic.py b/pixpat-python/tests/test_basic.py deleted file mode 100644 index 2ed8bb7..0000000 --- a/pixpat-python/tests/test_basic.py +++ /dev/null @@ -1,539 +0,0 @@ -"""Smoke tests for the pixpat Python bindings.""" - -import pytest - -import pixpat - - -def test_supported_formats_nonempty(): - formats = pixpat.supported_formats() - assert isinstance(formats, list) - assert len(formats) > 0 - assert all(isinstance(f, str) for f in formats) - assert 'XRGB8888' in formats - assert 'NV12' in formats - - -def test_is_supported(): - assert pixpat.is_supported('XRGB8888') - assert pixpat.is_supported('NV12') - assert not pixpat.is_supported('NOT_A_REAL_FORMAT') - - -def test_draw_pattern_xrgb8888(): - w, h = 64, 32 - stride = w * 4 - data = bytearray(stride * h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'smpte') - assert any(b != 0 for b in data) - - -def test_draw_pattern_unknown_pattern_raises(): - data = bytearray(64 * 32 * 4) - buf = pixpat.Buffer([data], 'XRGB8888', 64, 32, [64 * 4]) - with pytest.raises(ValueError): - pixpat.draw_pattern(buf, 'bogus') - - -def test_draw_pattern_unknown_format_raises(): - data = bytearray(64 * 32 * 4) - buf = pixpat.Buffer([data], 'NOT_A_REAL_FORMAT', 64, 32, [64 * 4]) - with pytest.raises(pixpat.PixpatError): - pixpat.draw_pattern(buf) - - -def test_draw_pattern_readonly_buffer_raises(): - data = bytes(64 * 32 * 4) - buf = pixpat.Buffer([data], 'XRGB8888', 64, 32, [64 * 4]) - with pytest.raises((TypeError, BufferError)): - pixpat.draw_pattern(buf) - - -def _alloc_xrgb(w, h): - stride = w * 4 - return bytearray(stride * h), stride - - -def _first_pixel_bgr(data): - # BGR888-style byte order: pixpat XRGB8888 stores B at byte 0, - # G at byte 1, R at byte 2, X at byte 3. See - # README "Format names and byte order". - return data[0], data[1], data[2] - - -def test_draw_pattern_plain_red_str_params(): - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'plain', params='color=ff0000') - b, g, r = _first_pixel_bgr(data) - assert (b, g, r) == (0, 0, 0xFF) - - -def test_draw_pattern_plain_red_dict_params(): - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'plain', params={'color': 'ff0000'}) - b, g, r = _first_pixel_bgr(data) - assert (b, g, r) == (0, 0, 0xFF) - - -def test_draw_pattern_plain_alpha_passes_through_to_argb(): - w, h = 16, 8 - stride = w * 4 - data = bytearray(stride * h) - buf = pixpat.Buffer([data], 'ARGB8888', w, h, [stride]) - # 8-char form is alpha-first: AARRGGBB. - pixpat.draw_pattern(buf, 'plain', params={'color': '80112233'}) - # ARGB8888 byte order (LSB-first): B, G, R, A - assert (data[0], data[1], data[2], data[3]) == (0x33, 0x22, 0x11, 0x80) - - -def test_draw_pattern_plain_accepts_0x_prefix(): - w, h = 16, 8 - stride = w * 4 - data = bytearray(stride * h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'plain', params={'color': '0xff0000'}) - assert (data[0], data[1], data[2]) == (0, 0, 0xFF) - - -def test_draw_pattern_plain_16bpc_rgb(): - """16-bpc form: 12 hex chars, RRRRGGGGBBBB. ABGR16161616 names - A-B-G-R MSB-first in a 64-bit word; stored little-endian, the - bytes are R lo, R hi, G lo, G hi, B lo, B hi, A lo, A hi.""" - w, h = 4, 2 - stride = w * 8 - data = bytearray(stride * h) - buf = pixpat.Buffer([data], 'ABGR16161616', w, h, [stride]) - pixpat.draw_pattern(buf, 'plain', params={'color': '0xfedc00000000'}) - # R=0xFEDC, G=0x0000, B=0x0000, A=0xFFFF (default opaque). - pix = bytes(data[:8]) - assert pix == b'\xdc\xfe\x00\x00\x00\x00\xff\xff' - - -def test_draw_pattern_plain_16bpc_argb(): - w, h = 4, 2 - stride = w * 8 - data = bytearray(stride * h) - buf = pixpat.Buffer([data], 'ABGR16161616', w, h, [stride]) - # 16-char form is alpha-first: AAAARRRRGGGGBBBB. - pixpat.draw_pattern(buf, 'plain', params={'color': '0x80007f000000ffff'}) - pix = bytes(data[:8]) - # R=0x7F00, G=0x0000, B=0xFFFF, A=0x8000. - assert pix == b'\x00\x7f\x00\x00\xff\xff\x00\x80' - - -def test_draw_pattern_plain_missing_color_raises(): - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - with pytest.raises(pixpat.PixpatError): - pixpat.draw_pattern(buf, 'plain') - with pytest.raises(pixpat.PixpatError): - pixpat.draw_pattern(buf, 'plain', params='') - - -def test_draw_pattern_plain_malformed_color_raises(): - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - for bad in ('color=zzzzzz', 'color=abc', 'color=', 'color=1234567'): - with pytest.raises(pixpat.PixpatError): - pixpat.draw_pattern(buf, 'plain', params=bad) - - -def test_draw_pattern_malformed_params_string_raises(): - """Top-level params parse failure (not pattern-specific).""" - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - for bad in ('foo,bar', '=ff0000', ',color=ff0000'): - with pytest.raises(pixpat.PixpatError): - pixpat.draw_pattern(buf, 'plain', params=bad) - - -def test_draw_pattern_dict_params_with_separators_rejected_by_python(): - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - with pytest.raises(ValueError): - pixpat.draw_pattern(buf, 'plain', params={'color': 'ff,00,00'}) - with pytest.raises(ValueError): - pixpat.draw_pattern(buf, 'plain', params={'col=or': 'ff0000'}) - - -def test_draw_pattern_unknown_params_keys_ignored(): - """Patterns silently ignore keys they don't read; unknown keys - alongside the recognised one still let the call succeed.""" - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'plain', params={'color': 'ff0000', 'unknown_key': '42'}) - b, g, r = _first_pixel_bgr(data) - assert (b, g, r) == (0, 0, 0xFF) - - -def test_draw_pattern_params_ignored_by_kmstest(): - """Patterns that don't read params accept arbitrary params strings.""" - w, h = 32, 16 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'kmstest', params={'whatever': 'foo'}) - assert any(b != 0 for b in data) - - -def _xrgb_pixel_at(data, stride, x, y): - off = y * stride + x * 4 - return data[off], data[off + 1], data[off + 2] # B, G, R - - -@pytest.mark.parametrize( - 'pattern,extra', - [ - ('hramp', {}), - ('vramp', {}), - ('dramp', {}), - ('zoneplate', {}), - ('checker', {}), - ('checker', {'params': 'cell=1'}), - ('checker', {'params': {'cell': '32'}}), - ], -) -def test_draw_pattern_phase3_smoke(pattern, extra): - w, h = 64, 32 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, pattern, **extra) - assert any(b != 0 for b in data) - - -def test_draw_pattern_checker_default_cell_first_pixel_white(): - w, h = 64, 32 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'checker') - # (0,0) is in the first cell -> white. - assert _xrgb_pixel_at(data, stride, 0, 0) == (0xFF, 0xFF, 0xFF) - # Default cell=8: (8,0) is the next cell horizontally -> black. - assert _xrgb_pixel_at(data, stride, 8, 0) == (0, 0, 0) - - -def test_draw_pattern_checker_cell1_alternates(): - w, h = 16, 8 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'checker', params={'cell': '1'}) - p00 = _xrgb_pixel_at(data, stride, 0, 0) - p10 = _xrgb_pixel_at(data, stride, 1, 0) - p01 = _xrgb_pixel_at(data, stride, 0, 1) - p11 = _xrgb_pixel_at(data, stride, 1, 1) - assert p00 != p10 - assert p00 != p01 - assert p00 == p11 # diagonal repeats - - -@pytest.mark.parametrize('cell', ['0', '-1', 'oops', '1.5']) -def test_draw_pattern_checker_invalid_cell_raises(cell): - w, h = 16, 8 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - with pytest.raises(pixpat.PixpatError): - pixpat.draw_pattern(buf, 'checker', params={'cell': cell}) - - -def test_draw_pattern_hramp_stripes(): - """hramp: 4 horizontal stripes (R, G, B, gray), each ramping along x. - The right edge (x=W-1) hits 0xFF in the active channel(s).""" - w, h = 32, 16 # h=16 → stripes at rows [0,3], [4,7], [8,11], [12,15] - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'hramp') - assert _xrgb_pixel_at(data, stride, w - 1, 0) == (0, 0, 0xFF) # R - assert _xrgb_pixel_at(data, stride, w - 1, 4) == (0, 0xFF, 0) # G - assert _xrgb_pixel_at(data, stride, w - 1, 8) == (0xFF, 0, 0) # B - assert _xrgb_pixel_at(data, stride, w - 1, h - 1) == (0xFF, 0xFF, 0xFF) # gray - # Left edge is the 0 end of every ramp. - for y in (0, 4, 8, 12): - assert _xrgb_pixel_at(data, stride, 0, y) == (0, 0, 0) - - -def test_draw_pattern_vramp_columns(): - """vramp: 4 vertical columns (R, G, B, gray), each ramping along y.""" - w, h = 16, 32 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'vramp') - # Bottom row hits max in the active channel(s). - assert _xrgb_pixel_at(data, stride, 0, h - 1) == (0, 0, 0xFF) # R - assert _xrgb_pixel_at(data, stride, 4, h - 1) == (0, 0xFF, 0) # G - assert _xrgb_pixel_at(data, stride, 8, h - 1) == (0xFF, 0, 0) # B - assert _xrgb_pixel_at(data, stride, 12, h - 1) == (0xFF, 0xFF, 0xFF) # gray - # Top row is the 0 end. - for x in (0, 4, 8, 12): - assert _xrgb_pixel_at(data, stride, x, 0) == (0, 0, 0) - # Within a column, luma is monotonic. - prev = -1 - for y in range(h): - b, g, r = _xrgb_pixel_at(data, stride, 12, y) # gray column - assert b == g == r - assert b >= prev - prev = b - - -def test_draw_pattern_zoneplate_center_white(): - """Even sizes don't sample exactly at the center; pick odd dimensions - so (W//2, H//2) is the center pixel where cos(0)=1 → white.""" - w, h = 65, 33 - data, stride = _alloc_xrgb(w, h) - buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) - pixpat.draw_pattern(buf, 'zoneplate') - b, g, r = _xrgb_pixel_at(data, stride, w // 2, h // 2) - # Center pixel: phase=0, gray ≈ 1.0. - assert b == g == r and b >= 0xFE - - -def test_convert_roundtrip_xrgb8888(): - w, h = 64, 32 - src = bytearray(w * h * 4) - mid = bytearray(w * h * 8) - dst = bytearray(w * h * 4) - pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), 'smpte') - pixpat.convert( - pixpat.Buffer([mid], 'ABGR16161616', w, h, [w * 8]), - pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), - ) - pixpat.convert( - pixpat.Buffer([dst], 'XRGB8888', w, h, [w * 4]), - pixpat.Buffer([mid], 'ABGR16161616', w, h, [w * 8]), - ) - assert src == dst - - -def test_convert_readonly_bytes_src(): - """Source planes may be read-only (bytes); the C library only reads src.""" - w, h = 64, 32 - src_writable = bytearray(w * h * 4) - pixpat.draw_pattern(pixpat.Buffer([src_writable], 'XRGB8888', w, h, [w * 4]), 'smpte') - src_readonly = bytes(src_writable) - dst = bytearray(w * h * 8) - pixpat.convert( - pixpat.Buffer([dst], 'ABGR16161616', w, h, [w * 8]), - pixpat.Buffer([src_readonly], 'XRGB8888', w, h, [w * 4]), - ) - assert any(b != 0 for b in dst) - - -def test_convert_dimension_mismatch_raises(): - w, h = 64, 32 - src = bytearray(w * h * 4) - dst = bytearray((w * 2) * h * 8) - with pytest.raises(ValueError, match='dimensions'): - pixpat.convert( - pixpat.Buffer([dst], 'ABGR16161616', w * 2, h, [w * 2 * 8]), - pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), - ) - - -def test_draw_pattern_nv12_semi_planar(): - """Semi-planar NV12: separate Y and interleaved-UV planes.""" - w, h = 64, 32 - y = bytearray(w * h) - uv = bytearray(w * (h // 2)) - pixpat.draw_pattern(pixpat.Buffer([y, uv], 'NV12', w, h, [w, w]), 'smpte') - assert any(b != 0 for b in y) - assert any(b != 0 for b in uv) - - -def test_draw_pattern_yuv420_three_planes(): - """Fully planar YUV420: Y, U and V all at separate plane pointers.""" - w, h = 64, 32 - yp = bytearray(w * h) - up = bytearray((w // 2) * (h // 2)) - vp = bytearray((w // 2) * (h // 2)) - pixpat.draw_pattern( - pixpat.Buffer([yp, up, vp], 'YUV420', w, h, [w, w // 2, w // 2]), - 'smpte', - ) - assert any(b != 0 for b in yp) - assert any(b != 0 for b in up) - assert any(b != 0 for b in vp) - - -def test_convert_xrgb_to_nv12_writes_both_planes(): - """Convert into a multi-plane destination must populate every plane.""" - w, h = 64, 32 - src = bytearray(w * h * 4) - pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), 'smpte') - y = bytearray(w * h) - uv = bytearray(w * (h // 2)) - pixpat.convert( - pixpat.Buffer([y, uv], 'NV12', w, h, [w, w]), - pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), - ) - assert any(b != 0 for b in y) - assert any(b != 0 for b in uv) - - -def test_convert_yuv420_src_three_planes(): - """Planar YUV is a supported convert source — the C library reads - all three plane pointers, so make sure the binding wires them up.""" - w, h = 64, 32 - yp = bytearray(w * h) - up = bytearray((w // 2) * (h // 2)) - vp = bytearray((w // 2) * (h // 2)) - pixpat.draw_pattern( - pixpat.Buffer([yp, up, vp], 'YUV420', w, h, [w, w // 2, w // 2]), - 'smpte', - ) - dst = bytearray(w * h * 4) - pixpat.convert( - pixpat.Buffer([dst], 'XRGB8888', w, h, [w * 4]), - pixpat.Buffer([yp, up, vp], 'YUV420', w, h, [w, w // 2, w // 2]), - ) - assert any(b != 0 for b in dst) - - -def test_convert_bayer_src(): - """Bayer formats are supported as a convert source (decoded by - nearest-neighbor demosaic per the public header docstring).""" - w, h = 64, 32 - src = bytearray(w * h) - pixpat.draw_pattern(pixpat.Buffer([src], 'SRGGB8', w, h, [w]), 'smpte') - dst = bytearray(w * h * 4) - pixpat.convert( - pixpat.Buffer([dst], 'XRGB8888', w, h, [w * 4]), - pixpat.Buffer([src], 'SRGGB8', w, h, [w]), - ) - assert any(b != 0 for b in dst) - - -def test_convert_roundtrip_yuyv(): - """YUYV (packed 4:2:2) — chroma is averaged on write and replicated on - read, so the second leg must not change the buffer.""" - w, h = 64, 32 - src = bytearray(w * h * 2) - mid = bytearray(w * h * 8) - dst = bytearray(w * h * 2) - pixpat.draw_pattern(pixpat.Buffer([src], 'YUYV', w, h, [w * 2]), 'smpte') - pixpat.convert( - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - pixpat.Buffer([src], 'YUYV', w, h, [w * 2]), - ) - pixpat.convert( - pixpat.Buffer([dst], 'YUYV', w, h, [w * 2]), - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - ) - assert src == dst - - -def test_convert_roundtrip_y8(): - """Y8 grayscale — chroma is discarded on write and synthesized on read, - Y values themselves should roundtrip exactly.""" - w, h = 64, 32 - src = bytearray(w * h) - mid = bytearray(w * h * 8) - dst = bytearray(w * h) - pixpat.draw_pattern(pixpat.Buffer([src], 'Y8', w, h, [w]), 'smpte') - pixpat.convert( - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - pixpat.Buffer([src], 'Y8', w, h, [w]), - ) - pixpat.convert( - pixpat.Buffer([dst], 'Y8', w, h, [w]), - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - ) - assert src == dst - - -def test_convert_roundtrip_nv12(): - """NV12 semi-planar 4:2:0 — chroma averaged into a 2x2 block on write, - replicated on read; second leg is a no-op.""" - w, h = 64, 32 - y_src = bytearray(w * h) - uv_src = bytearray(w * (h // 2)) - mid = bytearray(w * h * 8) - y_dst = bytearray(w * h) - uv_dst = bytearray(w * (h // 2)) - pixpat.draw_pattern(pixpat.Buffer([y_src, uv_src], 'NV12', w, h, [w, w]), 'smpte') - pixpat.convert( - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - pixpat.Buffer([y_src, uv_src], 'NV12', w, h, [w, w]), - ) - pixpat.convert( - pixpat.Buffer([y_dst, uv_dst], 'NV12', w, h, [w, w]), - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - ) - assert y_src == y_dst - assert uv_src == uv_dst - - -def test_convert_roundtrip_xv15(): - """P030 semi-planar 4:2:0, 10 bits packed 3-per-32-bit. Width and height - chosen so the Y plane fits 3 samples per word and chroma is on a 2x2 grid.""" - w, h = 96, 32 # multiples of 3 (Y group) and 2 (v_sub) - y_words = (w // 3) * h - uv_words = (w // 3 // 2) * (h // 2) - y_src = bytearray(y_words * 4) - uv_src = bytearray(uv_words * 8) - mid = bytearray(w * h * 8) - y_dst = bytearray(y_words * 4) - uv_dst = bytearray(uv_words * 8) - pixpat.draw_pattern( - pixpat.Buffer([y_src, uv_src], 'P030', w, h, [(w // 3) * 4, (w // 3 // 2) * 8]), - 'smpte', - ) - pixpat.convert( - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - pixpat.Buffer([y_src, uv_src], 'P030', w, h, [(w // 3) * 4, (w // 3 // 2) * 8]), - ) - pixpat.convert( - pixpat.Buffer([y_dst, uv_dst], 'P030', w, h, [(w // 3) * 4, (w // 3 // 2) * 8]), - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - ) - assert y_src == y_dst - assert uv_src == uv_dst - - -def test_convert_roundtrip_x403(): - """T430 fully-planar 4:4:4 with 3-samples-per-32-bit packing on each - plane; no chroma subsampling so first leg is already lossless.""" - w, h = 96, 32 # width must be a multiple of 3 - plane_words = (w // 3) * h - y_src = bytearray(plane_words * 4) - cb_src = bytearray(plane_words * 4) - cr_src = bytearray(plane_words * 4) - mid = bytearray(w * h * 8) - y_dst = bytearray(plane_words * 4) - cb_dst = bytearray(plane_words * 4) - cr_dst = bytearray(plane_words * 4) - strides = [(w // 3) * 4] * 3 - pixpat.draw_pattern( - pixpat.Buffer([y_src, cb_src, cr_src], 'T430', w, h, strides), - 'smpte', - ) - pixpat.convert( - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - pixpat.Buffer([y_src, cb_src, cr_src], 'T430', w, h, strides), - ) - pixpat.convert( - pixpat.Buffer([y_dst, cb_dst, cr_dst], 'T430', w, h, strides), - pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), - ) - assert y_src == y_dst - assert cb_src == cb_dst - assert cr_src == cr_dst - - -def test_strides_length_mismatch_multi_plane_raises(): - """Strides count must match planes count — caught in Python before C.""" - w, h = 64, 32 - y = bytearray(w * h) - uv = bytearray(w * (h // 2)) - with pytest.raises(ValueError, match='strides'): - pixpat.draw_pattern( - pixpat.Buffer([y, uv], 'NV12', w, h, [w]), # 1 stride, 2 planes - 'smpte', - ) diff --git a/pixpat-python/tests/test_numpy.py b/pixpat-python/tests/test_numpy.py deleted file mode 100644 index 6dbd7bb..0000000 --- a/pixpat-python/tests/test_numpy.py +++ /dev/null @@ -1,110 +0,0 @@ -"""Empirical check that pixpat works correctly with numpy. - -pixpat does not depend on numpy. This file proves that ndarrays work -as ``Buffer`` planes through the buffer protocol, in the shapes a -caller would typically use. Skipped when numpy is missing. -""" - -import pixpat -import pytest - -np = pytest.importorskip('numpy') - - -def test_xrgb8888_into_uint8_3d_ndarray(): - w, h = 64, 32 - arr = np.zeros((h, w, 4), dtype=np.uint8) - pixpat.draw_pattern(pixpat.Buffer([arr], 'XRGB8888', w, h, [arr.strides[0]]), 'smpte') - assert arr.any() - - -def test_abgr16161616_into_uint16_3d_ndarray(): - w, h = 64, 32 - arr = np.zeros((h, w, 4), dtype=np.uint16) - pixpat.draw_pattern(pixpat.Buffer([arr], 'ABGR16161616', w, h, [arr.strides[0]]), 'smpte') - assert arr.any() - assert arr.dtype == np.uint16 - - -def test_nv12_into_two_ndarrays(): - w, h = 64, 32 - y = np.zeros((h, w), dtype=np.uint8) - uv = np.zeros((h // 2, w), dtype=np.uint8) # interleaved U,V at half-height - pixpat.draw_pattern( - pixpat.Buffer( - planes=[y, uv], - fmt='NV12', - width=w, - height=h, - strides=[y.strides[0], uv.strides[0]], - ), - 'smpte', - ) - assert y.any() - assert uv.any() - - -def test_convert_with_readonly_ndarray_src(): - """A numpy view with writeable=False must work as the convert source.""" - w, h = 64, 32 - src = np.zeros((h, w, 4), dtype=np.uint8) - pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), 'smpte') - src.flags.writeable = False - - dst = np.zeros((h, w, 4), dtype=np.uint16) - pixpat.convert( - pixpat.Buffer([dst], 'ABGR16161616', w, h, [dst.strides[0]]), - pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), - ) - assert dst.any() - - -def test_roundtrip_ndarrays_byte_for_byte(): - """End-to-end XRGB8888 -> ABGR16161616 -> XRGB8888 with ndarrays only.""" - w, h = 64, 32 - src = np.zeros((h, w, 4), dtype=np.uint8) - mid = np.zeros((h, w, 4), dtype=np.uint16) - dst = np.zeros((h, w, 4), dtype=np.uint8) - - pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), 'smpte') - pixpat.convert( - pixpat.Buffer([mid], 'ABGR16161616', w, h, [mid.strides[0]]), - pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), - ) - pixpat.convert( - pixpat.Buffer([dst], 'XRGB8888', w, h, [dst.strides[0]]), - pixpat.Buffer([mid], 'ABGR16161616', w, h, [mid.strides[0]]), - ) - assert np.array_equal(src, dst) - - -def test_writable_view_via_view_method(): - """`arr.view()` shares memory and is writable by default — drawing into - it must update the original.""" - w, h = 64, 32 - arr = np.zeros((h, w, 4), dtype=np.uint8) - view = arr.view() - pixpat.draw_pattern(pixpat.Buffer([view], 'XRGB8888', w, h, [view.strides[0]]), 'smpte') - assert arr.any() # the original sees the writes - - -def test_noncontiguous_source_smoke(): - """Discovery test: what happens if the caller hands us a non-C-contiguous - array? pixpat reads ``stride`` bytes per row, so any row-contiguous - layout where ``arr.strides[0]`` is the row stride should work; a - transposed array (where rows aren't even contiguous) is misuse. - - This test passes a *valid* row-contiguous slice — the top half of a - bigger image, taken via slicing — and expects success. It exists to - document the contract: 'rows must be contiguous in memory; pass - arr.strides[0] as the stride'. - """ - big = np.zeros((64, 64, 4), dtype=np.uint8) - top_half = big[:32] # shape (32, 64, 4); rows still C-contiguous - assert top_half.strides[0] == 64 * 4 - pixpat.draw_pattern( - pixpat.Buffer([top_half], 'XRGB8888', 64, 32, [top_half.strides[0]]), - 'smpte', - ) - assert big[:32].any() - assert not big[32:].any() # the other half stayed untouched diff --git a/pixpat-python/tests/test_threading.py b/pixpat-python/tests/test_threading.py deleted file mode 100644 index 99d4620..0000000 --- a/pixpat-python/tests/test_threading.py +++ /dev/null @@ -1,187 +0,0 @@ -"""Multi-threaded == single-threaded parity tests. - -The chosen ``num_threads`` is supposed to be transparent: the output -bytes must match the single-threaded output exactly. Anything else is -a threading bug. - -These tests parametrize across one format per writer template so that -every code path in the dispatcher is covered. -""" - -import pytest - -import pixpat - - -# Width must be divisible by 3 (T430/XYYY2101010 pack 3-per-32-bit), 4 -# (Bayer 10P packs 4 pixels per group), and 2 (h_sub). -# Height must be divisible by 2 (v_sub for 4:2:0 formats). -# -# Two sizes: a small one for fast format-coverage parametrization, and -# a larger one (~HD-width) where worker-stripe alignment can interact -# with the writers' chroma logic in non-obvious ways. -SMALL = (192, 64) -LARGE = (1920, 64) - - -def _alloc(fmt, w, h): - """Return (planes, strides) sized for `fmt` at (w, h).""" - if fmt in ('NV12', 'NV21'): # semi-planar 4:2:0 - return [bytearray(w * h), bytearray(w * (h // 2))], [w, w] - if fmt in ('NV16', 'NV61'): # semi-planar 4:2:2 - return [bytearray(w * h), bytearray(w * h)], [w, w] - if fmt == 'P030': # semi-planar 4:2:0, 10-bit packed 3-per-32 - ys, uvs = (w // 3) * 4, (w // 3 // 2) * 8 - return [bytearray(ys * h), bytearray(uvs * (h // 2))], [ys, uvs] - if fmt == 'P230': # semi-planar 4:2:2, 10-bit packed 3-per-32 - ys, uvs = (w // 3) * 4, (w // 3 // 2) * 8 - return [bytearray(ys * h), bytearray(uvs * h)], [ys, uvs] - if fmt in ('YUV420', 'YVU420'): # planar 4:2:0 - return ( - [bytearray(w * h), bytearray((w // 2) * (h // 2)), bytearray((w // 2) * (h // 2))], - [w, w // 2, w // 2], - ) - if fmt in ('YUV422', 'YVU422'): # planar 4:2:2 - return ( - [bytearray(w * h), bytearray((w // 2) * h), bytearray((w // 2) * h)], - [w, w // 2, w // 2], - ) - if fmt in ('YUV444', 'YVU444'): # planar 4:4:4 - return [bytearray(w * h)] * 3, [w] * 3 - if fmt == 'T430': # planar packed 4:4:4, 10-bit - s = (w // 3) * 4 - return [bytearray(s * h)] * 3, [s] * 3 - if fmt in ('Y8', 'R8', 'RGB332'): - return [bytearray(w * h)], [w] - if fmt == 'XYYY2101010': - s = (w // 3) * 4 - return [bytearray(s * h)], [s] - if fmt in ('YUYV', 'YVYU', 'UYVY', 'VYUY'): - return [bytearray(w * h * 2)], [w * 2] - if fmt in ('Y210', 'Y212', 'Y216'): - # 4:2:2, two pixels per 64-bit word. - return [bytearray(w * h * 4)], [w * 4] - if fmt == 'VUY888': - # 24-bit packed YUV, 3 bytes per pixel (storage uint32_t). - return [bytearray(w * h * 3)], [w * 3] - if fmt in ('XVUY2101010', 'XVUY8888'): - return [bytearray(w * h * 4)], [w * 4] - if fmt in ('AVUY16161616', 'ABGR16161616'): - return [bytearray(w * h * 8)], [w * 8] - # MIPI CSI-2 byte packing (Bayer SXXXX10P/12P and grayscale Y10P/Y12P). - if fmt.endswith('10P'): - s = (w // 4) * 5 - return [bytearray(s * h)], [s] - if fmt.endswith('12P'): - s = (w // 2) * 3 - return [bytearray(s * h)], [s] - # Plain Bayer - if fmt.startswith('S') and fmt[-1] == '8': - return [bytearray(w * h)], [w] - if fmt.startswith('S'): - return [bytearray(w * h * 2)], [w * 2] - # 16-bit RGB - if fmt.endswith('565') or fmt.endswith('1555') or fmt.endswith('4444'): - return [bytearray(w * h * 2)], [w * 2] - # 32-bit RGB (8888 / 2101010 / 1010102 / 888 in 32-bit storage) - return [bytearray(w * h * 4)], [w * 4] - - -# One format per writer template in pixpat.cpp. -DRAW_FORMATS = [ - 'XRGB8888', # ARGB_Writer - 'RGB565', # ARGB_Writer (16-bit) - 'ABGR16161616', # ARGB_Writer (normalized wide) - 'XVUY2101010', # YUV_Writer - 'AVUY16161616', # YUV_Writer (normalized wide) - 'YUYV', # YUVPackedWriter - 'NV12', # YUVSemiPlanarWriter v_sub=2 - 'NV16', # YUVSemiPlanarWriter v_sub=1 - 'P030', # YUVSemiPlanarWriter v_sub=2, 10-bit packed - 'P230', # YUVSemiPlanarWriter v_sub=1, 10-bit packed - 'YUV420', # YUVPlanarWriter v_sub=2 - 'YUV422', # YUVPlanarWriter v_sub=1 - 'YUV444', # YUVPlanarWriter no subsampling - 'T430', # YUVPlanarPackedWriter - 'Y8', # Y_Writer (1 sample per byte) - 'XYYY2101010', # Y_Writer (3 samples per word) - 'Y10P', # GrayPacked_Writer (CSI-2 byte packing, Y-only) - 'Y210', # YUVPackedWriter with X-padding entries (4:2:2 in 64-bit word) - 'R8', # MonoRGB_Writer (single R channel) - 'SRGGB8', # Bayer_Writer - 'SRGGB10', # Bayer_Writer - 'SRGGB10P', # BayerPacked_Writer - 'SRGGB12P', # BayerPacked_Writer -] - -# Convert sources: same minus Bayer (Bayer formats have no read path). -CONVERT_FORMATS = [f for f in DRAW_FORMATS if not f.startswith('S')] - - -@pytest.mark.parametrize('fmt', DRAW_FORMATS) -@pytest.mark.parametrize( - 'pattern', - ['smpte', 'kmstest', 'checker', 'hramp', 'vramp', 'dramp', 'zoneplate'], -) -@pytest.mark.parametrize('size', [SMALL, LARGE], ids=['small', 'large']) -def test_draw_pattern_threaded_matches_single(fmt, pattern, size): - """draw_pattern with num_threads=4 must produce identical bytes - to num_threads=1 for every supported format.""" - w, h = size - s_planes, strides = _alloc(fmt, w, h) - pixpat.draw_pattern(pixpat.Buffer(s_planes, fmt, w, h, strides), pattern, num_threads=1) - - t_planes, _ = _alloc(fmt, w, h) - pixpat.draw_pattern(pixpat.Buffer(t_planes, fmt, w, h, strides), pattern, num_threads=4) - - for i, (s, t) in enumerate(zip(s_planes, t_planes)): - assert s == t, f'{fmt} ({pattern}, {w}x{h}): plane {i} differs threaded vs single' - - -@pytest.mark.parametrize('fmt', ['XRGB8888', 'NV12', 'YUV420', 'BGR888']) -@pytest.mark.parametrize('size', [SMALL, LARGE], ids=['small', 'large']) -def test_draw_pattern_plain_threaded_matches_single(fmt, size): - """draw_pattern with params= must also be bit-identical across - thread counts. Spot-check a few representative formats.""" - w, h = size - params = {'color': '12ab34'} - s_planes, strides = _alloc(fmt, w, h) - pixpat.draw_pattern( - pixpat.Buffer(s_planes, fmt, w, h, strides), 'plain', num_threads=1, params=params - ) - - t_planes, _ = _alloc(fmt, w, h) - pixpat.draw_pattern( - pixpat.Buffer(t_planes, fmt, w, h, strides), 'plain', num_threads=4, params=params - ) - - for i, (s, t) in enumerate(zip(s_planes, t_planes)): - assert s == t, f'{fmt} (plain, {w}x{h}): plane {i} differs threaded vs single' - - -@pytest.mark.parametrize('src_fmt', CONVERT_FORMATS) -@pytest.mark.parametrize('dst_fmt', ['ABGR16161616', 'AVUY16161616', 'XRGB8888', 'NV12', 'YUV420']) -@pytest.mark.parametrize('size', [SMALL, LARGE], ids=['small', 'large']) -def test_convert_threaded_matches_single(src_fmt, dst_fmt, size): - """convert with num_threads=4 must produce identical bytes - to num_threads=1.""" - w, h = size - src_planes, src_strides = _alloc(src_fmt, w, h) - pixpat.draw_pattern(pixpat.Buffer(src_planes, src_fmt, w, h, src_strides), 'smpte') - - s_dst_planes, dst_strides = _alloc(dst_fmt, w, h) - pixpat.convert( - pixpat.Buffer(s_dst_planes, dst_fmt, w, h, dst_strides), - pixpat.Buffer(src_planes, src_fmt, w, h, src_strides), - num_threads=1, - ) - - t_dst_planes, _ = _alloc(dst_fmt, w, h) - pixpat.convert( - pixpat.Buffer(t_dst_planes, dst_fmt, w, h, dst_strides), - pixpat.Buffer(src_planes, src_fmt, w, h, src_strides), - num_threads=4, - ) - - for i, (s, t) in enumerate(zip(s_dst_planes, t_dst_planes)): - assert s == t, f'{src_fmt}->{dst_fmt} ({w}x{h}): plane {i} differs threaded vs single' |
