Source code for rockverse.voxel_image

"""
Handles the basic class for RockVerse Digital Rock Petrophysics,
the :class:`VoxelImage <rockverse.voxel_image.VoxelImage>` class. It builds upon
`Zarr arrays <https://zarr.readthedocs.io/en/stable/user-guide/arrays.html>`_,
adding attributes and methods specifically designed for digital rock
petrophysics in a high-performance, parallel computing environment.
It can efficiently handle large images by leveraging Zarr's chunked storage.

The ``VoxelImage`` class is designed for simplicity, handling complex computational
abstractions under the hood, making it accessible and user-friendly for non-HPC
specialists through high-level functions.
"""

import json
import zarr
from zarr.errors import ContainsArrayError
import numpy as np
from mpi4py import MPI
from rockverse import _assert
from rockverse.errors import collective_raise, collective_only_rank0_runs
from rockverse.voxel_image._math import _array_math
from rockverse.voxel_image._finneypack import fill_finney_pack
from rockverse._utils import rvtqdm, auto_chunk_3d, index_bounding_box
from rockverse.configure import config
comm = config.mpi_comm
mpi_rank = config.mpi_rank
mpi_nprocs = config.mpi_nprocs

[docs] def create(shape, dtype, chunks='nprocs', store=None, path=None, overwrite=False, field_name='', field_unit='', description='', voxel_origin=None, voxel_length=None, voxel_unit='', **kwargs): """ Create empty voxel image. Parameters ---------- shape : tuple Desired image shape. dtype : string or dtype NumPy dtype. Type must be numeric (unsigned integer, integer, float, complex) or boolean. Ex: ``dtype=int``, ``dtype='u2'``, ``dtype='f4'``, ``dtype=complex``. chunks : iterable of ints | int | 'nprocs' | None, optional If iterable of integers, define the chunk shape. If integer, chunks will be as close as possible to cubic shapes, and the number of chunks will match this input number. If 'nprocs', the number of chunks will match the number of MPI processes. If None, False, empty tuple or any other object that makes ``not chunks`` True, chunk shape will be set to the array shape, i.e., single chunk for the whole array. Default is 'nprocs'. store : str | zarr.storage.StoreLike | None, optional A string with the file path in the local file disk, or any valid `Zarr store <https://zarr.readthedocs.io/en/stable/user-guide/storage.html>`_, or ``None`` to use Memory store. Default is None. path : str or None, optional The path of the array within the store. If path is None, the array will be located at the root of the store. overwrite : bool, optional If True, delete all pre-existing data in the store at the specified path before creating the new image. Default value is False. field_name : str, optional Name for the stored image or scalar field. field_unit : str, optional Unit for the stored image or scalar field. description : str, optional Description for the stored image or scalar field. voxel_origin : list or tuple, optional Image voxel coordinate origin in units of `voxel_unit`. This is the spatial coordinate for the voxel with lowest (x, y, z) coordinates. If ``None``, defaults to a tuple of zeros for each image dimension. voxel_length : list or tuple, optional Image voxel length in each dimension. If ``None``, defaults to a tuple of ones for each image dimension. voxel_unit : str, optional Image voxel length unit. **kwargs Additional keyword arguments to be passed to the underlying `Zarr.create_array <https://zarr.readthedocs.io/en/stable/api/zarr/index.html#zarr.create_array>`_ function. Returns ------- VoxelImage The created ``VoxelImage`` object. """ # Check for valid shape --------------------- _assert.iterable.ordered_integers_positive('shape', shape) _assert.iterable.length('shape', shape, 3) # Check for valid dtype --------------------- _assert.condition.voxelimage_dtype('dtype', dtype) # Check for valid voxel_length -------------- if voxel_length is not None: _voxel_length = voxel_length else: _voxel_length = [1 for k in shape] _assert.iterable.ordered_numbers_positive('voxel_length', _voxel_length) _assert.iterable.length('voxel_length', _voxel_length, 3) # Check for valid voxel_origin -------------- if voxel_origin is not None: _voxel_origin = voxel_origin else: _voxel_origin = [0 for k in shape] _assert.iterable.ordered_numbers('voxel_origin', _voxel_origin) _assert.iterable.length('voxel_origin', _voxel_origin, 3) # Check for valid chunks -------------------- if not chunks: _chunks = shape elif chunks == 'nprocs': _chunks = auto_chunk_3d(shape, config.mpi_nprocs) elif isinstance(chunks, int): _chunks = auto_chunk_3d(shape, chunks) else: _chunks = chunks _assert.iterable.ordered_numbers_positive('chunks', _chunks) _assert.iterable.length('chunks', _chunks, 3) # Check for valid overwrite ----------------- _assert.instance('overwrite', overwrite, 'boolean', (bool,)) # Check for valid voxel_unit ---------------- _assert.instance('voxel_unit', voxel_unit, 'string', (str,)) # Check for valid path ---------------------- if path is not None: _assert.instance('path', path, 'string', (str,)) kwargs['shape'] = shape kwargs['dtype'] = dtype kwargs['chunks'] = _chunks kwargs['store'] = store kwargs['overwrite'] = overwrite kwargs['store'] = store kwargs['name'] = path kwargs['zarr_format'] = 3 if 'attributes' not in kwargs: kwargs['attributes'] = {} kwargs['attributes'].update( _ROCKVERSE_DATATYPE='VoxelImage', description=description, field_name=field_name, field_unit=field_unit, voxel_unit=voxel_unit, voxel_origin=_voxel_origin, voxel_length=_voxel_length, ) #Only rank 0 writes metadata to disk if isinstance(store, (str, zarr.storage.LocalStore)): with collective_only_rank0_runs(): if mpi_rank == 0: with zarr.config.set({'array.order': 'C'}): z = zarr.create_array(**kwargs) with zarr.config.set({'array.order': 'C'}): for k in range(mpi_nprocs): if k == mpi_rank: z = zarr.open(store=store, path=kwargs['name'], mode='r+') comm.barrier() else: with zarr.config.set({'array.order': 'C'}): z = zarr.create_array(**kwargs) comm.barrier() return VoxelImage(z)
[docs] def empty(shape, dtype, **kwargs): """ Create an empty voxel image with the given shape. The array will be initialized without any specific values. Parameters ---------- shape : tuple Desired image shape. dtype : string or dtype NumPy dtype. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ kwargs['shape'] = shape kwargs['dtype'] = dtype kwargs['fill_value'] = None return create(**kwargs)
[docs] def zeros(shape, dtype, **kwargs): """ Create a voxel image filled with zeros. Parameters ---------- shape : tuple Desired image shape. dtype : string or dtype NumPy dtype. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ kwargs['shape'] = shape kwargs['dtype'] = dtype kwargs['fill_value'] = 0 return create(**kwargs)
[docs] def ones(shape, dtype, **kwargs): """ Create a voxel image filled with ones. Parameters ---------- shape : tuple Desired image shape. dtype : string or dtype NumPy dtype. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ kwargs['shape'] = shape kwargs['dtype'] = dtype kwargs['fill_value'] = 1 return create(**kwargs)
[docs] def full(shape, dtype, fill_value, **kwargs): """ Create a voxel image filled with a specified value. Parameters ---------- shape : tuple Desired image shape. dtype : string or dtype NumPy dtype. fill_value : scalar Value to fill the array. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ kwargs['shape'] = shape kwargs['dtype'] = dtype kwargs['fill_value'] = fill_value return create(**kwargs)
def _put_meta(a, kwargs): kwargs['shape'] = a.shape for k in ('dtype', 'chunks', 'description', 'field_name', 'field_unit', 'voxel_origin', 'voxel_unit', 'voxel_length'): if k not in kwargs: kwargs[k] = getattr(a, k)
[docs] def empty_like(a, **kwargs): """ Create empty voxel image with same shape, chunks, voxel_origin, voxel_length, and voxel_unit as the given image. Parameters ---------- a : VoxelImage The source voxel image to mimic. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ _assert.rockverse_instance(a, 'a', ('VoxelImage',)) _put_meta(a, kwargs) return empty(**kwargs)
[docs] def zeros_like(a, **kwargs): """ Create voxel image filled with zeros with same shape, chunks, voxel_origin, voxel_length, and voxel_unit as the given image. Parameters ---------- a : VoxelImage The source RockVerse voxel image to mimic. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ _assert.rockverse_instance(a, 'a', ('VoxelImage',)) _put_meta(a, kwargs) return zeros(**kwargs)
[docs] def ones_like(a, **kwargs): """ Create voxel image filled with ones with same shape, chunks, voxel_origin, voxel_length, and voxel_unit as the given image. Parameters ---------- a : VoxelImage The source RockVerse voxel image to mimic. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ _assert.rockverse_instance(a, 'a', ('VoxelImage',)) _put_meta(a, kwargs) return ones(**kwargs)
[docs] def full_like(a, fill_value, **kwargs): """ Create voxel image filled with a specified value with same shape, chunks, voxel_origin, voxel_length, and voxel_unit as the given image. Parameters ---------- a : VoxelImage The source RockVerse voxel image to mimic. fill_value : scalar Value to fill the array. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ _assert.rockverse_instance(a, 'a', ('VoxelImage',)) _put_meta(a, kwargs) kwargs['fill_value'] = fill_value return full(**kwargs)
[docs] def from_array(array, **kwargs): """ Create a new VoxelImage object and copy array data into it. .. note:: If you use this function to create voxel images in a parallel environment, make sure all the processes have the array data available, as each chunk will be processed by the corresponding MPI process. Parameters ---------- array : array-like Input data to be copied. Must support `dtype` and `shape` attributes. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Notice that image shape will match original array shape, and therefore keyword argument ``shape`` will be ignored in ``kwargs``. Returns ------- VoxelImage The created ``VoxelImage`` object. """ if not hasattr(array, 'dtype') or not hasattr(array, 'shape'): collective_raise(TypeError("Input must be array-like and have 'dtype' and 'shape' attributes.")) if 'dtype' not in kwargs: kwargs['dtype'] = array.dtype kwargs['shape'] = array.shape z = create(**kwargs) desc = 'Copying array' for block_id in rvtqdm(range(z.nchunks), desc=desc, unit='chunk'): if block_id % mpi_nprocs == mpi_rank: chunk_indices = z.chunk_slice_indices(block_id) z.zarray[chunk_indices] = array[chunk_indices] comm.barrier() return z
[docs] def sphere_pack(shape, dtype='u1', xlim=(-10, 10), ylim=(-10, 10), zlim=(-10, 10), sphere_radius=1, fill_value=1, **kwargs): ''' Create a sphere pack image. John Finney's experimental disordered packing of equal, hard spheres, optically imaged in 1970, is hard-coded in RockVerse from the sphere centers available in `Digital Rocks Portal <https://www.digitalrocksportal.org/projects/47>`_. The volumetric image is analytically built in normalized coordinates, with sphere radius equal to 1 and (x, y, z) positions for sphere centers roughly between -20 and 20 in each direction. The image can be built at any needed resolution by a combination of shape and spatial limits. .. note:: If you use this data, please remember to `cite the data source <https://www.digitalrocksportal.org/projects/47/cite/>`_ and the `related publications <https://www.digitalrocksportal.org/projects/47/publications/>`_. Parameters ---------- shape : tuple Shape of the image to create. dtype : string or dtype, optional NumPy dtype. Default 8-bit unsigned integer. xlim : tuple, optional Spatial limits of the sphere pack in the x-direction. ylim : tuple, optional Spatial limits of the sphere pack in the y-direction. zlim : tuple, optional Spatial limits of the sphere pack in the z-direction. sphere_radius : float, optional Radius of the spheres to pack. Default is 1 (original pack, touching, non-overlapping hard spheres). Increase this value to simulate overlapping spheres (cement growth, for example) or decrease to use smaller spheres (like grain dissolution). Sphere centers will not change. fill_value : scalar, optional Value to fill the spheres with. Default is 1. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. ''' ox, fx = xlim oy, fy = ylim oz, fz = zlim nx, ny, nz = shape xm = np.linspace(ox, fx, nx) hx = xm[1]-xm[0] ym = np.linspace(oy, fy, ny) hy = ym[1]-ym[0] zm = np.linspace(oz, fz, nz) hz = zm[1]-zm[0] kwargs['voxel_origin'] = [ox, oy, oz] kwargs['voxel_length'] = [hx, hy, hz] z = zeros(shape, dtype, **kwargs) fill_finney_pack(array=z.zarray, sphere_radius=sphere_radius, hx=hx, hy=hy, hz=hz, ox=ox, oy=oy, oz=oz, fill_value=fill_value) return z
[docs] def import_raw(rawfile, shape, dtype, offset=0, raw_file_order='F', description='', field_name='', field_unit='', voxel_origin=None, voxel_length=None, voxel_unit='', overwrite=False, **kwargs): """ Import raw file as a voxel image. This function imports a raw binary file into a voxel image, adding the necessary metadata specific to digital rock petrophysics in RockVerse. It supports parallel reading to efficiently import large datasets. Parameters ---------- rawfile : str Path to the raw file in the file system. shape : tuple, list, or numpy.ndarray of ints Shape of the array to be created. dtype : str Data type of the raw file, specified as a Numpy typestring formed by the sequence of characters: 1. The character for byte order: * ``<`` if little endian * ``>`` if big endian * ``|`` if not applicable (8-bit numbers) 2. The character for data type: * ``b`` for boolean * ``i`` for signed integer * ``u`` for unsigned integer * ``f`` for floating-point 3. The number of bytes per voxel * ``1`` for 8-bit data * ``2`` for 16-bit data * ``4`` for 32-bit data * ``8`` for 64-bit data * ``16`` for 128-bit data These are the accepted combinations: * ``'|b1'``: boolean * ``'|u1'``: 8-bit unsigned integer * ``'|i1'``: 8-bit signed integer * ``'<u2'``: little endian 16-bit unsigned integer * ``'<u4'``: little endian 32-bit unsigned integer * ``'<u8'``: little endian 64-bit unsigned integer * ``'<u16'``: little endian 128-bit unsigned integer * ``'>u2'``: big endian 16-bit unsigned integer * ``'>u4'``: big endian 32-bit unsigned integer * ``'>u8'``: big endian 64-bit unsigned integer * ``'>u16'``: big endian 128-bit unsigned integer * ``'<i2'``: little endian 16-bit signed integer * ``'<i4'``: little endian 32 -bit signed integer * ``'<i8'``: little endian 64-bit signed integer * ``'<i16'``: little endian 128-bit signed integer * ``'>i2'``: big endian 16-bit signed integer * ``'>i4'``: big endian 32-bit signed integer * ``'>i8'``: big endian 64-bit signed integer * ``'>i16'``: big endian 128-bit signed integer * ``'<f4'``: little endian 32-bit float * ``'<f8'``: little endian 64-bit float * ``'<f16'``: little endian 128-bit float * ``'>f4'``: big endian 32-bit float * ``'>f8'``: big endian 64-bit float * ``'>f16'``: big endian 128-bit float The imported data will be converted to the system's native byte order (little endian or big endian). offset : int, optional Number of bytes to skip at the beginning of the file. Typically a multiple of the byte-size of dtype. Default is 0. raw_file_order : {'C', 'F'}, optional Specify the memory layout order of the raw file: 'C' for C-style (row-major), 'F' for Fortran-style (column-major). Use 'F' when loading raw files exported from Fiji/ImageJ. After importing, C-style memory layout is enforced within chunks for optimal cache performance in RockVerse workflows. Default is 'F'. description : str, optional Description for the stored scalar field. field_name : str, optional Name for the stored scalar field. field_unit : str, optional Unit for the stored scalar field. voxel_origin : tuple, list, or Numpy array of ints, optional Spatial coordinate origin for the first voxel in the array, in units of `voxel_unit`. If None, defaults to a tuple of zeros for each array dimension. voxel_length : tuple, list, or Numpy array of ints or floats, optional Voxel length in each direction. If None, defaults to a tuple of ones for each array dimension. voxel_unit : str, optional Unit for the voxel length. overwrite : bool, optional If True, delete all pre-existing data in the store at the specified path before creating the new array. Default is False, to prevent accidental overwriting of existing data. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created ``VoxelImage`` object. """ _assert.instance('rawfile', rawfile, 'string', (str,)) _assert.iterable.ordered_integers_positive('shape', shape) if voxel_length is not None: _voxel_length = voxel_length else: _voxel_length = [1 for k in shape] _assert.iterable.ordered_numbers_positive('voxel_length', _voxel_length) if voxel_origin is not None: _voxel_origin = voxel_origin else: _voxel_origin = [0 for k in shape] _assert.iterable.ordered_numbers('voxel_origin', _voxel_origin) _assert.instance('dtype', dtype, 'string', (str,)) _assert.drpdtype('dtype', dtype) _assert.instance('name', field_name, 'string', (str,)) _assert.instance('unit', field_unit, 'string', (str,)) _assert.instance('offset', offset, 'integer', (int,)) _assert.in_group('raw_file_order', raw_file_order, ('C', 'F')) _assert.instance('voxel_unit', voxel_unit, 'string', (str,)) _assert.instance('overwrite', overwrite, 'boolean', (bool,)) kwargs['dtype'] = np.dtype(dtype).kind + str(np.dtype(dtype).itemsize) kwargs['description'] = description kwargs['field_name'] = field_name kwargs['field_unit'] = field_unit kwargs['shape'] = shape kwargs['voxel_origin'] = _voxel_origin kwargs['voxel_length'] = _voxel_length kwargs['voxel_unit'] = voxel_unit kwargs['overwrite'] = overwrite z = create(**kwargs) data = np.memmap(rawfile, dtype=dtype, mode='r', offset=offset, shape=tuple(shape), order=raw_file_order) desc = 'Importing raw file' if kwargs['field_name']: desc = f"({kwargs['field_name']}) {desc}" for block_id in rvtqdm(range(z.nchunks), desc=desc, unit='chunk'): if block_id % mpi_nprocs == mpi_rank: chunk_indices = z.chunk_slice_indices(block_id) z.zarray[chunk_indices] = data[chunk_indices] comm.barrier() return z
[docs] class VoxelImage(): """ The basic type for RockVerse Digital Rock Petrophysics, intended to contain voxelized 3D images and scalar fields in general. The class builds upon `Zarr arrays <https://zarr.readthedocs.io/en/stable/user-guide/arrays.html>`_ by adding attributes and methods specifically designed for digital rock petrophysics in a high performance parallel computing environment. .. note:: This class should not be instantiated directly. Instead, use the provided :ref:`creation functions <voxel image creation functions>`. """ def __init__(self, z): self._zarray = z def __repr__(self): return f"<VoxelImage shape={self.shape} dtype={self.dtype} store={self.zarray.store} path={self.zarray.path}>" def __getitem__(self, selection, rank=None): if isinstance(self.zarray.store, zarr.storage.LocalStore) or mpi_nprocs==1 or self.nchunks==1: return self.zarray[selection] # Memory store needs to reduce slice values if self.zarray.fill_value == 0: selected = self.zarray[selection] else: #this option temporarily doubles memory temp = zeros_like(self, store=None) for block_id in range(self.nchunks): if mpi_rank == block_id % mpi_nprocs: chunk_slice = self.chunk_slice_indices(block_id) temp.zarray[chunk_slice] = self.zarray[chunk_slice] selected = temp.zarray[selection] if rank is None: return comm.allreduce(selected, op=MPI.SUM) return comm.reduce(selected, root=rank, op=MPI.SUM) def __setitem__(self, selection, array): temp = zarr.create_array(store=None, shape=self.shape, dtype=self.dtype) for block_id in range(self.nchunks): if block_id % mpi_nprocs == mpi_rank: chunk_indices = self.chunk_slice_indices(block_id) temp[chunk_indices] = self.zarray[chunk_indices].copy() temp[selection] = array self.zarray[chunk_indices] = temp[chunk_indices].copy() @property def shape(self): """The image shape.""" return self.zarray.shape @property def chunks(self): """The image chunk shape.""" return self.zarray.chunks @property def nchunks(self): """The image number of chunks.""" return self.zarray.nchunks @property def ndim(self): """The image number of dimensions.""" return self.zarray.ndim @property def dtype(self): """The image data type.""" return self.zarray.dtype @property def zarray(self): """The underlying Zarr array object.""" return self._zarray @property def _rockverse_datatype(self): """The RockVerse data type.""" return self.zarray.attrs['_ROCKVERSE_DATATYPE'] @property def description(self): """General image or scalar field description.""" return self.zarray.attrs['description'] @description.setter def description(self, v): _assert.instance('description', v, 'str', (str,)) self.zarray.attrs['description'] = v @property def field_name(self): """Name for the stored scalar field.""" return self.zarray.attrs['field_name'] @field_name.setter def field_name(self, v): _assert.instance('field_name', v, 'str', (str,)) self.zarray.attrs['field_name'] = v @property def field_unit(self): """Unit for the stored scalar field.""" return self.zarray.attrs['field_unit'] @field_unit.setter def field_unit(self, v): _assert.instance('field_unit', v, 'str', (str,)) self.zarray.attrs['field_unit'] = v @property def nx(self): """The number of voxels in x-direction (first axis). Equivalent to shape[0].""" return self.zarray.shape[0] @property def ny(self): """The number of voxels in y-direction (second axis). Equivalent to shape[1].""" return self.zarray.shape[1] @property def nz(self): """The number of voxels in z-direction (third axis). Equivalent to shape[2].""" return self.zarray.shape[2] @property def hx(self): """Voxel length in x-direction (first axis).""" return self.zarray.attrs['voxel_length'][0] @hx.setter def hx(self, v): _assert.condition.positive_integer_or_float('hx', v) self.zarray.attrs['voxel_length'][0] = v @property def hy(self): """Voxel length in y-direction (second axis).""" return self.zarray.attrs['voxel_length'][1] @hy.setter def hy(self, v): _assert.condition.positive_integer_or_float('hy', v) self.zarray.attrs['voxel_length'][1] = v @property def hz(self): """Voxel length in z-direction (third axis).""" return self.zarray.attrs['voxel_length'][2] @hz.setter def hz(self, v): _assert.condition.positive_integer_or_float('hz', v) self.zarray.attrs['voxel_length'][2] = v @property def voxel_length(self): """ Returns the image voxel length in each spatial direction (x, y, z) as a tuple. This is a read-only property. To alter the voxel lengths, use the ``hx``, ``hy``, and ``hz`` properties. """ return tuple(self.zarray.attrs['voxel_length']) @property def h_unit(self): """Image voxel unit.""" return self.zarray.attrs['voxel_unit'] @h_unit.setter def h_unit(self, v): _assert.instance('h_unit', v, 'string', (str,)) self.zarray.attrs['voxel_unit'] = v @property def voxel_unit(self): """Image voxel unit.""" return self.zarray.attrs['voxel_unit'] @voxel_unit.setter def voxel_unit(self, v): _assert.instance('voxel_unit', v, 'string', (str,)) self.zarray.attrs['voxel_unit'] = v @property def meta_data_as_dict(self): """ Return voxel image meta data as a dictionary. """ meta = dict() for k in ('description', 'field_name', 'field_unit', 'voxel_origin', 'voxel_unit', 'voxel_length'): meta[k] = self.zarray.attrs[k] return meta @property def dimensions(self): """ Image total dimension in each direction. Returns the total physical dimensions of the image by multiplying the number of voxels by the voxel length in each spatial direction (x, y, z). """ return self.nx*self.hx, self.ny*self.hy, self.nz*self.hz @property def ox(self): """Spatial x-coordinate for the first voxel in x-direction (first axis).""" return self.zarray.attrs['voxel_origin'][0] @ox.setter def ox(self, v): _assert.condition.integer_or_float('ox', v) self.zarray.attrs['voxel_origin'][0] = v @property def oy(self): """Spatial y-coordinate for the first voxel in y-direction (second axis).""" return self.zarray.attrs['voxel_origin'][1] @oy.setter def oy(self, v): _assert.condition.integer_or_float('oy', v) self.zarray.attrs['voxel_origin'][1] = v @property def oz(self): """Spatial z-coordinate for the first voxel in z-direction (third axis).""" return self.zarray.attrs['voxel_origin'][2] @oz.setter def oz(self, v): _assert.condition.integer_or_float('oz', v) self.zarray.attrs['voxel_origin'][2] = v @property def voxel_origin(self): """ Image voxel origin for each direction. Returns the spatial coordinate origin for the first voxel in the array in each spatial direction (x, y, z) as a tuple. This is a read-only property. To alter the voxel origins, use the ``ox``, ``oy``, and ``oz`` properties. """ return tuple(self.zarray.attrs['voxel_origin']) @property def bounding_box(self): """ Image bounding box in voxel units. Returns the image bounding box, defined by the minimum and maximum voxel coordinates in each spatial direction (x, y, z). The bounding box is calculated based on the voxel origins and lengths. Returns ------- tuple A tuple containing two tuples: ((xmin, ymin, zmin), (xmax, ymax, zmax)), namely the minimum and maximum coordinates in the x, y, and z directions. """ return ((self.ox, self.oy, self.oz), (self.ox + (self.nx-1)*self.hx, self.oy + (self.ny-1)*self.hy, self.oz + (self.nz-1)*self.hz))
[docs] def chunk_slice_indices(self, chunk_id, return_indices=False): """ Calculate the slice indices for a given Zarr chunk. This method computes the first and last+1 indices for a specific Zarr chunk within the array, based on the block's ID. Useful for working with chunked data in parallel processing. Parameters ---------- chunk_id : int The ID of the block for which to calculate the slice indices. Returns ------- tuple If ``return_indices`` is True, a tuple containing six integers: (box, bex, boy, bey, boz, bez). These represent the start and end indices for the block in the x, y, and z directions, respectively. If ``return_indices`` is False, a tuple containing the three slices: (slice(box, bex), slice(boy, bey), slice(boz, bez)). """ if chunk_id >= self.nchunks: raise ValueError(f'chunk_id={chunk_id}: array has only {self.nchunks} chunks.') Nblocks = self.zarray.cdata_shape bk = chunk_id // (Nblocks[0]*Nblocks[1]) bj = (chunk_id - bk*Nblocks[0]*Nblocks[1]) // Nblocks[0] bi = chunk_id - bk*Nblocks[0]*Nblocks[1] - bj*Nblocks[0] box = bi*self.chunks[0] bex = min(box+self.chunks[0], self.shape[0]) boy = bj*self.chunks[1] bey = min(boy+self.chunks[1], self.shape[1]) boz = bk*self.chunks[2] bez = min(boz+self.chunks[2], self.shape[2]) if return_indices: return box, bex, boy, bey, boz, bez return (slice(box, bex), slice(boy, bey), slice(boz, bez))
[docs] def get_voxel_coordinates(self, i, j, k): """ Get the spatial coordinates of the voxel at a given position. This method calculates the spatial coordinates of the voxel located at the specified (i, j, k) position, taking into account the voxel origin and voxel length. Parameters ---------- i : int The voxel index in the x-direction, 0 <= i < nx. j : int The voxel index in the y-direction, 0 <= j < ny. k : int The voxel index in the z-direction, 0 <= k < nz. Returns ------- tuple A tuple containing the spatial coordinates (x, y, z) of the specified voxel. """ return (self.ox+i*self.hx, self.oy+j*self.hy, self.oz+k*self.hz)
[docs] def get_closest_voxel_index(self, x, y, z, allow_outside=False): """ Get the voxel index closest to a given spatial position. This method calculates the (i, j, k) indices of the image voxel closest to the specified spatial coordinates (x, y, z). Parameters ---------- x : float The spatial coordinate in the x-direction. y : float The spatial coordinate in the y-direction. z : float The spatial coordinate in the z-direction. allow_outside : Boolean If False (default) return None in case the spatial coordinates point to a region outside the image bounding box. If True, return the the voxel indices even if (x, y, z) falls outside the image bounding box. Returns ------- tuple or None A tuple containing the voxel indices (i, j, k) corresponding to the specified spatial coordinates (x, y, z), or None. """ pos = (int(round((x-self.ox)/self.hx)), int(round((y-self.oy)/self.hy)), int(round((z-self.oz)/self.hz))) if 0<=pos[0]<self.nx and 0<=pos[1]<=self.ny and 0<=pos[2]<=self.nz: return pos elif allow_outside: return pos return None
[docs] def check_mask_and_segmentation(self, *, mask=None, segmentation=None): """ Validate mask and segmentation compatibility. This method checks the validity of the provided mask, segmentation, and phases to ensure they are compatible with the voxel image. Parameters ---------- mask : VoxelImage, optional A mask voxel image. If provided, must be a voxel image of boolean dtype with the same shape, voxel length, voxel origin, and voxel unit. segmentation : VoxelImage, optional A segmentation voxel image. If provided, must be a voxel image of boolean or unsigned integer dtype with the same shape, voxel length, voxel origin, and voxel unit. Raises ------ ValueError If any of the provided parameters are invalid or incompatible with the current voxel image. """ if mask is not None: _assert.rockverse_instance(mask, 'mask', ('VoxelImage',)) _assert.dtype('mask', mask, 'boolean', 'b') _assert.same_shape('Mask image', (mask, self)) _assert.same_voxel_length('Mask image', (mask, self)) _assert.same_voxel_origin('Mask image', (mask, self)) _assert.same_voxel_unit('Mask image', (mask, self)) if segmentation is not None: _assert.rockverse_instance(segmentation, 'segmentation', ('VoxelImage',)) _assert.dtype('Segmentation', segmentation, 'boolean or integer', 'bui') _assert.same_shape('Segmentation image', (segmentation, self)) _assert.same_voxel_length('Segmentation image', (segmentation, self)) _assert.same_voxel_origin('Segmentation image', (segmentation, self)) _assert.same_voxel_unit('Segmentation image', (segmentation, self))
[docs] def math(self, value, op, *, mask=None, segmentation=None, phases=None, region=None): """ Element-wise math operations. This method applies math operations for each voxel in the image. Optional parameters allow for selective setting based on mask, segmentation, and phases. Parameters ---------- value : scalar The value to be used in the operation. .. note:: There is no internal check for the correct data type of ``value``. Ensure that the value provided is compatible with the image data type. op : str The operation to be performed. Let :math:`voxel` represent each voxel in the image. See table below: .. list-table:: Operations :header-rows: 1 * - ``op`` - Operation * - 'set' - :math:`voxel = value` * - 'add' - :math:`voxel = voxel + value` * - 'subtract' - :math:`voxel = voxel - value` * - 'multiply' - :math:`voxel = voxel \\times value` * - 'divide' - :math:`voxel = voxel / value` * - 'logical and' - :math:`voxel = voxel \\land value` * - 'logical or' - :math:`voxel = voxel \\lor value` * - 'logical xor' - :math:`voxel = voxel \\oplus value` (exclusive OR) * - 'min' - :math:`voxel = \\min(voxel, value)` * - 'max' - :math:`voxel = \\max(voxel, value)` mask : voxel image, optional Boolean voxel image. If provided, the operation will ignore masked voxels (i.e., where mask is True). segmentation : voxel image, optional Unsigned integer voxel image. If provided, only voxels where the segmentation phase is in ``phases`` will be set to the specified value. phases : iterable of int, optional Any iterable with non negative integers representing the segmentation phases. Used together with ``segmentation``. Segmentation phases not in ``phases`` will be ignored by the operation. region : Region, optional A region specification. If provided, only voxels within the specified region will be set to the specified value. """ self.check_mask_and_segmentation(mask=mask, segmentation=segmentation) _array_math(array1=self, array2=None, value=value, op=op, mask=mask, segmentation=segmentation, phases=phases, region=region)
[docs] def combine(self, image, op, *, mask=None, segmentation=None, phases=None, region=None): """ Combine another voxel image by element-wise math operations. This method applies math operations for each voxel in the images. Optional parameters allow for selective setting based on mask, segmentation, and phases. Parameters ---------- image : VoxelImage The voxel image to be used in the operation. Must have shape, voxel origin, voxel length and voxel unit equal to the ones in the original array. .. note:: There is no internal check for the correct data type of ``a``. Ensure that the ``a`` provided is compatible with the image data type. op : str The operation to be performed. Let :math:`voxel1` represent each voxel in the original image and :math:`voxel2` represent each voxel in ``image``. Valid values for ``op`` are: .. list-table:: Operations :header-rows: 1 * - ``op`` - Operation * - 'copy' - :math:`voxel1 = voxel2` * - 'add' - :math:`voxel1 = voxel1 + voxel2` * - 'subtract' - :math:`voxel1 = voxel1 - voxel2` * - 'multiply' - :math:`voxel1 = voxel1 voxel2` * - 'divide' - :math:`voxel1 = voxel1 / voxel2` * - 'logical and' - :math:`voxel1 = voxel1 \\land voxel2` * - 'logical or' - :math:`voxel1 = voxel1 \\lor voxel2` * - 'logical xor' - :math:`voxel1 = voxel1 \\oplus voxel2` (exclusive OR) * - 'min' - :math:`voxel1 = \\min(voxel1, voxel2)` * - 'max' - :math:`voxel1 = \\max(voxel1, voxel2)` * - 'average' - :math:`voxel1 = (voxel1 + voxel2)/2` * - 'absolute difference' - :math:`voxel1 = |voxel1 - voxel2|` mask : VoxelImage, optional Boolean voxel image. If provided, the operation will ignore masked voxels (i.e., where mask is True). segmentation : VoxelImage, optional Unsigned integer voxel image. If provided, only voxels where the segmentation phase is in ``phases`` will be set to the specified value. phases : iterable of int, optional Any iterable with non negative integers representing the segmentation phases. Used together with ``segmentation``. Segmentation phases not in ``phases`` will be ignored by the operation. region : Region, optional A region specification. If provided, only voxels within the specified region will be set to the specified value. """ self.check_mask_and_segmentation(mask=mask, segmentation=segmentation) _array_math(array1=self, array2=image, value=None, op=op, mask=mask, segmentation=segmentation, phases=phases, region=region)
def __iadd__(self, value, *, mask=None, segmentation=None, phases=None, region=None): self.check_mask_and_segmentation(mask=mask, segmentation=segmentation) if phases is not None: _assert.iterable.any_iterable_non_negative_integers('phases', phases) if isinstance(value, (int, float)): self.math(value, op='add', mask=mask, segmentation=segmentation, phases=phases, region=region) else: collective_raise(TypeError( f"unsupported operand type(s) for +=: 'VoxelImage' and {type(value)}")) return self def broadcast_borders(self, border_voxels=1): """ Exchanges border voxels from each Zarr data block with its immediate neighbors. This method ensures that neighboring processes share boundary data for voxel computations, which is often required in stencil operations, finite difference methods, or other numerical algorithms involving neighboring voxels. Parameters: ----------- border_voxels : non negative integer Number of border voxels to be sent. """ _assert.condition.non_negative_integer('border_voxels', border_voxels) if border_voxels == 0 or mpi_nprocs == 1: return def set_slice(ind, bo, be, border_voxels): if ind == -1: return slice(bo, bo+border_voxels, 1) if ind == 1: return slice(be-border_voxels, be, 1) return slice(bo, be, 1) Nblocks = self.cdata_shape for bli in range(Nblocks[0]): for blj in range(Nblocks[1]): for blk in range(Nblocks[2]): sender_id = bli + blj*Nblocks[0] + blk*Nblocks[0]*Nblocks[1] box, bex, boy, bey, boz, bez = self.chunk_slice_indices(sender_id, return_indices=True) tag = 0 for i in [-1, 0, 1]: if not (0 <= (bli+i) < Nblocks[0]): continue slicex = set_slice(i, box, bex, border_voxels) for j in [-1, 0, 1]: if not (0 <= (blj+j) < Nblocks[1]): continue slicey = set_slice(j, boy, bey, border_voxels) for k in [-1, 0, 1]: if not (0 <= (blk+k) < Nblocks[2]): continue slicez = set_slice(k, boz, bez, border_voxels) if i == 0 and j == 0 and k == 0: continue receiver_id = (bli+i) + (blj+j)*Nblocks[0] + (blk+k)*Nblocks[0]*Nblocks[1] if (receiver_id % mpi_nprocs) == (sender_id % mpi_nprocs): continue if mpi_rank == sender_id % mpi_nprocs: data = self.zarray[slicex, slicey, slicez].copy() comm.send(obj=data, dest=(receiver_id % mpi_nprocs), tag=tag) if mpi_rank == receiver_id % mpi_nprocs: data = comm.recv(None, source=(sender_id % mpi_nprocs), tag=tag) self.zarray[slicex, slicey, slicez] = data.copy()
[docs] def copy(self, store=None, **kwargs): """ Copy the voxel image to a new store. This method supports re-chunking by passing ``chunks`` in ``**kwargs``. This is a pottentially slow method as rechunking requires collective getitem and setitem. Use a LocalStore to save a copy to the file system. Parameters ---------- store : str Path to the exported file in the file system. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Returns ------- VoxelImage The created copy. """ v = empty_like(self, store=store, **kwargs) for block_id in rvtqdm(range(v.nchunks), desc='Copying', unit='chunk'): chunk_indices = v.chunk_slice_indices(block_id) root = block_id % mpi_nprocs data = self.__getitem__(chunk_indices, root) if mpi_rank == root: v.zarray[chunk_indices] = data
[docs] def export_raw(self, filename, dtype=None, order='F', byteorder='=', write_fiji_macro=False): """ Export a voxel image to a raw file and corresponding metadata to a JSON file. This method exports the voxel image data into a raw binary file specified by `filename`. Corresponding metadata is saved in a JSON file with the same name as the raw file but with a `.json` extension. Parameters ---------- filename : str Path to the exported raw file in the file system. dtype : {None, Numpy dtype}, optional Data type to be used in the exported raw file. If None, the original image data type will be used. Boolean types will be cast to unsigned 8-bit integers in the exported file. .. warning:: Pay attention when setting dtype, as there is no internal check to ensure that the exported data will be correctly cast from the original array dtype. order : {'C', 'F'}, optional The order of the exported raw array layout: 'C' for C-style (most rapidly changing index last), 'F' for Fortran-style (most rapidly changing index first). Use 'F' when exporting to Fiji/ImageJ. Default is 'F'. byteorder : {'<', '>', '='}, optional Byte order for the exported raw data. This parameter has precedence over the ``dtype`` parameter. Use ``'<'`` to force little-endian, ``'>'`` to force big-endian, or ``'='`` to keep the original byte order. Default is ``'='``. write_fiji_macro : bool, optional If True, writes a convenience macro file (<filename>.ijm) for opening the exported raw file in ImageJ/Fiji. In this case, byte size in ``dtype`` must be up to 64 bits, and array order must be in Fortran-style. Default is False. Raises ------ ValueError If the array order is not 'F' when `write_fiji_macro` is True. If the byte size in `dtype` exceeds 64 bits when `write_fiji_macro` is True. """ _assert.instance('filename', filename, 'string', (str,)) if dtype is not None: try: dtypeout = np.dtype(dtype) except Exception as e: collective_raise(e) else: dtypeout = self.dtype dtypeout = dtypeout.str _assert.in_group('order', order, ('C', 'F')) _assert.in_group('byteorder', byteorder, ('<', '>', '=')) if byteorder in '<>': dtypeout = byteorder + dtypeout[1:] _assert.instance('write_fiji_macro', write_fiji_macro, 'boolean', (bool,)) if write_fiji_macro: if order != 'F': collective_raise(ValueError( "Exported raw data must be in Fortran order when requesting ImageJ/Fiji macro.")) if int(dtypeout[2:]) > 8: collective_raise(ValueError( "ImageJ/Fiji macro can't be written for data types larger than 64 bits.")) attrs = {'description': self.attrs['description'], 'shape': tuple(self.shape), 'voxel length': tuple(self.attrs['voxel_length']), 'voxel_unit': self.attrs['voxel_unit'], 'voxel_origin': tuple(self.attrs['voxel_origin']), 'array_order': order, 'Numpy dtype': dtypeout, } if dtypeout[1] == 'b': attrdtype = 'boolean' dtypeout = '|b1' else: attrdtype = f'{int(dtypeout[2:])*8}-bit' if dtypeout[1] == 'u': attrdtype = f'{attrdtype} unsigned integer' elif dtypeout[1] == 'i': attrdtype = f'{attrdtype} signed integer' elif dtypeout[1] == 'f': attrdtype = f'{attrdtype} float' if int(dtypeout[2:]) > 1: if dtypeout[0] == '<': attrdtype = f'{attrdtype}, little endian' elif dtypeout[0] == '>': attrdtype = f'{attrdtype}, big endian' attrs['data type'] = attrdtype if mpi_rank == 0: file = np.memmap(filename, dtype=dtypeout, mode='w+', shape=self.shape, order=order) file.flush() comm.barrier() file = np.memmap(filename, dtype=dtypeout, mode='r+', shape=self.shape, order=order) for chunk_id in rvtqdm(range(self.nchunks), desc='Exporting raw file', unit='chunk'): if chunk_id % mpi_nprocs == mpi_rank: chunk_indices = self.chunk_slice_indices(chunk_id) file[chunk_indices] = self.zarray[chunk_indices] comm.barrier() file.flush() comm.barrier() with collective_only_rank0_runs(): if mpi_rank == 0: with open(filename+'.json', 'w') as fp: json.dump(attrs, fp, indent=4) comm.barrier() if write_fiji_macro: cmd = f"//{attrs['description']}\n" if attrs['description'] else '' cmd = f'{cmd}run("Raw...", "open={filename} image=' if int(dtypeout[2:]) == 1: cmd = f'{cmd}8-bit ' else: cmd = f'{cmd}[{int(dtypeout[2:])*8}-bit ' if dtypeout[1] == 'u': cmd = f'{cmd}Unsigned] ' elif dtypeout[1] == 'i': cmd = f'{cmd}Signed] ' elif dtypeout[1] == 'f': cmd = f'{cmd}Real] ' cmd = f'{cmd}width={self.shape[0]} height={self.shape[1]} number={self.shape[2]}' if int(dtypeout[2:]) >1 and dtypeout[0] == '<': cmd = f'{cmd} little-endian' cmd = f'{cmd}");\n' with collective_only_rank0_runs(): if mpi_rank == 0: with (filename+'.ijm', 'w') as fp: fp.write(cmd)
[docs] def create_mask_from_region(self, region, **kwargs): """ Create a mask voxel image. Create boolean voxel image with same shape, chunks, voxel_origin, voxel_length, and voxel_unit as the original image, masking voxels outside the region of interest. Parameters ---------- a : VoxelImage The source voxel image to mimic. **kwargs Additional keyword arguments to be passed to the underlying :func:`creation function <rockverse.voxel_image.create>`. Keyword argument ``dtype`` will be ignored if passed, as the mask has to be of boolean type. Returns ------- VoxelImage The created ``VoxelImage`` object. """ _assert.rockverse_instance(region, 'region', ('Region',)) kwargs['dtype'] = '|b1' if 'description' not in kwargs: kwargs['description'] = f'Mask from {str(region)}' if 'field_name' not in kwargs: kwargs['field_name'] = 'Mask' if 'field_unit' not in kwargs: kwargs['field_unit'] = '' v = full_like(self, fill_value=True, **kwargs) v.math(value=False, op='set', region=region) return v