from typing import Optional
from typing import Tuple
from typing import Union
import itertools as it
from multiprocessing.pool import ThreadPool
from queue import Queue
import numpy as np
from . import Decoder
from . import CudaTensor
from . import DLDataType
from . import array_to_tensor
from . import uint8
from . import float32
from . import make_transform
from . import warp_affine
from . import lighting
from . import _AUGPY_TO_NUMPY_DTYPES
class DecodeWarp(object):
    r"""
    Use :py:func:`Decoder.decode` to decode JPEG images in memory
    and apply :py:func:`warp_affine` into batch tensor buffers.

    DecodeWarp instances allocate buffers and decoders on the
    :ref:`py/core:current_device`.

    Batch buffers are handed out by :py:meth:`__call__` and must be
    given back with :py:meth:`finalize_batch` once the consumer is done
    with them. When all ``num_buffers`` buffers are handed out,
    ``__call__`` blocks until one is returned.

    Parameters:
        batch_size: number of samples in a batch
        shape: shape of one image in the batch :math:`(C,H,W)`
        background: tensor with :math:`C` background color values;
            if ``None`` all background values are 0
        dtype: type used for buffer tensors
        cpu_threads: number of parallel decoders
        num_buffers: number of buffers tensors
        decode_buffer_size: size of pre-allocated buffer for
            decoding; must be larger than the number of subpixels;
            if ``None`` a new buffer is allocated every time
    """

    def __init__(
            self,
            batch_size: int,
            shape: Tuple[int, int, int],
            background: Optional[CudaTensor] = None,
            dtype: DLDataType = uint8,
            cpu_threads: int = 1,
            num_buffers: int = 2,
            decode_buffer_size: Optional[int] = None,
    ):
        self.batch_size = batch_size
        self.shape = shape

        # create num_buffers batch buffer tensors; the queue holds the
        # buffers that are currently free to be written to
        self.buffers = [
            CudaTensor((batch_size,) + tuple(shape), dtype=dtype)
            for _ in range(num_buffers)
        ]
        self._buffer_queue = Queue()
        for b in self.buffers:
            b.fill(0)
            self._buffer_queue.put(b)

        # create background tensor, one value per channel
        if background is None:
            background = (0,) * shape[0]
        self.background = array_to_tensor(
            np.array(background, dtype=_AUGPY_TO_NUMPY_DTYPES[dtype])
        )

        # create one decoder per thread, each paired with its own
        # pre-allocated decode buffer (or None if no size was given)
        decode_buffers = [
            CudaTensor((decode_buffer_size * dtype.itemsize,), dtype=uint8)
            if decode_buffer_size else None
            for _ in range(cpu_threads)
        ]
        decoders = [Decoder() for _ in range(cpu_threads)]
        self._decoder_queue = Queue()
        for d, b in zip(decoders, decode_buffers):
            self._decoder_queue.put((d, b))

        # thread pool for decoding
        self._pool = ThreadPool(cpu_threads)

    def _decode_warp(self, args):
        """
        Decode one image and warp it into slot ``i`` of ``buffer``.

        ``args`` is a tuple ``(i, imdata, augmentation, buffer)`` so the
        method can be used directly with ``ThreadPool.map``.
        """
        i, imdata, augmentation, buffer = args
        # borrow a decoder and its decode buffer for the duration of this task
        decoder, decode_buffer = self._decoder_queue.get()
        try:
            im = decoder.decode(imdata, decode_buffer)
            # im.shape[:-1] drops the last axis -- presumably the decoded
            # image is channels-last (H, W, C); TODO confirm against Decoder
            m, s = make_transform(im.shape[:-1], self.shape[1:], **augmentation)
            warp_affine(im, buffer[i], m, self.background, s)
        finally:
            # always return the decoder, even if decoding or warping failed
            self._decoder_queue.put((decoder, decode_buffer))

    def __call__(self, batch: dict) -> dict:
        r"""
        Decode a list of JPEG images under the ``'image'`` key
        and warp them into a batch tensor with the parameters
        defined by a list of augmentation dicts under key
        ``'augmentation'``.

        Each set of augmentation parameters is a dict that is passed
        as keyword arguments to :py:func:`make_transform`; the
        resulting transform is applied with :py:func:`warp_affine`.
        Additional parameters are expected to be ignored.

        At most ``batch_size`` images are processed. Blocks until a
        batch buffer is available. If processing fails, the buffer is
        made available again and the exception is re-raised.

        Parameters:
            batch: dict ``{'image': [JPEG, JPEG, ...],
                'augmentation': [params, params, ...]}``

        Returns:
            ``batch`` where ``'image'`` is replaced by a batch tensor
            of transformed images. Pass this tensor to
            :py:meth:`finalize_batch` when it is no longer needed.
        """
        buffer = self._buffer_queue.get()
        try:
            self._pool.map(
                self._decode_warp,
                zip(
                    range(self.batch_size),
                    batch['image'],
                    batch['augmentation'],
                    it.repeat(buffer),
                ),
            )
        except BaseException:
            # the caller never sees this buffer, so it could never be
            # finalized; put it back to avoid permanently losing it
            self._buffer_queue.put(buffer)
            raise
        batch['image'] = buffer
        return batch

    def finalize_batch(self, buffer):
        """
        Return a batch buffer obtained from :py:meth:`__call__` so it
        can be reused for a later batch.
        """
        self._buffer_queue.put(buffer)
class Lighting(object):
    r"""
    Apply the :py:func:`lighting` function to a batch of images.
    The batch tensor must have format :math:`(N,C,H,W)`,
    with batch size :math:`N`, number of channels :math:`C`,
    height :math:`H`, and width :math:`W`.

    Lighting instances allocate buffers on the :ref:`py/core:current_device`.

    Parameters:
        batch_size: number of samples in a batch
        channels: number of channels :math:`C` per image
        min_value: minimum brightness value, typically 0 for 8 bit images
        max_value: maximum brightness value, typically 255 for 8 bit images
    """

    def __init__(
            self,
            batch_size: int,
            channels: int = 3,
            min_value: Union[int, float] = 0,
            max_value: Union[int, float] = 255
    ):
        self.batch_size = batch_size
        self.channels = channels
        self.min_value = min_value
        self.max_value = max_value
        # one flat parameter buffer laid out as
        # [gray] * N + [contrast] * N + [color] * N * C;
        # the three attributes below are slices of it (presumably views
        # that share memory with the parent tensor -- TODO confirm)
        self._param_buffer = CudaTensor((batch_size * (channels + 2),), dtype=float32)
        self.gray_buffer = self._param_buffer[:batch_size]
        self.contrast_buffer = self._param_buffer[batch_size:batch_size+batch_size]
        self.color_buffer = self._param_buffer[batch_size+batch_size:]

    def _pack_params(self, augmentation):
        """
        Flatten a list of augmentation dicts into the layout of the
        parameter buffer: ``batch_size`` gray gammas, then ``batch_size``
        contrasts, then ``batch_size`` color gammas.

        Missing or falsy values fall back to the defaults: 1.0 for
        ``'gamma_gray'``, 0.0 for ``'contrast'``, and 1 per channel for
        ``'gamma_color'``. If fewer than ``batch_size`` dicts are given,
        the remaining slots are filled with these defaults so that every
        section keeps its fixed offset.

        Raises:
            ValueError: if more than ``batch_size`` dicts are given.
        """
        augmentation = list(augmentation)
        missing = self.batch_size - len(augmentation)
        if missing < 0:
            raise ValueError(
                'got %d augmentation entries for batch size %d'
                % (len(augmentation), self.batch_size)
            )
        default_color = (1,) * self.channels

        gray = [a.get('gamma_gray') or 1.0 for a in augmentation]
        contrast = [a.get('contrast') or 0.0 for a in augmentation]
        color = []
        for a in augmentation:
            value = a.get('gamma_color')
            if value is None:
                value = default_color
            elif isinstance(value, np.ndarray):
                # arrays with several elements have no truth value,
                # so `value or default` would raise ValueError
                if value.size == 0:
                    value = default_color
            else:
                value = value or default_color
            color.extend(value)

        # pad every section to batch_size entries
        gray.extend([1.0] * missing)
        contrast.extend([0.0] * missing)
        color.extend(default_color * missing)
        return gray + contrast + color

    def __call__(self, batch):
        r"""
        Apply lighting augmentation to a batch of images in a tensor
        under the ``'image'`` key, with parameters defined
        by a list of augmentation dicts under key ``'augmentation'``.
        Modifies the image tensor in-place.

        Each set of augmentation parameters is a dict that contains
        values for the parameters of the :py:func:`lighting` function.
        Only the keys ``'gamma_gray'``, ``'contrast'``, and
        ``'gamma_color'`` are read. Additional parameters are ignored.

        Parameters:
            batch: dict ``{'image': batch tensor,
                'augmentation': [params, params, ...]}``

        Returns:
            ``batch`` where ``'image'`` has been modified in-place
            according to given augmentation parameters.

        Raises:
            ValueError: if there are more augmentation dicts than
                ``batch_size``.
        """
        # copy params to GPU buffer
        params = self._pack_params(batch['augmentation'])
        array_to_tensor(np.array(params, dtype=np.float32), self._param_buffer)
        # apply lighting to batch; the image tensor is passed as both
        # the first and the last argument
        tensor = batch['image']
        lighting(
            tensor,
            self.gray_buffer,
            self.color_buffer,
            self.contrast_buffer,
            self.min_value,
            self.max_value,
            tensor
        )
        return batch