"""
The histogram module provides the Histogram class for computing and analyzing
histograms, probability density functions (PDFs), cumulative distribution
functions (CDFs), and percentiles from VoxelImage objects.
This module supports advanced features such as masked regions, regions of
interest (ROI), and segmentation-based phase analysis, enabling efficient voxel
data exploration.
.. todo::
* Improve handling of step changes in quantile functions when calculating percentiles.
* Put countter simply as int
"""
import numpy as np
import numpy.ma as ma
import pandas as pd
import warnings
from numba import njit
from mpi4py import MPI
from rockverse import _assert
from rockverse.errors import collective_raise
from rockverse._utils import rvtqdm
from rockverse.configure import config
comm = config.mpi_comm
mpi_rank = config.mpi_rank
mpi_nprocs = config.mpi_nprocs
@njit()
def _apply_mask_cpu(skip, mask):
nx, ny, nz = skip.shape
for i in range(nx):
for j in range(ny):
for k in range(nz):
if mask[i, j, k]:
skip[i, j, k] = True
@njit()
def _block_count_phases(block_segm, count):
#Detect if nth phase is present in segmentation
nx, ny, nz = block_segm.shape
for i in range(nx):
for j in range(ny):
for k in range(nz):
if count[block_segm[i, j, k]] == 0:
count[block_segm[i, j, k]] = 1
@njit()
def _block_update_histogram(hist, block_data, block_segm, skip, bins, phases):
nx, ny, nz = block_data.shape
hist_shape = hist.shape
num_phases = hist_shape[1] - 1
N = len(bins)
for i in range(nx):
for j in range(ny):
for k in range(nz):
if skip[i, j, k] or block_data[i, j, k] < bins[0] or block_data[i, j, k] > bins[-1]:
continue
value = block_data[i, j, k]
for ind in range(1, N):
if value < bins[ind]:
hist[ind-1, 0] += 1
if num_phases > 0:
phase = block_segm[i, j, k]
for p, ph in enumerate(phases):
if phase == ph:
hist[ind-1, p + 1] += 1
break
break
[docs]
class Histogram():
'''
Compute and manage histograms, probability density functions (PDFs),
cumulative distribution functions (CDFs), and percentiles for VoxelImage data.
This class provides methods to calculate histograms, PDFs, CDFs, and
percentiles, with support for regions of interest, masks, and segmentation.
Parameters
----------
image : VoxelImage
Input image.
bins : int or sequence of scalars, optional
If int, number of equal-width bins in the range of image min and max.
If sequence, defines bin edges (including rightmost edge), and ignores
values outside bins when calculating the histogram.
Default value is 256.
region : Region (optional)
The region of interest in the image. If specified, only voxels within
the region will be considered when computing the histogram.
mask : Boolean VoxelImage (optional)
The mask to apply on the image data. If specified, only unmasked
voxels will be considered when computing the histogram.
segmentation : Unsigned integer VoxelImage (optional)
The segmentation data for image regions. If specified, histograms will
also have individual counts for each segmentation phase.
'''
def __init__(self, image, *, bins=None, region=None, mask=None, segmentation=None):
_assert.rockverse_instance(image, 'image', ('VoxelImage',))
if region is not None:
_assert.rockverse_instance(region, 'region', ('Region',))
self._image = image
self._image.check_mask_and_segmentation(mask=mask, segmentation=segmentation)
self._mask = mask
self._segmentation = segmentation
self._region = region
self._input_bins = bins
self._min = None
self._max = None
self._bins = None
self._phases = None
self._hist = None
self._get_min_max()
self._update_bins()
self._count_phases()
self._update_histogram()
def _get_min_max(self):
if self._image.dtype.kind == 'b': #Boolean image? Treat as 0s and 1s
self._min = 0
self._max = 1
return
local_min = self._image.zarray[0, 0, 0]
local_max = self._image.zarray[0, 0, 0]
local_min = comm.bcast(local_min, root=0)
local_max = comm.bcast(local_max, root=0)
hx, hy, hz = self._image.voxel_length
ox, oy, oz = self._image.voxel_origin
for block_id in rvtqdm(range(self._image.nchunks),
desc=f'Histogram {self._image.field_name} (min/max)',
unit='chunk'):
if block_id % mpi_nprocs != mpi_rank:
continue
box, bex, boy, bey, boz, bez = self._image.chunk_slice_indices(block_id, return_indices=True)
block_data = self._image.zarray[box:bex, boy:bey, boz:bez]
skip = np.zeros((bex-box, bey-boy, bez-boz), dtype='bool')
if self._mask is not None:
_apply_mask_cpu(skip, self._mask.zarray[box:bex, boy:bey, boz:bez])
if self._region is not None:
self._region.mask_chunk_cpu(skip, ox, oy, oz, hx, hy, hz, box, boy, boz)
block_data = ma.masked_array(block_data, mask=skip)
minblock = block_data.min()
if minblock < local_min:
local_min = minblock
maxblock = block_data.max()
if maxblock > local_max:
local_max = maxblock
comm.barrier()
self._min = comm.allreduce(local_min, op=MPI.MIN)
self._max = comm.allreduce(local_max, op=MPI.MAX)
def _update_bins(self):
v = self._input_bins
if self._image.dtype.kind == 'b': #Boolean image? Treat as 0's and 1's
self._bins = np.array([0, 1])
return
if v is None:
self._bins = np.linspace(start=self._min,
stop=self._max,
num=256,
dtype=self._image.dtype)
#Positive integer
elif np.dtype(type(v)).kind in 'ui' and v>0:
self._bins = np.linspace(start=self._min,
stop=self._max,
num=v,
dtype=self._image.dtype)
#Ordered iterable of numbers
elif (hasattr(v, '__iter__') and hasattr(v, '__getitem__')
and all(np.dtype(type(k)).kind in 'uif' for k in v)):
self._bins = np.sort(v)
else:
if mpi_rank == 0:
warnings.warn('Invalid value for bins. Falling back to default.')
self._input_bins = None
self._update_bins()
def _count_phases(self):
if self._segmentation is None:
self._phases = ()
return
count = np.zeros(2**(8*self._segmentation.dtype.itemsize), dtype=int)
for block_id in rvtqdm(range(self._image.nchunks),
desc=f'Histogram {self._image.field_name} (reading segmentation)',
unit='chunk'):
if block_id % mpi_nprocs != mpi_rank:
continue
box, bex, boy, bey, boz, bez = self._image.chunk_slice_indices(block_id, return_indices=True)
block_segm = self._segmentation.zarray[box:bex, boy:bey, boz:bez]
_block_count_phases(block_segm, count)
comm.barrier()
#All-reduce phases
gphases = set()
for rank in range(mpi_nprocs):
lph = comm.bcast(np.argwhere(count>0).flatten(), root=rank)
gphases = gphases.union(set(lph))
self._phases = np.sort(list(gphases)).astype(self._segmentation.dtype)
def _update_histogram(self):
hist = np.zeros((len(self._bins)-1, len(self._phases)+1), dtype=int)
hx, hy, hz = self._image.voxel_length
ox, oy, oz = self._image.voxel_origin
for block_id in rvtqdm(range(self._image.nchunks),
desc=f'Histogram {self._image.field_name} (counting voxels)',
unit='chunk'):
if block_id % mpi_nprocs != mpi_rank:
continue
box, bex, boy, bey, boz, bez = self._image.chunk_slice_indices(block_id, return_indices=True)
block_data = self._image.zarray[box:bex, boy:bey, boz:bez]
if block_data.dtype.kind == 'b': #Boolean?
block_data = block_data.astype('u1')
skip = np.zeros((bex-box, bey-boy, bez-boz), dtype='bool')
if self._mask is not None:
_apply_mask_cpu(skip, self._mask.zarray[box:bex, boy:bey, boz:bez])
if self._region is not None:
self._region.mask_chunk_cpu(skip, ox, oy, oz, hx, hy, hz, box, boy, boz)
if len(self._phases)>0:
block_segm = self._segmentation.zarray[box:bex, boy:bey, boz:bez]
phases = self._phases
else:
block_segm = skip
phases = np.array([0])
_block_update_histogram(hist, block_data, block_segm, skip, self._bins, phases)
comm.barrier()
#All-reduce counts
for c in range(hist.shape[1]):
hist[:, c] = comm.allreduce(hist[:, c], op=MPI.SUM)
self._hist = pd.DataFrame(hist, columns=['full',]+list(self._phases))
@property
def image(self):
'''
The input voxel image.
'''
return self._image
@property
def region(self):
"""
The region associated to the histogram.
"""
return self._region
@property
def mask(self):
"""
The mask associated to the histogram.
"""
return self._mask
@property
def segmentation(self):
"""
The segmentation associated to the histogram.
"""
return self._segmentation
@property
def phases(self):
'''
Tuple with segmentation phases.
'''
return tuple(self._phases)
@property
def bins(self):
"""
The histogram bins.
"""
return self._bins
@property
def bin_centers(self):
"""
The centers of the histogram bins.
"""
return (self._bins[1:].astype(float)+self._bins[:-1].astype(float))/2
@property
def min(self):
"""
The image minimum value.
"""
return self._min
@property
def max(self):
"""
The image maximum value.
"""
return self._max
@property
def count(self):
"""
Get the histogram count as a Pandas DataFrame.
Returns
-------
:class:`pandas.DataFrame`
A data frame containing the histogram values for the full image and each segmentation phase.
"""
return self._hist
@property
def pdf(self):
"""
Compute the Probability Density Function (PDF) from the calculated
histogram. PDF values are normalized such that the total area for the
bins (bin width times histogram height) equals 1.
Returns
-------
pdf : pandas.DataFrame
DataFrame containing the PDF values for the full image and for each
segmentation phase.
"""
h = self._hist.copy(deep=True)
x = self._bins.astype(float)
y = self._hist['full'].values.astype(float)
area = 0.
for k, yi in enumerate(y):
area += (x[k+1]-x[k])*yi
h /= area
return h
@property
def cdf(self):
"""
Compute the Cumulative Distribution Function (CDF) for the full image and
phase by phase as a pandas DataFrame.
Returns
-------
pandas.DataFrame
A DataFrame containing the CDF values for the full image and each
segmentation phase.
Examples
--------
>>> # Get the CDF DataFrame
>>> cdf = histogram.cdf
>>> # Access the CDF values for a specific segmentation phase
>>> phase_cdf = cdf[phase_id]
"""
x = self.bins.astype(float)
pdf = self.pdf
h = pdf*0.
h.loc[len(h)] = 0.
for k in range(1, len(h)):
h.loc[k] = h.loc[k-1] + pdf.loc[k-1]*(x[k]-x[k-1])
return h
[docs]
def percentile(self, q):
"""
Estimate the q-th percentiles by linear interpolation on
histogram CDF and bins.
Parameters
----------
q : float or array-like of floats
Percentage or sequence of percentages. Must obey 0<=q<=100.
Returns
-------
float or array-like of floats
Percentile values.
Examples
--------
>>> # Compute the 10th percentile
>>> p = histogram.percentile(10)
>>> # Compute quartiles (25th, 50th, and 75th percentiles)
>>> Q1, Q2, Q3 = histogram.percentile([25, 50, 75])
"""
x = self.cdf['full'].values.astype(float)
y = self.bins
if not (
(np.dtype(type(q)).kind in 'uif' and 0<=q<=100)
or
(hasattr(q, '__iter__') and hasattr(q, '__getitem__')
and all(np.dtype(type(k)).kind in 'uif' for k in q)
and all(0<=k<=100 for k in q)
)
):
collective_raise(ValueError(
'q must be a number of a iterable of numbers between 0 and 100.'))
return np.interp(np.array(q)/100, x, y)