1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
"""ctypes plumbing for libpixpat.
Private module — public API lives in :mod:`pixpat`. Loads the bundled
shared library, declares the C structs and function signatures, and
provides :func:`_fill_buffer` to translate Python plane sequences into
the C ``pixpat_buffer`` layout while pinning the underlying memory via
the buffer protocol.
The buffer-protocol path uses ``PyObject_GetBuffer`` / ``PyBuffer_Release``
directly (rather than ctypes ``from_buffer``) so that read-only inputs
work for the ``src`` side of :func:`pixpat.convert`. ``from_buffer``
unconditionally requires a writable buffer.
"""
import ctypes
import os
import pathlib
from typing import Sequence
# Number of slots in the fixed-size ``planes`` / ``strides`` arrays of
# ``_Buffer``.  NOTE(review): presumably mirrors a constant in the C
# header -- confirm before changing.
_PIXPAT_MAX_PLANES = 4

# A non-empty ``PIXPAT_LIB`` environment variable names the shared library
# to load; otherwise the copy shipped in ``_lib/`` beside this module is used.
_lib_override = os.environ.get('PIXPAT_LIB')
if not _lib_override:
    _lib_dir = pathlib.Path(__file__).parent / '_lib'
    _so_candidates = sorted(_lib_dir.glob('libpixpat.so*'))
    if not _so_candidates:
        raise ImportError(f'pixpat: no libpixpat.so* found in {_lib_dir}')
    # Several file names may match the glob; the lexicographically
    # first one is the one that gets loaded.
    _lib = ctypes.CDLL(str(_so_candidates[0]))
elif pathlib.Path(_lib_override).exists():
    _lib = ctypes.CDLL(_lib_override)
else:
    # Fail at import time with a pixpat-specific message rather than
    # letting the dynamic loader report a missing file.
    raise ImportError(f'pixpat: PIXPAT_LIB={_lib_override} does not exist')
class _Buffer(ctypes.Structure):
    """ctypes image of the C ``pixpat_buffer`` struct.

    ``planes`` and ``strides`` are fixed-size arrays with
    ``_PIXPAT_MAX_PLANES`` slots each; ``num_planes`` says how many of
    those slots are in use.  Field order is part of the C ABI and must
    not be rearranged.
    """

    _fields_ = [
        ('format', ctypes.c_char_p),
        # width, height and num_planes are all 32-bit unsigned scalars.
        *((member, ctypes.c_uint32) for member in ('width', 'height', 'num_planes')),
        ('planes', ctypes.c_void_p * _PIXPAT_MAX_PLANES),
        ('strides', ctypes.c_uint32 * _PIXPAT_MAX_PLANES),
    ]
class _PatternOpts(ctypes.Structure):
    """Options struct passed by pointer to ``pixpat_draw_pattern``.

    Layout: three C ``int`` members followed by one C string pointer.
    Field order is part of the C ABI and must not be rearranged.

    NOTE(review): ``rec`` / ``range`` presumably select the colour
    encoding and quantisation range -- confirm against the C header.
    """

    _fields_ = [
        # The leading members are all plain C ints, in struct order.
        *((member, ctypes.c_int) for member in ('rec', 'range', 'num_threads')),
        ('params', ctypes.c_char_p),
    ]
class _ConvertOpts(ctypes.Structure):
    """Options struct passed by pointer to ``pixpat_convert``.

    Layout: three C ``int`` members.  Field order is part of the C ABI
    and must not be rearranged.

    NOTE(review): ``rec`` / ``range`` presumably select the colour
    encoding and quantisation range -- confirm against the C header.
    """

    # Every member is a plain C int, listed in struct order.
    _fields_ = [(member, ctypes.c_int) for member in ('rec', 'range', 'num_threads')]
# Prototypes for the libpixpat entry points.  With ``argtypes`` and
# ``restype`` declared, ctypes converts and checks every argument and
# return value against the types given here.

# pixpat_draw_pattern(buffer *, char *, pattern-opts *) -> int
_lib.pixpat_draw_pattern.restype = ctypes.c_int
_lib.pixpat_draw_pattern.argtypes = [
    ctypes.POINTER(_Buffer), ctypes.c_char_p, ctypes.POINTER(_PatternOpts),
]

# pixpat_convert(buffer *, buffer *, convert-opts *) -> int
_lib.pixpat_convert.restype = ctypes.c_int
_lib.pixpat_convert.argtypes = [
    ctypes.POINTER(_Buffer), ctypes.POINTER(_Buffer), ctypes.POINTER(_ConvertOpts),
]

# pixpat_format_supported(char *) -> int
_lib.pixpat_format_supported.restype = ctypes.c_int
_lib.pixpat_format_supported.argtypes = [ctypes.c_char_p]

# pixpat_format_count(void) -> size_t
_lib.pixpat_format_count.restype = ctypes.c_size_t
_lib.pixpat_format_count.argtypes = []

# pixpat_format_name(size_t) -> char *
_lib.pixpat_format_name.restype = ctypes.c_char_p
_lib.pixpat_format_name.argtypes = [ctypes.c_size_t]
class _Py_buffer(ctypes.Structure):
    """ctypes mirror of CPython's ``Py_buffer`` struct.

    The field order and types follow the C declaration, so an instance
    can be passed by reference to ``PyObject_GetBuffer`` and
    ``PyBuffer_Release``.  Only ``buf`` is read by this module; the
    remaining fields exist to give the struct its correct size and layout.
    """

    _fields_ = [
        ('buf', ctypes.c_void_p),
        ('obj', ctypes.py_object),
        ('len', ctypes.c_ssize_t),
        ('itemsize', ctypes.c_ssize_t),
        ('readonly', ctypes.c_int),
        ('ndim', ctypes.c_int),
        ('format', ctypes.c_char_p),
        ('shape', ctypes.POINTER(ctypes.c_ssize_t)),
        ('strides', ctypes.POINTER(ctypes.c_ssize_t)),
        ('suboffsets', ctypes.POINTER(ctypes.c_ssize_t)),
        ('internal', ctypes.c_void_p),
    ]


# Index lookup (``pythonapi[name]``) returns a fresh function object on
# every access, whereas attribute lookup (``pythonapi.name``) returns one
# cached object shared by the whole process.  Configuring the shared
# object would impose these prototypes on every other ctypes user -- and
# let any of them overwrite ours with an incompatible ``Py_buffer``
# class -- so private function objects are used instead.  They still
# come from ``pythonapi`` (a ``PyDLL``), so a Python exception set by
# the C call is raised in the caller.
_PyObject_GetBuffer = ctypes.pythonapi['PyObject_GetBuffer']
_PyObject_GetBuffer.argtypes = [
    ctypes.py_object,
    ctypes.POINTER(_Py_buffer),
    ctypes.c_int,
]
_PyObject_GetBuffer.restype = ctypes.c_int

_PyBuffer_Release = ctypes.pythonapi['PyBuffer_Release']
_PyBuffer_Release.argtypes = [ctypes.POINTER(_Py_buffer)]
_PyBuffer_Release.restype = None

# Request flags of CPython's buffer API: SIMPLE asks for a plain
# contiguous byte view, WRITABLE additionally demands write access.
_PyBUF_SIMPLE = 0
_PyBUF_WRITABLE = 0x0001
class _PinnedBuffers:
    """Owns the ``Py_buffer`` views taken out for one pixpat call.

    Each view obtained through :meth:`acquire` stays exported until
    :meth:`release` runs -- on leaving the ``with`` block, or from
    ``__del__`` as a fallback -- so the exporting objects' buffer-export
    count returns to zero afterwards.
    """

    def __init__(self) -> None:
        self._views: list[_Py_buffer] = []

    def __enter__(self) -> '_PinnedBuffers':
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.release()

    def __del__(self) -> None:
        self.release()

    def acquire(self, obj, *, writable: bool) -> int:
        """Pin ``obj`` via the buffer protocol and return its data address.

        ``writable`` selects a writable request instead of a plain
        read-capable one.  Returns the address as an int, with a NULL
        pointer reported as ``0``.  A failed request raises the
        exception set by ``PyObject_GetBuffer`` and records no view.
        """
        if writable:
            request = _PyBUF_WRITABLE
        else:
            request = _PyBUF_SIMPLE
        pinned = _Py_buffer()
        # The return code is deliberately ignored: ctypes.pythonapi
        # raises the pending exception (BufferError / TypeError) itself
        # when the call fails.
        _PyObject_GetBuffer(obj, ctypes.byref(pinned), request)
        self._views.append(pinned)
        address = pinned.buf
        return address if address else 0

    def release(self) -> None:
        """Release every held view, most recently acquired first."""
        pending = self._views
        while pending:
            _PyBuffer_Release(ctypes.byref(pending.pop()))
def _require_uint32(value: int, what: str) -> None:
    """Raise ``ValueError`` unless ``value`` lies in the C ``uint32_t`` range.

    ctypes integer types do no overflow checking, so an out-of-range
    value (a negative stride, say) would otherwise be stored wrapped
    modulo 2**32 and handed to the C library as a huge number.
    """
    if not 0 <= value <= 0xFFFFFFFF:
        raise ValueError(f'{what} {value} does not fit in uint32')


def _fill_buffer(
    buf: _Buffer,
    pins: _PinnedBuffers,
    planes: Sequence,
    fmt: str,
    width: int,
    height: int,
    strides: Sequence[int],
    *,
    writable: bool,
    role: str = '',
) -> None:
    """Populate ``buf`` from a Python plane sequence, pinning each plane.

    Args:
        buf: Structure to fill in.  It is only written once every plane
            has been pinned, so a failure leaves it untouched.
        pins: Collector that keeps the pinned views alive.  Planes that
            were pinned before a failure stay held by ``pins`` until it
            is released.
        planes: Buffer-protocol objects, at most ``_PIXPAT_MAX_PLANES``.
        fmt: Pixel format name; must be ASCII.
        width: Image width; must fit in a ``uint32``.
        height: Image height; must fit in a ``uint32``.
        strides: One stride per plane; each must fit in a ``uint32``.
        writable: True for destination buffers (the C library writes
            into them), False for source buffers (the C library only
            reads).
        role: Optional prefix (e.g. a source/destination tag) used in
            error messages.

    Raises:
        ValueError: Too many planes, a ``strides`` / ``planes`` length
            mismatch, a width / height / stride outside the ``uint32``
            range, or a non-ASCII ``fmt``.
        TypeError: A plane cannot provide the requested kind of buffer.

    Plane sizes are not checked against ``height`` and the strides here.
    """
    label = f'{role} ' if role else ''
    if len(planes) > _PIXPAT_MAX_PLANES:
        raise ValueError(f'too many {label}planes: {len(planes)} (max {_PIXPAT_MAX_PLANES})')
    if len(strides) != len(planes):
        raise ValueError(f'{label}strides has {len(strides)} entries, expected {len(planes)}')

    # Validate everything that ends up in a c_uint32 slot, and encode
    # the format name, before pinning: bad arguments then fail without
    # any buffer having been exported.
    _require_uint32(width, f'{label}width')
    _require_uint32(height, f'{label}height')
    for i, stride in enumerate(strides):
        _require_uint32(stride, f'{label}stride {i}')
    encoded_fmt = fmt.encode('ascii')

    plane_ptrs = (ctypes.c_void_p * _PIXPAT_MAX_PLANES)()
    stride_arr = (ctypes.c_uint32 * _PIXPAT_MAX_PLANES)()
    for i, plane in enumerate(planes):
        try:
            addr = pins.acquire(plane, writable=writable)
        except BufferError as e:
            # Report buffer-protocol refusals under one exception type,
            # naming the offending plane.
            kind = 'writable' if writable else 'buffer-protocol'
            raise TypeError(f'{label}plane {i} does not support the {kind} interface: {e}') from e
        plane_ptrs[i] = addr
        stride_arr[i] = strides[i]

    buf.format = encoded_fmt
    buf.width = width
    buf.height = height
    buf.num_planes = len(planes)
    buf.planes = plane_ptrs
    buf.strides = stride_arr
# Explicit export list.  Every name here starts with an underscore, so a
# star-import of this module would skip all of them without it.
__all__ = [
    '_Buffer',
    '_PatternOpts',
    '_ConvertOpts',
    '_PinnedBuffers',
    '_lib',
    '_fill_buffer',
]
|