diff options
Diffstat (limited to 'pixpat-python')
| -rw-r--r-- | pixpat-python/pixpat/__init__.py | 489 | ||||
| -rw-r--r-- | pixpat-python/pixpat/_lib/.gitkeep | 3 | ||||
| -rw-r--r-- | pixpat-python/pixpat/_native.py | 203 | ||||
| -rwxr-xr-x | pixpat-python/scripts/build_wheel.sh | 39 | ||||
| -rwxr-xr-x | pixpat-python/scripts/perf_test.py | 434 | ||||
| -rw-r--r-- | pixpat-python/tests/test_basic.py | 539 | ||||
| -rw-r--r-- | pixpat-python/tests/test_numpy.py | 110 | ||||
| -rw-r--r-- | pixpat-python/tests/test_threading.py | 187 |
8 files changed, 2004 insertions, 0 deletions
diff --git a/pixpat-python/pixpat/__init__.py b/pixpat-python/pixpat/__init__.py new file mode 100644 index 0000000..6531662 --- /dev/null +++ b/pixpat-python/pixpat/__init__.py @@ -0,0 +1,489 @@ +"""Python bindings for libpixpat. + +Loads the bundled shared library via ctypes — no CPython extension. Works on +any CPython >= 3.9 for the wheel's target architecture. + +The library draws test patterns and converts pixel data directly into +caller-owned pixel buffers, in a wide range of pixel formats +(planar/semi-planar/packed YUV, RGB, raw, …). The caller is responsible for +allocating each plane with the correct size and stride for the chosen format. + +Pixel data is described by :class:`Buffer`, a passive struct mirroring the +C ``pixpat_buffer``: format name, width, height, one writable buffer per +plane, and one row stride per plane. Use :func:`draw_pattern` to fill a +buffer with a test pattern, and :func:`convert` to convert pixel data +between formats. + +Format names follow the convention used by `kms++` and `pixutils` +(e.g. ``"XRGB8888"``, ``"NV12"``, ``"YUYV"``) — not the DRM/V4L2 +four-character codes (``"XR24"``, etc.). + +Example: + >>> import pixpat + >>> width, height = 1920, 1080 + >>> stride = width * 4 # XRGB8888: 4 bytes per pixel + >>> data = bytearray(stride * height) + >>> buf = pixpat.Buffer(planes=[data], fmt="XRGB8888", + ... width=width, height=height, strides=[stride]) + >>> pixpat.draw_pattern(buf, "smpte") + +Use :func:`supported_formats` to enumerate the format names accepted by +``Buffer.fmt``, and :func:`is_supported` to test a single format. + +Using numpy buffers +------------------- + +pixpat does not import numpy and does not depend on it, but any +C-contiguous ``numpy.ndarray`` works as a plane via Python's buffer +protocol. The caller is responsible for matching the array's dtype and +shape to the pixel format and for passing the correct row stride: + + >>> import numpy as np + >>> arr = np.zeros((height, width, 4), dtype=np.uint8) + >>> stride = arr.strides[0] # bytes per row + >>> buf = pixpat.Buffer(planes=[arr], fmt="XRGB8888", + ... width=width, height=height, strides=[stride]) + >>> pixpat.draw_pattern(buf, "smpte") + +For multi-plane formats like ``"NV12"`` pass one ndarray per plane (e.g. +``(h, w)`` uint8 for Y and ``(h//2, w)`` uint8 for the interleaved UV +plane). + +If you already hold all planes in a single contiguous buffer — the layout +OpenCV uses for NV12, an ``(h * 3 // 2, w)`` uint8 array with Y on top +and the interleaved UV plane below — slice it into per-plane views and +pass those:: + + >>> nv12 = np.zeros((h * 3 // 2, w), dtype=np.uint8) + >>> y, uv = nv12[:h], nv12[h:].reshape(h // 2, w) + >>> pixpat.Buffer([y, uv], "NV12", w, h, + ... [y.strides[0], uv.strides[0]]) + +The source side of :func:`convert` accepts read-only buffers (``bytes``, +``arr.view()`` with ``writeable=False``, mmap'd files, …). The +destination side must always be writable. +""" + +import ctypes +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Mapping, Optional, Sequence, Union + +from ._native import ( + _Buffer, + _ConvertOpts, + _PatternOpts, + _PinnedBuffers, + _fill_buffer, + _lib, +) + +_VALID_PATTERNS = frozenset( + { + 'kmstest', + 'smpte', + 'plain', + 'checker', + 'hramp', + 'vramp', + 'hbar', + 'vbar', + 'dramp', + 'zoneplate', + } +) + + +class Rec(IntEnum): + """YCbCr color encoding standard used for YUV output formats. + + Selects the matrix used to convert from internal RGB to YUV. Has no + effect when drawing into an RGB or raw format. + + Attributes: + BT601: ITU-R BT.601 (standard definition). + BT709: ITU-R BT.709 (HD). + BT2020: ITU-R BT.2020 (UHD / HDR, non-constant luminance). + """ + + BT601 = 0 + BT709 = 1 + BT2020 = 2 + + +class Range(IntEnum): + """Quantization range used for YUV output formats. + + Attributes: + LIMITED: "TV" / studio range — Y in [16, 235], C in [16, 240] + (scaled to bit depth). What most video pipelines expect. + FULL: "PC" / full range — every component uses the full code range + (e.g. [0, 255] for 8-bit). + """ + + LIMITED = 0 + FULL = 1 + + +class PixpatError(RuntimeError): + """Raised when the underlying ``pixpat_*`` call fails. + + Most commonly indicates an unknown format name or buffer dimensions / + plane layout that the format does not allow (e.g. odd width for a + horizontally-subsampled YUV format). + """ + + +@dataclass +class Buffer: + """Plane data + format + dimensions, mirroring the C ``pixpat_buffer``. + + A passive struct: no allocation, no format knowledge, no + numpy/cv2 awareness. The caller decides plane shapes and strides + based on its own format knowledge (e.g. via ``pixutils``). + + Attributes: + planes: One buffer-protocol object per plane (e.g. ``bytearray``, + ``array.array``, ``numpy.ndarray``, ``mmap.mmap``, + ``memoryview``). Up to 4 planes are supported. Each plane + must hold at least ``strides[i] * plane_height`` bytes, + where ``plane_height`` is ``height`` for the main plane and + ``height // vertical_subsampling`` for chroma planes (e.g. + ``height // 2`` for the UV plane of ``NV12``). For + destination buffers each plane must be writable; for the + source side of :func:`convert` read-only objects (such as + ``bytes``) are accepted. + fmt: Pixel-format name; see :func:`supported_formats`. + width: Image width in pixels. Some formats constrain this — for + chroma-subsampled YUV formats it must be a multiple of the + horizontal subsampling factor (e.g. 2 for ``NV12``), and + packed formats add a further multiple from their + pixel-group size (e.g. 6 for ``P030``, 4 for ``SBGGR10P``). + height: Image height in pixels. For vertically-subsampled + formats (e.g. ``NV12``, ``YUV420``) it must be a multiple + of the vertical subsampling factor. + strides: Per-plane row stride in bytes. Must have the same + length as ``planes``. Strides larger than the minimum row + size are allowed. + """ + + planes: Sequence + fmt: str + width: int + height: int + strides: Sequence[int] = field(default_factory=list) + + +def supported_formats() -> list[str]: + """Return the list of pixel-format names accepted by :class:`Buffer`. + + Format names follow the convention used by `kms++` and `pixutils` + (e.g. ``"XRGB8888"``, ``"NV12"``, ``"YUYV"``) — not the DRM/V4L2 + four-character codes. + """ + n = _lib.pixpat_format_count() + return [_lib.pixpat_format_name(i).decode('ascii') for i in range(n)] + + +def is_supported(fmt: str) -> bool: + """Return whether ``fmt`` is a known pixel-format name. + + Equivalent to ``fmt in supported_formats()`` but cheaper — it does not + materialize the full list. + """ + return bool(_lib.pixpat_format_supported(fmt.encode('ascii'))) + + +def _serialize_pattern_params( + params: Optional[Union[str, Mapping[str, object]]], +) -> Optional[bytes]: + """Encode `params` for the C ABI. + + Accepts a ready-made string (passed through verbatim) or a mapping of + string keys to stringifiable values, which is serialized to the + ``"key=val,key=val"`` form pixpat parses on the C side. Returns None + if there is nothing to pass. + """ + if params is None: + return None + if isinstance(params, str): + return params.encode('ascii') + if isinstance(params, Mapping): + items = [] + for k, v in params.items(): + if not isinstance(k, str): + raise TypeError(f'pattern params keys must be str, got {type(k).__name__}') + sv = str(v) + if ',' in k or '=' in k: + raise ValueError(f'pattern params key contains , or =: {k!r}') + if ',' in sv or '=' in sv: + raise ValueError(f'pattern params value contains , or =: {sv!r}') + items.append(f'{k}={sv}') + return ','.join(items).encode('ascii') + raise TypeError(f'pattern params must be None, str, or a Mapping, got {type(params).__name__}') + + +def draw_pattern( + dst: Buffer, + pattern: Optional[str] = None, + *, + rec: Rec = Rec.BT601, + color_range: Range = Range.LIMITED, + num_threads: int = 0, + params: Optional[Union[str, Mapping[str, object]]] = None, +) -> None: + """Draw a test pattern into ``dst``. + + Args: + dst: Destination buffer; all planes must be writable. + pattern: Name of the pattern to draw. ``None`` selects the default + (equivalent to ``"kmstest"``). Recognized values: + + * ``"kmstest"`` — color gradients with ramps, in the style of + the original ``kmstest`` tool. The default. + * ``"smpte"`` — SMPTE RP 219-1 color bars (with PLUGE). + * ``"plain"`` — solid color fill from ``params["color"]``. + See `params` below. + * ``"checker"`` — black/white checkerboard. Optional + ``params["cell"]`` (decimal positive integer, default 8) + sets the cell size in pixels. ``"cell": "1"`` is a + 1-pixel chroma-subsampling stress test. + * ``"hramp"`` — four horizontal stripes (R, G, B, gray), + each a 0..max ramp along x. Combined per-channel and + luma quantization check. + * ``"vramp"`` — same as ``"hramp"`` rotated 90°: four + vertical columns ramping along y. + * ``"hbar"`` — horizontal bar (full image width, narrow + along y) over a black background. Required + ``params["pos"]`` (signed integer, top edge in pixels); + optional ``params["width"]`` (positive integer, default + 32). The bar is split into seven equal-width regions + colored white/red/white/green/white/blue/white. + * ``"vbar"`` — same as ``"hbar"`` rotated 90°: vertical + bar with ``pos`` measured along x. + * ``"dramp"`` — diagonal RGB ramp (R on x, G on y, + B on x+y). + * ``"zoneplate"`` — centered radial cosine pattern, + frequency ramping from DC at the center to Nyquist at + the longer edge. Useful for spotting scaling/aliasing. + + ``"smpte"`` is defined by the spec in BT.709 / Limited. + Pass ``rec=Rec.BT709, color_range=Range.LIMITED`` for + spec-correct output; other settings produce visibly-wrong + colors when drawing into RGB sinks (the caller's matrix is + applied to BT.709-encoded values). Callers are trusted — + pixpat does not silently override the spec. + rec: YCbCr matrix for YUV formats. Ignored for RGB / raw formats. + Defaults to :attr:`Rec.BT601`. + color_range: Quantization range for YUV formats. Ignored for + RGB / raw formats. Defaults to :attr:`Range.LIMITED`. + num_threads: Worker-thread count. ``0`` selects a sensible + default (one per online CPU, capped to a sane maximum); + ``1`` runs single-threaded with no thread-spawn overhead; + ``N > 1`` uses exactly ``N`` workers. Defaults to ``0``. + Output is bit-identical regardless of the chosen count. + params: Optional pattern-specific parameters. Either a mapping + (``{"color": "ff0000"}``) — serialized to ``"color=ff0000"`` + for the C ABI — or a raw ``"key=val,key=val"`` string. + Unknown keys are silently ignored; patterns that don't read + params (``kmstest``, ``smpte``) ignore this entirely. + Per-pattern keys: + + * ``"plain"`` reads ``"color"`` as a hex RGB(A) string with + an optional ``"0x"`` prefix. The hex-digit count selects + the layout: 6 → 8-bit ``RRGGBB``, 8 → 8-bit ``AARRGGBB`` + (alpha first), 12 → 16-bit ``RRRRGGGGBBBB``, 16 → 16-bit + ``AAAARRRRGGGGBBBB``. Missing or malformed ``"color"`` + raises :class:`PixpatError`. + * ``"checker"`` reads optional ``"cell"`` as a positive + decimal integer; default 8. A non-positive or non-numeric + value raises :class:`PixpatError`. + * ``"hbar"`` / ``"vbar"`` read required ``"pos"`` (signed + decimal integer, top/left edge of the bar in pixels; + negative values clip at the edge) and optional ``"width"`` + (positive decimal integer, bar thickness; default 32). + Missing/non-numeric ``pos`` or non-positive ``width`` + raises :class:`PixpatError`. + + Raises: + ValueError: ``dst.planes`` exceeds the maximum plane count, + ``dst.strides`` length does not match ``dst.planes``, + ``pattern`` is not one of the recognized names, or a + ``params`` mapping value contains a forbidden ``,`` or ``=``. + TypeError: A plane is read-only (e.g. ``bytes``, a read-only + ``memoryview``), or ``params`` has an unsupported type. + PixpatError: The underlying C call failed — typically an unknown + ``fmt``, dimensions / strides incompatible with the format, + or pattern parameters that the pattern rejected. + + Example: + >>> w, h = 640, 480 + >>> stride = w * 4 + >>> data = bytearray(stride * h) + >>> dst = Buffer([data], "XRGB8888", w, h, [stride]) + >>> draw_pattern(dst, "smpte") + >>> draw_pattern(dst, "plain", params={"color": "ff0000"}) + """ + if pattern is None: + pattern = 'kmstest' + elif pattern not in _VALID_PATTERNS: + raise ValueError( + f'unknown pattern {pattern!r}; expected one of {sorted(_VALID_PATTERNS)} or None' + ) + if num_threads < 0: + raise ValueError(f'num_threads must be >= 0, got {num_threads}') + + params_bytes = _serialize_pattern_params(params) + + c_buf = _Buffer() + opts = _PatternOpts() + opts.rec = int(rec) + opts.range = int(color_range) + opts.num_threads = num_threads + opts.params = params_bytes + + with _PinnedBuffers() as pins: + _fill_buffer( + c_buf, + pins, + dst.planes, + dst.fmt, + dst.width, + dst.height, + dst.strides, + writable=True, + ) + rc = _lib.pixpat_draw_pattern( + ctypes.byref(c_buf), pattern.encode('ascii'), ctypes.byref(opts) + ) + + if rc != 0: + raise PixpatError(f'pixpat_draw_pattern failed (rc={rc}); check format name and dimensions') + + +def convert( + dst: Buffer, + src: Buffer, + *, + rec: Rec = Rec.BT601, + color_range: Range = Range.LIMITED, + num_threads: int = 0, +) -> None: + """Convert pixel data from ``src`` into ``dst``. + + Both buffers must describe an image of the same width and height; + only the pixel format may differ. Any format accepted by + :func:`supported_formats` works as both source and destination in + the default build (custom build profiles can mark individual formats + read-only or write-only). Conversion is routed internally through a + 16-bit normalized RGB or YUV intermediate, so format-to-format + conversions in either direction (e.g. ``NV12`` -> ``YUV420``, + ``XRGB8888`` -> ``NV12``, ``SRGGB10`` -> ``BGR888``) are a single + call. Bayer sources are decoded with a 3x3 bilinear demosaic. + + ``src.planes`` may hold read-only buffers (e.g. ``bytes``, mmap'd + files, numpy arrays with ``writeable=False``); ``dst.planes`` must + be writable. + + Args: + dst: Destination buffer. + src: Source buffer. + rec: YCbCr matrix used when the conversion crosses the RGB/YUV + boundary. Ignored when ``src.fmt`` and ``dst.fmt`` share + the same color kind. Defaults to :attr:`Rec.BT601`. + color_range: Quantization range used when the conversion crosses + the RGB/YUV boundary. Ignored when ``src.fmt`` and + ``dst.fmt`` share the same color kind. Defaults to + :attr:`Range.LIMITED`. + num_threads: Worker-thread count. ``0`` selects a sensible + default (one per online CPU, capped to a sane maximum); + ``1`` runs single-threaded with no thread-spawn overhead; + ``N > 1`` uses exactly ``N`` workers. Defaults to ``0``. + Output is bit-identical regardless of the chosen count. + + Raises: + ValueError: ``dst`` and ``src`` have mismatched dimensions, a + plane sequence exceeds the maximum plane count, or a + strides length does not match its planes length. + TypeError: A ``dst`` plane is read-only. + PixpatError: The underlying C call failed — typically an + unknown format name, a format disabled in the current build + (or disabled in the requested direction), or dimensions / + strides incompatible with one of the formats. + + Example: + Cross-color-kind, multi-plane on both sides — paint an NV12 + source via :func:`draw_pattern`, then convert it to planar + YUV420: + + >>> w, h = 64, 32 + >>> y_src = bytearray(w * h) + >>> uv_src = bytearray(w * h // 2) + >>> draw_pattern(Buffer([y_src, uv_src], "NV12", w, h, [w, w]), + ... "smpte") + >>> y_dst = bytearray(w * h) + >>> u_dst = bytearray(w * h // 4) + >>> v_dst = bytearray(w * h // 4) + >>> convert( + ... Buffer([y_dst, u_dst, v_dst], "YUV420", w, h, + ... [w, w // 2, w // 2]), + ... Buffer([y_src, uv_src], "NV12", w, h, [w, w]), + ... ) + """ + if dst.width != src.width or dst.height != src.height: + raise ValueError( + f'dst dimensions {dst.width}x{dst.height} do not match ' + f'src dimensions {src.width}x{src.height}' + ) + if num_threads < 0: + raise ValueError(f'num_threads must be >= 0, got {num_threads}') + + c_dst = _Buffer() + c_src = _Buffer() + opts = _ConvertOpts() + opts.rec = int(rec) + opts.range = int(color_range) + opts.num_threads = num_threads + + with _PinnedBuffers() as pins: + _fill_buffer( + c_dst, + pins, + dst.planes, + dst.fmt, + dst.width, + dst.height, + dst.strides, + writable=True, + role='dst', + ) + _fill_buffer( + c_src, + pins, + src.planes, + src.fmt, + src.width, + src.height, + src.strides, + writable=False, + role='src', + ) + rc = _lib.pixpat_convert(ctypes.byref(c_dst), ctypes.byref(c_src), ctypes.byref(opts)) + + if rc != 0: + raise PixpatError( + f'pixpat_convert failed (rc={rc}); check format names, dimensions, ' + f'and that src.fmt is a supported source format' + ) + + +__all__ = [ + 'Buffer', + 'Rec', + 'Range', + 'PixpatError', + 'supported_formats', + 'is_supported', + 'draw_pattern', + 'convert', +] diff --git a/pixpat-python/pixpat/_lib/.gitkeep b/pixpat-python/pixpat/_lib/.gitkeep new file mode 100644 index 0000000..7319712 --- /dev/null +++ b/pixpat-python/pixpat/_lib/.gitkeep @@ -0,0 +1,3 @@ +# Placeholder so the _lib/ directory exists in source control. +# `pip install .` and `pip install -e .` populate libpixpat.so.0 here +# (copy and symlink respectively); see setup.py. diff --git a/pixpat-python/pixpat/_native.py b/pixpat-python/pixpat/_native.py new file mode 100644 index 0000000..46fe452 --- /dev/null +++ b/pixpat-python/pixpat/_native.py @@ -0,0 +1,203 @@ +"""ctypes plumbing for libpixpat. + +Private module — public API lives in :mod:`pixpat`. Loads the bundled +shared library, declares the C structs and function signatures, and +provides :func:`_fill_buffer` to translate Python plane sequences into +the C ``pixpat_buffer`` layout while pinning the underlying memory via +the buffer protocol. + +The buffer-protocol path uses ``PyObject_GetBuffer`` / ``PyBuffer_Release`` +directly (rather than ctypes ``from_buffer``) so that read-only inputs +work for the ``src`` side of :func:`pixpat.convert`. ``from_buffer`` +unconditionally requires a writable buffer. +""" + +import ctypes +import os +import pathlib +from typing import Sequence + +_PIXPAT_MAX_PLANES = 4 + +_lib_override = os.environ.get('PIXPAT_LIB') +if _lib_override: + if not pathlib.Path(_lib_override).exists(): + raise ImportError(f'pixpat: PIXPAT_LIB={_lib_override} does not exist') + _lib = ctypes.CDLL(_lib_override) +else: + _lib_dir = pathlib.Path(__file__).parent / '_lib' + _so_candidates = sorted(_lib_dir.glob('libpixpat.so*')) + if not _so_candidates: + raise ImportError(f'pixpat: no libpixpat.so* found in {_lib_dir}') + _lib = ctypes.CDLL(str(_so_candidates[0])) + + +class _Buffer(ctypes.Structure): + _fields_ = [ + ('format', ctypes.c_char_p), + ('width', ctypes.c_uint32), + ('height', ctypes.c_uint32), + ('num_planes', ctypes.c_uint32), + ('planes', ctypes.c_void_p * _PIXPAT_MAX_PLANES), + ('strides', ctypes.c_uint32 * _PIXPAT_MAX_PLANES), + ] + + +class _PatternOpts(ctypes.Structure): + _fields_ = [ + ('rec', ctypes.c_int), + ('range', ctypes.c_int), + ('num_threads', ctypes.c_int), + ('params', ctypes.c_char_p), + ] + + +class _ConvertOpts(ctypes.Structure): + _fields_ = [ + ('rec', ctypes.c_int), + ('range', ctypes.c_int), + ('num_threads', ctypes.c_int), + ] + + +_lib.pixpat_draw_pattern.argtypes = [ + ctypes.POINTER(_Buffer), + ctypes.c_char_p, + ctypes.POINTER(_PatternOpts), +] +_lib.pixpat_draw_pattern.restype = ctypes.c_int + +_lib.pixpat_convert.argtypes = [ + ctypes.POINTER(_Buffer), + ctypes.POINTER(_Buffer), + ctypes.POINTER(_ConvertOpts), +] +_lib.pixpat_convert.restype = ctypes.c_int + +_lib.pixpat_format_supported.argtypes = [ctypes.c_char_p] +_lib.pixpat_format_supported.restype = ctypes.c_int + +_lib.pixpat_format_count.argtypes = [] +_lib.pixpat_format_count.restype = ctypes.c_size_t + +_lib.pixpat_format_name.argtypes = [ctypes.c_size_t] +_lib.pixpat_format_name.restype = ctypes.c_char_p + + +class _Py_buffer(ctypes.Structure): + _fields_ = [ + ('buf', ctypes.c_void_p), + ('obj', ctypes.py_object), + ('len', ctypes.c_ssize_t), + ('itemsize', ctypes.c_ssize_t), + ('readonly', ctypes.c_int), + ('ndim', ctypes.c_int), + ('format', ctypes.c_char_p), + ('shape', ctypes.POINTER(ctypes.c_ssize_t)), + ('strides', ctypes.POINTER(ctypes.c_ssize_t)), + ('suboffsets', ctypes.POINTER(ctypes.c_ssize_t)), + ('internal', ctypes.c_void_p), + ] + + +_PyObject_GetBuffer = ctypes.pythonapi.PyObject_GetBuffer +_PyObject_GetBuffer.argtypes = [ + ctypes.py_object, + ctypes.POINTER(_Py_buffer), + ctypes.c_int, +] +_PyObject_GetBuffer.restype = ctypes.c_int + +_PyBuffer_Release = ctypes.pythonapi.PyBuffer_Release +_PyBuffer_Release.argtypes = [ctypes.POINTER(_Py_buffer)] +_PyBuffer_Release.restype = None + +_PyBUF_SIMPLE = 0 +_PyBUF_WRITABLE = 0x0001 + + +class _PinnedBuffers: + """Holds Py_buffer views for the lifetime of a pixpat call. + + Releases each view in ``__exit__`` (or when garbage-collected) so + the underlying objects' buffer-export count drops back to zero. + """ + + def __init__(self) -> None: + self._views: list[_Py_buffer] = [] + + def acquire(self, obj, *, writable: bool) -> int: + view = _Py_buffer() + flags = _PyBUF_WRITABLE if writable else _PyBUF_SIMPLE + # ctypes.pythonapi propagates the set exception (BufferError / + # TypeError) automatically on failure, so no rc check is needed. + _PyObject_GetBuffer(obj, ctypes.byref(view), flags) + self._views.append(view) + return view.buf or 0 + + def release(self) -> None: + while self._views: + view = self._views.pop() + _PyBuffer_Release(ctypes.byref(view)) + + def __enter__(self) -> '_PinnedBuffers': + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.release() + + def __del__(self) -> None: + self.release() + + +def _fill_buffer( + buf: _Buffer, + pins: _PinnedBuffers, + planes: Sequence, + fmt: str, + width: int, + height: int, + strides: Sequence[int], + *, + writable: bool, + role: str = '', +) -> None: + """Populate ``buf`` from a Python plane sequence, pinning each plane. + + ``writable`` selects whether each plane must be writable: True for + destination buffers (the C library writes into them), False for + source buffers (the C library only reads). + """ + label = f'{role} ' if role else '' + if len(planes) > _PIXPAT_MAX_PLANES: + raise ValueError(f'too many {label}planes: {len(planes)} (max {_PIXPAT_MAX_PLANES})') + if len(strides) != len(planes): + raise ValueError(f'{label}strides has {len(strides)} entries, expected {len(planes)}') + + plane_ptrs = (ctypes.c_void_p * _PIXPAT_MAX_PLANES)() + stride_arr = (ctypes.c_uint32 * _PIXPAT_MAX_PLANES)() + for i, plane in enumerate(planes): + try: + addr = pins.acquire(plane, writable=writable) + except BufferError as e: + kind = 'writable' if writable else 'buffer-protocol' + raise TypeError(f'{label}plane {i} does not support the {kind} interface: {e}') from e + plane_ptrs[i] = addr + stride_arr[i] = strides[i] + + buf.format = fmt.encode('ascii') + buf.width = width + buf.height = height + buf.num_planes = len(planes) + buf.planes = plane_ptrs + buf.strides = stride_arr + + +__all__ = [ + '_Buffer', + '_PatternOpts', + '_ConvertOpts', + '_PinnedBuffers', + '_lib', + '_fill_buffer', +] diff --git a/pixpat-python/scripts/build_wheel.sh b/pixpat-python/scripts/build_wheel.sh new file mode 100755 index 0000000..1eb2586 --- /dev/null +++ b/pixpat-python/scripts/build_wheel.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# Build a pixpat wheel for the given target architecture. +# +# Usage: pixpat-python/scripts/build_wheel.sh <x86_64|aarch64> +# +# Meson is invoked from setup.py during the wheel build; this script just +# selects the target arch and hands off to `python -m build`. The meson +# build dir lands at pixpat-python/build-<arch>/native/, so cross-compiling +# both arches in turn keeps each arch's compile incremental. +# +# Prerequisites: +# - meson, ninja in PATH (e.g. `pip install meson ninja`) +# - For aarch64: gcc-aarch64-linux-gnu, g++-aarch64-linux-gnu +# (sudo apt install gcc-aarch64-linux-gnu g++-aarch64-linux-gnu) +# - Python with `build` package: pip install build + +set -euo pipefail + +ARCH="${1:-}" +case "$ARCH" in + x86_64|aarch64) ;; + *) + echo "usage: $0 <x86_64|aarch64>" >&2 + exit 1 + ;; +esac + +REPO_ROOT="$(cd "$(dirname "$0")/../.." && pwd)" +cd "$REPO_ROOT" + +# --no-isolation: reuse the host environment so meson's per-arch build dir +# (pixpat-python/build-<arch>/native/) survives across invocations and stays +# incremental. Build isolation copies the source to a temp dir, so the meson +# build dir would be thrown away each run. +PIXPAT_TARGET_ARCH="$ARCH" python -m build --wheel --no-isolation + +echo +echo "Wheel(s) in dist/:" +ls -1 dist/*.whl diff --git a/pixpat-python/scripts/perf_test.py b/pixpat-python/scripts/perf_test.py new file mode 100755 index 0000000..100c16e --- /dev/null +++ b/pixpat-python/scripts/perf_test.py @@ -0,0 +1,434 @@ +#!/usr/bin/env python3 +"""Micro-benchmark pixpat across draw_pattern and convert call paths. + +Each row times one ``<source> -> <destination>`` operation: + +* a pattern draw, where ``source`` is a pattern name (``smpte``, + ``kmstest`` etc.) and the row times ``pixpat.draw_pattern``; +* a format conversion, where both sides are format names and the row + times ``pixpat.convert``. + +Buffer allocation is driven by ``pixutils.formats.PixelFormats`` — +``framesize`` / ``planesize`` / ``stride`` give us per-plane geometry +without per-format hand-coding. + +Filtering is uniform across both kinds because the case name always +has the form ``"<lhs> -> <rhs>"``: ``--ionly kmstest`` keeps that +pattern's cases, ``--ionly NV12`` keeps NV12-source convert cases, +``--oonly BGR888`` keeps any case (pattern or convert) writing +BGR888, and so on. + +Usage: + python perf_test.py [--width 1920] [--height 1080] [--iters 200] + [--warmup 10] [--pp-threads 1] + [--rec BT709] [--range LIMITED] + [--only-pattern | --only-convert] + [--only STR] [--ionly STR] [--oonly STR] + [--cases STR] [--tsv] +""" + +from __future__ import annotations + +import argparse +import gc +import math +import sys +import time +from dataclasses import dataclass +from typing import Callable, Optional + +import numpy as np + +import pixpat +from pixutils.formats import PixelFormat, PixelFormats + + +PATTERNS = ['kmstest', 'smpte', 'plain'] + +PATTERN_SINK_FMTS = [ + 'XRGB8888', + 'BGR888', + 'RGB565', + 'RGBA1010102', + 'ABGR16161616', + 'Y8', + 'YUYV', + 'UYVY', + 'NV12', + 'NV16', + 'YUV420', + 'SRGGB8', + 'SRGGB10P', +] + +CONVERT_PAIRS: list[tuple[str, str]] = [ + # RGB shuffles + ('RGB888', 'BGR888'), + ('BGR888', 'BGRA8888'), + ('BGRA8888', 'BGR888'), + # Grayscale + ('BGR888', 'Y8'), + ('Y8', 'BGR888'), + # Packed YUV decode + ('YUYV', 'BGR888'), + ('UYVY', 'BGR888'), + # Semiplanar YUV decode + ('NV12', 'BGR888'), + ('NV21', 'BGR888'), + # Planar YUV decode + ('YUV420', 'BGR888'), + ('YVU420', 'BGR888'), + # Planar YUV encode + ('BGR888', 'YUV420'), + ('BGR888', 'YVU420'), + # NV12 encode/decode against XRGB8888 + ('XRGB8888', 'NV12'), + ('BGR888', 'NV12'), + ('NV12', 'XRGB8888'), + # Cold path: neither side is BGR888 + ('XRGB8888', 'RGBA8888'), + ('RGBA8888', 'XRGB8888'), + ('NV12', 'YUV420'), + ('YUV420', 'NV12'), + ('NV12', 'YUYV'), + ('NV12', 'RGB565'), + ('Y8', 'NV12'), + # Wider coverage + ('BGR888', 'YUYV'), + ('BGR888', 'NV16'), + ('NV16', 'BGR888'), + ('BGR888', 'YUV422'), + ('YUV422', 'BGR888'), + ('BGR888', 'YUV444'), + ('YUV444', 'BGR888'), + # 16-bit packed RGB + ('BGR888', 'RGB565'), + ('RGB565', 'BGR888'), + # 10-bit packed RGB + ('BGR888', 'RGBA1010102'), + ('RGBA1010102', 'BGR888'), + # 64-bit packed RGB (pixpat's normalized wide form) + ('BGR888', 'ABGR16161616'), + ('ABGR16161616', 'BGR888'), + # Multi-pixel-per-word semiplanar YUV + ('BGR888', 'P030'), + ('P030', 'BGR888'), + ('BGR888', 'P230'), + # Multi-pixel-per-word planar YUV + ('BGR888', 'T430'), + ('T430', 'BGR888'), + # Multi-pixel-per-word grayscale + ('BGR888', 'XYYY2101010'), + # Single-pixel-per-word YUV + ('BGR888', 'XVUY2101010'), + ('BGR888', 'AVUY16161616'), + # Bayer + ('BGR888', 'SRGGB8'), + ('SRGGB8', 'BGR888'), + ('BGR888', 'SRGGB10P'), + ('SRGGB10P', 'BGR888'), + ('BGR888', 'SRGGB12P'), + ('SRGGB12P', 'BGR888'), + # Single-channel RGB + ('BGR888', 'R8'), + ('R8', 'BGR888'), + # MIPI CSI-2 packed grayscale + ('BGR888', 'Y10P'), + ('Y10P', 'BGR888'), + # 4:2:2 packed YUV in 64-bit words + ('BGR888', 'Y210'), + ('Y210', 'BGR888'), +] + + +_PF_BY_NAME = {v.name: v for v in PixelFormats.__dict__.values() if isinstance(v, PixelFormat)} + + +def _required_align(pf: PixelFormat) -> tuple[int, int]: + # pf.pixel_align doesn't always capture the per-plane pixels_per_block / + # vsub requirements (e.g. T430 lists (1,1) but each plane is 3 px wide), + # so combine them to get the effective alignment we need to skip cleanly. + w_align = pf.pixel_align[0] + h_align = pf.pixel_align[1] + for p in pf.planes: + w_align = math.lcm(w_align, p.pixels_per_block * p.hsub) + h_align = math.lcm(h_align, p.vsub) + return w_align, h_align + + +def _alloc_buffer(fmt_name: str, w: int, h: int) -> Optional[tuple[pixpat.Buffer, np.ndarray]]: + """Build a pixpat.Buffer + its 1-D backing array, or None when the + resolution doesn't fit the format's alignment.""" + pf = _PF_BY_NAME[fmt_name] + w_align, h_align = _required_align(pf) + if w % w_align or h % h_align: + return None + + backing = np.zeros(pf.framesize(w, h), dtype=np.uint8) + planes: list[np.ndarray] = [] + strides: list[int] = [] + offset = 0 + for i in range(len(pf.planes)): + s = pf.stride(w, i) + psize = pf.planesize(s, h, i) + view = backing[offset : offset + psize].reshape(psize // s, s) + planes.append(view) + strides.append(s) + offset += psize + return pixpat.Buffer(planes, fmt_name, w, h, strides), backing + + +@dataclass +class Case: + name: str + kind: str # 'pattern' or 'convert' + dst: pixpat.Buffer + src: Optional[pixpat.Buffer] = None + src_arr: Optional[np.ndarray] = None # 1-D backing for random fill + + +def _build_cases(w: int, h: int, kinds: set[str]) -> list[Case]: + cases: list[Case] = [] + + if 'pattern' in kinds: + for fmt in PATTERN_SINK_FMTS: + alloc = _alloc_buffer(fmt, w, h) + if alloc is None: + continue + buf, _ = alloc + for pat in PATTERNS: + cases.append(Case(name=f'{pat} -> {fmt}', kind='pattern', dst=buf)) + + if 'convert' in kinds: + for src_fmt, dst_fmt in CONVERT_PAIRS: + src_alloc = _alloc_buffer(src_fmt, w, h) + dst_alloc = _alloc_buffer(dst_fmt, w, h) + if src_alloc is None or dst_alloc is None: + continue + src_buf, src_backing = src_alloc + dst_buf, _ = dst_alloc + cases.append( + Case( + name=f'{src_fmt} -> {dst_fmt}', + kind='convert', + dst=dst_buf, + src=src_buf, + src_arr=src_backing, + ) + ) + + return cases + + +def _parse_filter(s: Optional[str]) -> Optional[set[str]]: + if s is None: + return None + return {x.strip().upper() for x in s.split(',') if x.strip()} + + +def _norm_case(name: str) -> str: + return ' '.join(name.upper().split()) + + +def _filter_cases( + cases: list[Case], + only: Optional[str], + ionly: Optional[str], + oonly: Optional[str], + cases_filter: Optional[str], +) -> list[Case]: + only_set = _parse_filter(only) + ionly_set = _parse_filter(ionly) + oonly_set = _parse_filter(oonly) + cases_set: Optional[set[str]] = None + if cases_filter is not None: + cases_set = {_norm_case(x) for x in cases_filter.split(',') if x.strip()} + if only_set is None and ionly_set is None and oonly_set is None and cases_set is None: + return cases + + out: list[Case] = [] + for c in cases: + lhs, _, rhs = c.name.partition(' -> ') + lhs, rhs = lhs.upper(), rhs.upper() + if cases_set is not None and _norm_case(c.name) not in cases_set: + continue + if only_set is not None and lhs not in only_set and rhs not in only_set: + continue + if ionly_set is not None and lhs not in ionly_set: + continue + if oonly_set is not None and rhs not in oonly_set: + continue + out.append(c) + return out + + +def _bind( + case: Case, + num_threads: int, + rec: 'pixpat.Rec', + color_range: 'pixpat.Range', +) -> Callable[[], object]: + if case.kind == 'pattern': + fn = pixpat.draw_pattern + dst = case.dst + pat = case.name.partition(' -> ')[0] + # `plain` needs an explicit color; the rest ignore params. + params = {'color': 'ff0000'} if pat == 'plain' else None + return lambda: fn( + dst, + pat, + rec=rec, + color_range=color_range, + num_threads=num_threads, + params=params, + ) + fn = pixpat.convert + assert case.src is not None + src, dst = case.src, case.dst + return lambda: fn(dst, src, rec=rec, color_range=color_range, num_threads=num_threads) + + +def _time_n(fn: Callable[[], object], iters: int) -> float: + """Return min seconds over ``iters`` calls.""" + best = float('inf') + for _ in range(iters): + t0 = time.perf_counter_ns() + fn() + dt = time.perf_counter_ns() - t0 + if dt < best: + best = dt + return best * 1e-9 + + +def main() -> int: + p = argparse.ArgumentParser( + description='Micro-benchmark pixpat across draw_pattern and convert.' + ) + p.add_argument('--width', type=int, default=1920) + p.add_argument('--height', type=int, default=1080) + p.add_argument('--iters', type=int, default=200) + p.add_argument('--warmup', type=int, default=10) + p.add_argument( + '--pp-threads', + type=int, + default=1, + help='pixpat thread count; 0 = sensible default', + ) + p.add_argument( + '--rec', + default='BT709', + choices=[r.name for r in pixpat.Rec], + help='YCbCr matrix for YUV cases (default BT709)', + ) + p.add_argument( + '--range', + dest='color_range', + default='LIMITED', + choices=[r.name for r in pixpat.Range], + help='Quantization range for YUV cases (default LIMITED)', + ) + kind_group = p.add_mutually_exclusive_group() + kind_group.add_argument( + '--only-pattern', + action='store_true', + help='restrict to pattern-draw cases', + ) + kind_group.add_argument( + '--only-convert', + action='store_true', + help='restrict to convert cases', + ) + p.add_argument( + '--only', + default=None, + help='comma-separated names; keep cases whose lhs OR rhs of " -> " matches', + ) + p.add_argument( + '--ionly', + default=None, + help='comma-separated names; keep cases whose lhs (pattern or src fmt) matches', + ) + p.add_argument( + '--oonly', + default=None, + help='comma-separated names; keep cases whose rhs (dst fmt) matches', + ) + p.add_argument( + '--cases', + default=None, + help='comma-separated full case names (e.g. "smpte -> BGR888,RGB888 -> BGR888")', + ) + p.add_argument( + '--tsv', + action='store_true', + help='emit tab-separated rows on stdout; meta and warnings go to stderr', + ) + args = p.parse_args() + + w, h = args.width, args.height + mp = (w * h) / 1e6 + + rec = pixpat.Rec[args.rec] + color_range = pixpat.Range[args.color_range] + + kinds: set[str] = {'pattern', 'convert'} + if args.only_pattern: + kinds = {'pattern'} + elif args.only_convert: + kinds = {'convert'} + + cases = _build_cases(w, h, kinds) + cases = _filter_cases(cases, args.only, args.ionly, args.oonly, args.cases) + if not cases: + print('no cases matched filter') + return 1 + + info: Callable[[str], None] = (lambda m: print(m, file=sys.stderr)) if args.tsv else print + + info( + f'Resolution: {w}x{h} ({mp:.2f} MP/frame), ' + f'iters={args.iters}, warmup={args.warmup}, ' + f'pp threads={args.pp_threads}, ' + f'rec={rec.name}, range={color_range.name}' + ) + + name_w = max((len(c.name) for c in cases), default=20) + 2 + + if args.tsv: + print('\t'.join(['case', 'pp_mps', 'pp_fps'])) + else: + print() + header = f'{"case":<{name_w}} {"pp MP/s":>9} {"pp fps":>8}' + print(header) + print('-' * len(header)) + + rng = np.random.default_rng(0) + + for case in cases: + if case.kind == 'convert' and case.src_arr is not None: + case.src_arr[...] = rng.integers(0, 256, size=case.src_arr.shape, dtype=np.uint8) + + run = _bind(case, args.pp_threads, rec=rec, color_range=color_range) + gc.disable() + try: + for _ in range(args.warmup): + run() + t = _time_n(run, args.iters) + except (pixpat.PixpatError, ValueError, TypeError) as e: + info(f'{case.name:<{name_w}} skipped: {type(e).__name__}: {e}') + continue + finally: + gc.enable() + + fps = 1.0 / t + mps = fps * mp + if args.tsv: + print(f'{case.name}\t{mps:.0f}\t{fps:.0f}') + else: + print(f'{case.name:<{name_w}} {mps:>9.0f} {fps:>8.0f}') + + return 0 + + +if __name__ == '__main__': + raise SystemExit(main()) diff --git a/pixpat-python/tests/test_basic.py b/pixpat-python/tests/test_basic.py new file mode 100644 index 0000000..2ed8bb7 --- /dev/null +++ b/pixpat-python/tests/test_basic.py @@ -0,0 +1,539 @@ +"""Smoke tests for the pixpat Python bindings.""" + +import pytest + +import pixpat + + +def test_supported_formats_nonempty(): + formats = pixpat.supported_formats() + assert isinstance(formats, list) + assert len(formats) > 0 + assert all(isinstance(f, str) for f in formats) + assert 'XRGB8888' in formats + assert 'NV12' in formats + + +def test_is_supported(): + assert pixpat.is_supported('XRGB8888') + assert pixpat.is_supported('NV12') + assert not pixpat.is_supported('NOT_A_REAL_FORMAT') + + +def test_draw_pattern_xrgb8888(): + w, h = 64, 32 + stride = w * 4 + data = bytearray(stride * h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'smpte') + assert any(b != 0 for b in data) + + +def test_draw_pattern_unknown_pattern_raises(): + data = bytearray(64 * 32 * 4) + buf = pixpat.Buffer([data], 'XRGB8888', 64, 32, [64 * 4]) + with pytest.raises(ValueError): + pixpat.draw_pattern(buf, 'bogus') + + +def test_draw_pattern_unknown_format_raises(): + data = bytearray(64 * 32 * 4) + buf = pixpat.Buffer([data], 'NOT_A_REAL_FORMAT', 64, 32, [64 * 4]) + with pytest.raises(pixpat.PixpatError): + pixpat.draw_pattern(buf) + + +def test_draw_pattern_readonly_buffer_raises(): + data = bytes(64 * 32 * 4) + buf = pixpat.Buffer([data], 'XRGB8888', 64, 32, [64 * 4]) + with pytest.raises((TypeError, BufferError)): + pixpat.draw_pattern(buf) + + +def _alloc_xrgb(w, h): + stride = w * 4 + return bytearray(stride * h), stride + + +def _first_pixel_bgr(data): + # BGR888-style byte order: pixpat XRGB8888 stores B at byte 0, + # G at byte 1, R at byte 2, X at byte 3. See + # README "Format names and byte order". + return data[0], data[1], data[2] + + +def test_draw_pattern_plain_red_str_params(): + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'plain', params='color=ff0000') + b, g, r = _first_pixel_bgr(data) + assert (b, g, r) == (0, 0, 0xFF) + + +def test_draw_pattern_plain_red_dict_params(): + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'plain', params={'color': 'ff0000'}) + b, g, r = _first_pixel_bgr(data) + assert (b, g, r) == (0, 0, 0xFF) + + +def test_draw_pattern_plain_alpha_passes_through_to_argb(): + w, h = 16, 8 + stride = w * 4 + data = bytearray(stride * h) + buf = pixpat.Buffer([data], 'ARGB8888', w, h, [stride]) + # 8-char form is alpha-first: AARRGGBB. + pixpat.draw_pattern(buf, 'plain', params={'color': '80112233'}) + # ARGB8888 byte order (LSB-first): B, G, R, A + assert (data[0], data[1], data[2], data[3]) == (0x33, 0x22, 0x11, 0x80) + + +def test_draw_pattern_plain_accepts_0x_prefix(): + w, h = 16, 8 + stride = w * 4 + data = bytearray(stride * h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'plain', params={'color': '0xff0000'}) + assert (data[0], data[1], data[2]) == (0, 0, 0xFF) + + +def test_draw_pattern_plain_16bpc_rgb(): + """16-bpc form: 12 hex chars, RRRRGGGGBBBB. ABGR16161616 names + A-B-G-R MSB-first in a 64-bit word; stored little-endian, the + bytes are R lo, R hi, G lo, G hi, B lo, B hi, A lo, A hi.""" + w, h = 4, 2 + stride = w * 8 + data = bytearray(stride * h) + buf = pixpat.Buffer([data], 'ABGR16161616', w, h, [stride]) + pixpat.draw_pattern(buf, 'plain', params={'color': '0xfedc00000000'}) + # R=0xFEDC, G=0x0000, B=0x0000, A=0xFFFF (default opaque). + pix = bytes(data[:8]) + assert pix == b'\xdc\xfe\x00\x00\x00\x00\xff\xff' + + +def test_draw_pattern_plain_16bpc_argb(): + w, h = 4, 2 + stride = w * 8 + data = bytearray(stride * h) + buf = pixpat.Buffer([data], 'ABGR16161616', w, h, [stride]) + # 16-char form is alpha-first: AAAARRRRGGGGBBBB. + pixpat.draw_pattern(buf, 'plain', params={'color': '0x80007f000000ffff'}) + pix = bytes(data[:8]) + # R=0x7F00, G=0x0000, B=0xFFFF, A=0x8000. + assert pix == b'\x00\x7f\x00\x00\xff\xff\x00\x80' + + +def test_draw_pattern_plain_missing_color_raises(): + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + with pytest.raises(pixpat.PixpatError): + pixpat.draw_pattern(buf, 'plain') + with pytest.raises(pixpat.PixpatError): + pixpat.draw_pattern(buf, 'plain', params='') + + +def test_draw_pattern_plain_malformed_color_raises(): + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + for bad in ('color=zzzzzz', 'color=abc', 'color=', 'color=1234567'): + with pytest.raises(pixpat.PixpatError): + pixpat.draw_pattern(buf, 'plain', params=bad) + + +def test_draw_pattern_malformed_params_string_raises(): + """Top-level params parse failure (not pattern-specific).""" + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + for bad in ('foo,bar', '=ff0000', ',color=ff0000'): + with pytest.raises(pixpat.PixpatError): + pixpat.draw_pattern(buf, 'plain', params=bad) + + +def test_draw_pattern_dict_params_with_separators_rejected_by_python(): + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + with pytest.raises(ValueError): + pixpat.draw_pattern(buf, 'plain', params={'color': 'ff,00,00'}) + with pytest.raises(ValueError): + pixpat.draw_pattern(buf, 'plain', params={'col=or': 'ff0000'}) + + +def test_draw_pattern_unknown_params_keys_ignored(): + """Patterns silently ignore keys they don't read; unknown keys + alongside the recognised one still let the call succeed.""" + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'plain', params={'color': 'ff0000', 'unknown_key': '42'}) + b, g, r = _first_pixel_bgr(data) + assert (b, g, r) == (0, 0, 0xFF) + + +def test_draw_pattern_params_ignored_by_kmstest(): + """Patterns that don't read params accept arbitrary params strings.""" + w, h = 32, 16 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'kmstest', params={'whatever': 'foo'}) + assert any(b != 0 for b in data) + + +def _xrgb_pixel_at(data, stride, x, y): + off = y * stride + x * 4 + return data[off], data[off + 1], data[off + 2] # B, G, R + + +@pytest.mark.parametrize( + 'pattern,extra', + [ + ('hramp', {}), + ('vramp', {}), + ('dramp', {}), + ('zoneplate', {}), + ('checker', {}), + ('checker', {'params': 'cell=1'}), + ('checker', {'params': {'cell': '32'}}), + ], +) +def test_draw_pattern_phase3_smoke(pattern, extra): + w, h = 64, 32 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, pattern, **extra) + assert any(b != 0 for b in data) + + +def test_draw_pattern_checker_default_cell_first_pixel_white(): + w, h = 64, 32 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'checker') + # (0,0) is in the first cell -> white. + assert _xrgb_pixel_at(data, stride, 0, 0) == (0xFF, 0xFF, 0xFF) + # Default cell=8: (8,0) is the next cell horizontally -> black. + assert _xrgb_pixel_at(data, stride, 8, 0) == (0, 0, 0) + + +def test_draw_pattern_checker_cell1_alternates(): + w, h = 16, 8 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'checker', params={'cell': '1'}) + p00 = _xrgb_pixel_at(data, stride, 0, 0) + p10 = _xrgb_pixel_at(data, stride, 1, 0) + p01 = _xrgb_pixel_at(data, stride, 0, 1) + p11 = _xrgb_pixel_at(data, stride, 1, 1) + assert p00 != p10 + assert p00 != p01 + assert p00 == p11 # diagonal repeats + + +@pytest.mark.parametrize('cell', ['0', '-1', 'oops', '1.5']) +def test_draw_pattern_checker_invalid_cell_raises(cell): + w, h = 16, 8 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + with pytest.raises(pixpat.PixpatError): + pixpat.draw_pattern(buf, 'checker', params={'cell': cell}) + + +def test_draw_pattern_hramp_stripes(): + """hramp: 4 horizontal stripes (R, G, B, gray), each ramping along x. + The right edge (x=W-1) hits 0xFF in the active channel(s).""" + w, h = 32, 16 # h=16 → stripes at rows [0,3], [4,7], [8,11], [12,15] + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'hramp') + assert _xrgb_pixel_at(data, stride, w - 1, 0) == (0, 0, 0xFF) # R + assert _xrgb_pixel_at(data, stride, w - 1, 4) == (0, 0xFF, 0) # G + assert _xrgb_pixel_at(data, stride, w - 1, 8) == (0xFF, 0, 0) # B + assert _xrgb_pixel_at(data, stride, w - 1, h - 1) == (0xFF, 0xFF, 0xFF) # gray + # Left edge is the 0 end of every ramp. + for y in (0, 4, 8, 12): + assert _xrgb_pixel_at(data, stride, 0, y) == (0, 0, 0) + + +def test_draw_pattern_vramp_columns(): + """vramp: 4 vertical columns (R, G, B, gray), each ramping along y.""" + w, h = 16, 32 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'vramp') + # Bottom row hits max in the active channel(s). + assert _xrgb_pixel_at(data, stride, 0, h - 1) == (0, 0, 0xFF) # R + assert _xrgb_pixel_at(data, stride, 4, h - 1) == (0, 0xFF, 0) # G + assert _xrgb_pixel_at(data, stride, 8, h - 1) == (0xFF, 0, 0) # B + assert _xrgb_pixel_at(data, stride, 12, h - 1) == (0xFF, 0xFF, 0xFF) # gray + # Top row is the 0 end. + for x in (0, 4, 8, 12): + assert _xrgb_pixel_at(data, stride, x, 0) == (0, 0, 0) + # Within a column, luma is monotonic. + prev = -1 + for y in range(h): + b, g, r = _xrgb_pixel_at(data, stride, 12, y) # gray column + assert b == g == r + assert b >= prev + prev = b + + +def test_draw_pattern_zoneplate_center_white(): + """Even sizes don't sample exactly at the center; pick odd dimensions + so (W//2, H//2) is the center pixel where cos(0)=1 → white.""" + w, h = 65, 33 + data, stride = _alloc_xrgb(w, h) + buf = pixpat.Buffer([data], 'XRGB8888', w, h, [stride]) + pixpat.draw_pattern(buf, 'zoneplate') + b, g, r = _xrgb_pixel_at(data, stride, w // 2, h // 2) + # Center pixel: phase=0, gray ≈ 1.0. + assert b == g == r and b >= 0xFE + + +def test_convert_roundtrip_xrgb8888(): + w, h = 64, 32 + src = bytearray(w * h * 4) + mid = bytearray(w * h * 8) + dst = bytearray(w * h * 4) + pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), 'smpte') + pixpat.convert( + pixpat.Buffer([mid], 'ABGR16161616', w, h, [w * 8]), + pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), + ) + pixpat.convert( + pixpat.Buffer([dst], 'XRGB8888', w, h, [w * 4]), + pixpat.Buffer([mid], 'ABGR16161616', w, h, [w * 8]), + ) + assert src == dst + + +def test_convert_readonly_bytes_src(): + """Source planes may be read-only (bytes); the C library only reads src.""" + w, h = 64, 32 + src_writable = bytearray(w * h * 4) + pixpat.draw_pattern(pixpat.Buffer([src_writable], 'XRGB8888', w, h, [w * 4]), 'smpte') + src_readonly = bytes(src_writable) + dst = bytearray(w * h * 8) + pixpat.convert( + pixpat.Buffer([dst], 'ABGR16161616', w, h, [w * 8]), + pixpat.Buffer([src_readonly], 'XRGB8888', w, h, [w * 4]), + ) + assert any(b != 0 for b in dst) + + +def test_convert_dimension_mismatch_raises(): + w, h = 64, 32 + src = bytearray(w * h * 4) + dst = bytearray((w * 2) * h * 8) + with pytest.raises(ValueError, match='dimensions'): + pixpat.convert( + pixpat.Buffer([dst], 'ABGR16161616', w * 2, h, [w * 2 * 8]), + pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), + ) + + +def test_draw_pattern_nv12_semi_planar(): + """Semi-planar NV12: separate Y and interleaved-UV planes.""" + w, h = 64, 32 + y = bytearray(w * h) + uv = bytearray(w * (h // 2)) + pixpat.draw_pattern(pixpat.Buffer([y, uv], 'NV12', w, h, [w, w]), 'smpte') + assert any(b != 0 for b in y) + assert any(b != 0 for b in uv) + + +def test_draw_pattern_yuv420_three_planes(): + """Fully planar YUV420: Y, U and V all at separate plane pointers.""" + w, h = 64, 32 + yp = bytearray(w * h) + up = bytearray((w // 2) * (h // 2)) + vp = bytearray((w // 2) * (h // 2)) + pixpat.draw_pattern( + pixpat.Buffer([yp, up, vp], 'YUV420', w, h, [w, w // 2, w // 2]), + 'smpte', + ) + assert any(b != 0 for b in yp) + assert any(b != 0 for b in up) + assert any(b != 0 for b in vp) + + +def test_convert_xrgb_to_nv12_writes_both_planes(): + """Convert into a multi-plane destination must populate every plane.""" + w, h = 64, 32 + src = bytearray(w * h * 4) + pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), 'smpte') + y = bytearray(w * h) + uv = bytearray(w * (h // 2)) + pixpat.convert( + pixpat.Buffer([y, uv], 'NV12', w, h, [w, w]), + pixpat.Buffer([src], 'XRGB8888', w, h, [w * 4]), + ) + assert any(b != 0 for b in y) + assert any(b != 0 for b in uv) + + +def test_convert_yuv420_src_three_planes(): + """Planar YUV is a supported convert source — the C library reads + all three plane pointers, so make sure the binding wires them up.""" + w, h = 64, 32 + yp = bytearray(w * h) + up = bytearray((w // 2) * (h // 2)) + vp = bytearray((w // 2) * (h // 2)) + pixpat.draw_pattern( + pixpat.Buffer([yp, up, vp], 'YUV420', w, h, [w, w // 2, w // 2]), + 'smpte', + ) + dst = bytearray(w * h * 4) + pixpat.convert( + pixpat.Buffer([dst], 'XRGB8888', w, h, [w * 4]), + pixpat.Buffer([yp, up, vp], 'YUV420', w, h, [w, w // 2, w // 2]), + ) + assert any(b != 0 for b in dst) + + +def test_convert_bayer_src(): + """Bayer formats are supported as a convert source (decoded by + nearest-neighbor demosaic per the public header docstring).""" + w, h = 64, 32 + src = bytearray(w * h) + pixpat.draw_pattern(pixpat.Buffer([src], 'SRGGB8', w, h, [w]), 'smpte') + dst = bytearray(w * h * 4) + pixpat.convert( + pixpat.Buffer([dst], 'XRGB8888', w, h, [w * 4]), + pixpat.Buffer([src], 'SRGGB8', w, h, [w]), + ) + assert any(b != 0 for b in dst) + + +def test_convert_roundtrip_yuyv(): + """YUYV (packed 4:2:2) — chroma is averaged on write and replicated on + read, so the second leg must not change the buffer.""" + w, h = 64, 32 + src = bytearray(w * h * 2) + mid = bytearray(w * h * 8) + dst = bytearray(w * h * 2) + pixpat.draw_pattern(pixpat.Buffer([src], 'YUYV', w, h, [w * 2]), 'smpte') + pixpat.convert( + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + pixpat.Buffer([src], 'YUYV', w, h, [w * 2]), + ) + pixpat.convert( + pixpat.Buffer([dst], 'YUYV', w, h, [w * 2]), + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + ) + assert src == dst + + +def test_convert_roundtrip_y8(): + """Y8 grayscale — chroma is discarded on write and synthesized on read, + Y values themselves should roundtrip exactly.""" + w, h = 64, 32 + src = bytearray(w * h) + mid = bytearray(w * h * 8) + dst = bytearray(w * h) + pixpat.draw_pattern(pixpat.Buffer([src], 'Y8', w, h, [w]), 'smpte') + pixpat.convert( + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + pixpat.Buffer([src], 'Y8', w, h, [w]), + ) + pixpat.convert( + pixpat.Buffer([dst], 'Y8', w, h, [w]), + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + ) + assert src == dst + + +def test_convert_roundtrip_nv12(): + """NV12 semi-planar 4:2:0 — chroma averaged into a 2x2 block on write, + replicated on read; second leg is a no-op.""" + w, h = 64, 32 + y_src = bytearray(w * h) + uv_src = bytearray(w * (h // 2)) + mid = bytearray(w * h * 8) + y_dst = bytearray(w * h) + uv_dst = bytearray(w * (h // 2)) + pixpat.draw_pattern(pixpat.Buffer([y_src, uv_src], 'NV12', w, h, [w, w]), 'smpte') + pixpat.convert( + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + pixpat.Buffer([y_src, uv_src], 'NV12', w, h, [w, w]), + ) + pixpat.convert( + pixpat.Buffer([y_dst, uv_dst], 'NV12', w, h, [w, w]), + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + ) + assert y_src == y_dst + assert uv_src == uv_dst + + +def test_convert_roundtrip_xv15(): + """P030 semi-planar 4:2:0, 10 bits packed 3-per-32-bit. Width and height + chosen so the Y plane fits 3 samples per word and chroma is on a 2x2 grid.""" + w, h = 96, 32 # multiples of 3 (Y group) and 2 (v_sub) + y_words = (w // 3) * h + uv_words = (w // 3 // 2) * (h // 2) + y_src = bytearray(y_words * 4) + uv_src = bytearray(uv_words * 8) + mid = bytearray(w * h * 8) + y_dst = bytearray(y_words * 4) + uv_dst = bytearray(uv_words * 8) + pixpat.draw_pattern( + pixpat.Buffer([y_src, uv_src], 'P030', w, h, [(w // 3) * 4, (w // 3 // 2) * 8]), + 'smpte', + ) + pixpat.convert( + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + pixpat.Buffer([y_src, uv_src], 'P030', w, h, [(w // 3) * 4, (w // 3 // 2) * 8]), + ) + pixpat.convert( + pixpat.Buffer([y_dst, uv_dst], 'P030', w, h, [(w // 3) * 4, (w // 3 // 2) * 8]), + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + ) + assert y_src == y_dst + assert uv_src == uv_dst + + +def test_convert_roundtrip_x403(): + """T430 fully-planar 4:4:4 with 3-samples-per-32-bit packing on each + plane; no chroma subsampling so first leg is already lossless.""" + w, h = 96, 32 # width must be a multiple of 3 + plane_words = (w // 3) * h + y_src = bytearray(plane_words * 4) + cb_src = bytearray(plane_words * 4) + cr_src = bytearray(plane_words * 4) + mid = bytearray(w * h * 8) + y_dst = bytearray(plane_words * 4) + cb_dst = bytearray(plane_words * 4) + cr_dst = bytearray(plane_words * 4) + strides = [(w // 3) * 4] * 3 + pixpat.draw_pattern( + pixpat.Buffer([y_src, cb_src, cr_src], 'T430', w, h, strides), + 'smpte', + ) + pixpat.convert( + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + pixpat.Buffer([y_src, cb_src, cr_src], 'T430', w, h, strides), + ) + pixpat.convert( + pixpat.Buffer([y_dst, cb_dst, cr_dst], 'T430', w, h, strides), + pixpat.Buffer([mid], 'AVUY16161616', w, h, [w * 8]), + ) + assert y_src == y_dst + assert cb_src == cb_dst + assert cr_src == cr_dst + + +def test_strides_length_mismatch_multi_plane_raises(): + """Strides count must match planes count — caught in Python before C.""" + w, h = 64, 32 + y = bytearray(w * h) + uv = bytearray(w * (h // 2)) + with pytest.raises(ValueError, match='strides'): + pixpat.draw_pattern( + pixpat.Buffer([y, uv], 'NV12', w, h, [w]), # 1 stride, 2 planes + 'smpte', + ) diff --git a/pixpat-python/tests/test_numpy.py b/pixpat-python/tests/test_numpy.py new file mode 100644 index 0000000..6dbd7bb --- /dev/null +++ b/pixpat-python/tests/test_numpy.py @@ -0,0 +1,110 @@ +"""Empirical check that pixpat works correctly with numpy. + +pixpat does not depend on numpy. This file proves that ndarrays work +as ``Buffer`` planes through the buffer protocol, in the shapes a +caller would typically use. Skipped when numpy is missing. +""" + +import pixpat +import pytest + +np = pytest.importorskip('numpy') + + +def test_xrgb8888_into_uint8_3d_ndarray(): + w, h = 64, 32 + arr = np.zeros((h, w, 4), dtype=np.uint8) + pixpat.draw_pattern(pixpat.Buffer([arr], 'XRGB8888', w, h, [arr.strides[0]]), 'smpte') + assert arr.any() + + +def test_abgr16161616_into_uint16_3d_ndarray(): + w, h = 64, 32 + arr = np.zeros((h, w, 4), dtype=np.uint16) + pixpat.draw_pattern(pixpat.Buffer([arr], 'ABGR16161616', w, h, [arr.strides[0]]), 'smpte') + assert arr.any() + assert arr.dtype == np.uint16 + + +def test_nv12_into_two_ndarrays(): + w, h = 64, 32 + y = np.zeros((h, w), dtype=np.uint8) + uv = np.zeros((h // 2, w), dtype=np.uint8) # interleaved U,V at half-height + pixpat.draw_pattern( + pixpat.Buffer( + planes=[y, uv], + fmt='NV12', + width=w, + height=h, + strides=[y.strides[0], uv.strides[0]], + ), + 'smpte', + ) + assert y.any() + assert uv.any() + + +def test_convert_with_readonly_ndarray_src(): + """A numpy view with writeable=False must work as the convert source.""" + w, h = 64, 32 + src = np.zeros((h, w, 4), dtype=np.uint8) + pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), 'smpte') + src.flags.writeable = False + + dst = np.zeros((h, w, 4), dtype=np.uint16) + pixpat.convert( + pixpat.Buffer([dst], 'ABGR16161616', w, h, [dst.strides[0]]), + pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), + ) + assert dst.any() + + +def test_roundtrip_ndarrays_byte_for_byte(): + """End-to-end XRGB8888 -> ABGR16161616 -> XRGB8888 with ndarrays only.""" + w, h = 64, 32 + src = np.zeros((h, w, 4), dtype=np.uint8) + mid = np.zeros((h, w, 4), dtype=np.uint16) + dst = np.zeros((h, w, 4), dtype=np.uint8) + + pixpat.draw_pattern(pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), 'smpte') + pixpat.convert( + pixpat.Buffer([mid], 'ABGR16161616', w, h, [mid.strides[0]]), + pixpat.Buffer([src], 'XRGB8888', w, h, [src.strides[0]]), + ) + pixpat.convert( + pixpat.Buffer([dst], 'XRGB8888', w, h, [dst.strides[0]]), + pixpat.Buffer([mid], 'ABGR16161616', w, h, [mid.strides[0]]), + ) + assert np.array_equal(src, dst) + + +def test_writable_view_via_view_method(): + """`arr.view()` shares memory and is writable by default — drawing into + it must update the original.""" + w, h = 64, 32 + arr = np.zeros((h, w, 4), dtype=np.uint8) + view = arr.view() + pixpat.draw_pattern(pixpat.Buffer([view], 'XRGB8888', w, h, [view.strides[0]]), 'smpte') + assert arr.any() # the original sees the writes + + +def test_noncontiguous_source_smoke(): + """Discovery test: what happens if the caller hands us a non-C-contiguous + array? pixpat reads ``stride`` bytes per row, so any row-contiguous + layout where ``arr.strides[0]`` is the row stride should work; a + transposed array (where rows aren't even contiguous) is misuse. + + This test passes a *valid* row-contiguous slice — the top half of a + bigger image, taken via slicing — and expects success. It exists to + document the contract: 'rows must be contiguous in memory; pass + arr.strides[0] as the stride'. + """ + big = np.zeros((64, 64, 4), dtype=np.uint8) + top_half = big[:32] # shape (32, 64, 4); rows still C-contiguous + assert top_half.strides[0] == 64 * 4 + pixpat.draw_pattern( + pixpat.Buffer([top_half], 'XRGB8888', 64, 32, [top_half.strides[0]]), + 'smpte', + ) + assert big[:32].any() + assert not big[32:].any() # the other half stayed untouched diff --git a/pixpat-python/tests/test_threading.py b/pixpat-python/tests/test_threading.py new file mode 100644 index 0000000..99d4620 --- /dev/null +++ b/pixpat-python/tests/test_threading.py @@ -0,0 +1,187 @@ +"""Multi-threaded == single-threaded parity tests. + +The chosen ``num_threads`` is supposed to be transparent: the output +bytes must match the single-threaded output exactly. Anything else is +a threading bug. + +These tests parametrize across one format per writer template so that +every code path in the dispatcher is covered. +""" + +import pytest + +import pixpat + + +# Width must be divisible by 3 (T430/XYYY2101010 pack 3-per-32-bit), 4 +# (Bayer 10P packs 4 pixels per group), and 2 (h_sub). +# Height must be divisible by 2 (v_sub for 4:2:0 formats). +# +# Two sizes: a small one for fast format-coverage parametrization, and +# a larger one (~HD-width) where worker-stripe alignment can interact +# with the writers' chroma logic in non-obvious ways. +SMALL = (192, 64) +LARGE = (1920, 64) + + +def _alloc(fmt, w, h): + """Return (planes, strides) sized for `fmt` at (w, h).""" + if fmt in ('NV12', 'NV21'): # semi-planar 4:2:0 + return [bytearray(w * h), bytearray(w * (h // 2))], [w, w] + if fmt in ('NV16', 'NV61'): # semi-planar 4:2:2 + return [bytearray(w * h), bytearray(w * h)], [w, w] + if fmt == 'P030': # semi-planar 4:2:0, 10-bit packed 3-per-32 + ys, uvs = (w // 3) * 4, (w // 3 // 2) * 8 + return [bytearray(ys * h), bytearray(uvs * (h // 2))], [ys, uvs] + if fmt == 'P230': # semi-planar 4:2:2, 10-bit packed 3-per-32 + ys, uvs = (w // 3) * 4, (w // 3 // 2) * 8 + return [bytearray(ys * h), bytearray(uvs * h)], [ys, uvs] + if fmt in ('YUV420', 'YVU420'): # planar 4:2:0 + return ( + [bytearray(w * h), bytearray((w // 2) * (h // 2)), bytearray((w // 2) * (h // 2))], + [w, w // 2, w // 2], + ) + if fmt in ('YUV422', 'YVU422'): # planar 4:2:2 + return ( + [bytearray(w * h), bytearray((w // 2) * h), bytearray((w // 2) * h)], + [w, w // 2, w // 2], + ) + if fmt in ('YUV444', 'YVU444'): # planar 4:4:4 + return [bytearray(w * h)] * 3, [w] * 3 + if fmt == 'T430': # planar packed 4:4:4, 10-bit + s = (w // 3) * 4 + return [bytearray(s * h)] * 3, [s] * 3 + if fmt in ('Y8', 'R8', 'RGB332'): + return [bytearray(w * h)], [w] + if fmt == 'XYYY2101010': + s = (w // 3) * 4 + return [bytearray(s * h)], [s] + if fmt in ('YUYV', 'YVYU', 'UYVY', 'VYUY'): + return [bytearray(w * h * 2)], [w * 2] + if fmt in ('Y210', 'Y212', 'Y216'): + # 4:2:2, two pixels per 64-bit word. + return [bytearray(w * h * 4)], [w * 4] + if fmt == 'VUY888': + # 24-bit packed YUV, 3 bytes per pixel (storage uint32_t). + return [bytearray(w * h * 3)], [w * 3] + if fmt in ('XVUY2101010', 'XVUY8888'): + return [bytearray(w * h * 4)], [w * 4] + if fmt in ('AVUY16161616', 'ABGR16161616'): + return [bytearray(w * h * 8)], [w * 8] + # MIPI CSI-2 byte packing (Bayer SXXXX10P/12P and grayscale Y10P/Y12P). + if fmt.endswith('10P'): + s = (w // 4) * 5 + return [bytearray(s * h)], [s] + if fmt.endswith('12P'): + s = (w // 2) * 3 + return [bytearray(s * h)], [s] + # Plain Bayer + if fmt.startswith('S') and fmt[-1] == '8': + return [bytearray(w * h)], [w] + if fmt.startswith('S'): + return [bytearray(w * h * 2)], [w * 2] + # 16-bit RGB + if fmt.endswith('565') or fmt.endswith('1555') or fmt.endswith('4444'): + return [bytearray(w * h * 2)], [w * 2] + # 32-bit RGB (8888 / 2101010 / 1010102 / 888 in 32-bit storage) + return [bytearray(w * h * 4)], [w * 4] + + +# One format per writer template in pixpat.cpp. +DRAW_FORMATS = [ + 'XRGB8888', # ARGB_Writer + 'RGB565', # ARGB_Writer (16-bit) + 'ABGR16161616', # ARGB_Writer (normalized wide) + 'XVUY2101010', # YUV_Writer + 'AVUY16161616', # YUV_Writer (normalized wide) + 'YUYV', # YUVPackedWriter + 'NV12', # YUVSemiPlanarWriter v_sub=2 + 'NV16', # YUVSemiPlanarWriter v_sub=1 + 'P030', # YUVSemiPlanarWriter v_sub=2, 10-bit packed + 'P230', # YUVSemiPlanarWriter v_sub=1, 10-bit packed + 'YUV420', # YUVPlanarWriter v_sub=2 + 'YUV422', # YUVPlanarWriter v_sub=1 + 'YUV444', # YUVPlanarWriter no subsampling + 'T430', # YUVPlanarPackedWriter + 'Y8', # Y_Writer (1 sample per byte) + 'XYYY2101010', # Y_Writer (3 samples per word) + 'Y10P', # GrayPacked_Writer (CSI-2 byte packing, Y-only) + 'Y210', # YUVPackedWriter with X-padding entries (4:2:2 in 64-bit word) + 'R8', # MonoRGB_Writer (single R channel) + 'SRGGB8', # Bayer_Writer + 'SRGGB10', # Bayer_Writer + 'SRGGB10P', # BayerPacked_Writer + 'SRGGB12P', # BayerPacked_Writer +] + +# Convert sources: same minus Bayer (Bayer formats have no read path). +CONVERT_FORMATS = [f for f in DRAW_FORMATS if not f.startswith('S')] + + +@pytest.mark.parametrize('fmt', DRAW_FORMATS) +@pytest.mark.parametrize( + 'pattern', + ['smpte', 'kmstest', 'checker', 'hramp', 'vramp', 'dramp', 'zoneplate'], +) +@pytest.mark.parametrize('size', [SMALL, LARGE], ids=['small', 'large']) +def test_draw_pattern_threaded_matches_single(fmt, pattern, size): + """draw_pattern with num_threads=4 must produce identical bytes + to num_threads=1 for every supported format.""" + w, h = size + s_planes, strides = _alloc(fmt, w, h) + pixpat.draw_pattern(pixpat.Buffer(s_planes, fmt, w, h, strides), pattern, num_threads=1) + + t_planes, _ = _alloc(fmt, w, h) + pixpat.draw_pattern(pixpat.Buffer(t_planes, fmt, w, h, strides), pattern, num_threads=4) + + for i, (s, t) in enumerate(zip(s_planes, t_planes)): + assert s == t, f'{fmt} ({pattern}, {w}x{h}): plane {i} differs threaded vs single' + + +@pytest.mark.parametrize('fmt', ['XRGB8888', 'NV12', 'YUV420', 'BGR888']) +@pytest.mark.parametrize('size', [SMALL, LARGE], ids=['small', 'large']) +def test_draw_pattern_plain_threaded_matches_single(fmt, size): + """draw_pattern with params= must also be bit-identical across + thread counts. Spot-check a few representative formats.""" + w, h = size + params = {'color': '12ab34'} + s_planes, strides = _alloc(fmt, w, h) + pixpat.draw_pattern( + pixpat.Buffer(s_planes, fmt, w, h, strides), 'plain', num_threads=1, params=params + ) + + t_planes, _ = _alloc(fmt, w, h) + pixpat.draw_pattern( + pixpat.Buffer(t_planes, fmt, w, h, strides), 'plain', num_threads=4, params=params + ) + + for i, (s, t) in enumerate(zip(s_planes, t_planes)): + assert s == t, f'{fmt} (plain, {w}x{h}): plane {i} differs threaded vs single' + + +@pytest.mark.parametrize('src_fmt', CONVERT_FORMATS) +@pytest.mark.parametrize('dst_fmt', ['ABGR16161616', 'AVUY16161616', 'XRGB8888', 'NV12', 'YUV420']) +@pytest.mark.parametrize('size', [SMALL, LARGE], ids=['small', 'large']) +def test_convert_threaded_matches_single(src_fmt, dst_fmt, size): + """convert with num_threads=4 must produce identical bytes + to num_threads=1.""" + w, h = size + src_planes, src_strides = _alloc(src_fmt, w, h) + pixpat.draw_pattern(pixpat.Buffer(src_planes, src_fmt, w, h, src_strides), 'smpte') + + s_dst_planes, dst_strides = _alloc(dst_fmt, w, h) + pixpat.convert( + pixpat.Buffer(s_dst_planes, dst_fmt, w, h, dst_strides), + pixpat.Buffer(src_planes, src_fmt, w, h, src_strides), + num_threads=1, + ) + + t_dst_planes, _ = _alloc(dst_fmt, w, h) + pixpat.convert( + pixpat.Buffer(t_dst_planes, dst_fmt, w, h, dst_strides), + pixpat.Buffer(src_planes, src_fmt, w, h, src_strides), + num_threads=4, + ) + + for i, (s, t) in enumerate(zip(s_dst_planes, t_dst_planes)): + assert s == t, f'{src_fmt}->{dst_fmt} ({w}x{h}): plane {i} differs threaded vs single' |
