Coverage for install/scipp/core/bin_remapping.py: 70% (76 statements)

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
# @author Simon Heybrock
import itertools
import uuid
from collections.abc import Sequence
from math import prod
from typing import TYPE_CHECKING, TypeVar

from .._scipp import core as _cpp
from ..typing import Dims
from .concepts import concrete_dims, irreducible_mask, rewrap_reduced_data
from .cpp_classes import DataArray, Variable
from .cumulative import cumsum
from .operations import where
from .variable import index

if TYPE_CHECKING:
    from .bins import Bins


def hide_masked(da: DataArray, dim: Dims) -> Variable:
    if da.bins is None:
        raise ValueError("Input must be binned")
    if (mask := irreducible_mask(da, dim)) is not None:
        # Avoid using boolean indexing since it would result in a (partial) copy of
        # the content buffer. Instead index just begin/end and reuse the content
        # buffer.
        comps = da.bins.constituents
        # If the mask is 1-D we can drop entire "rows" or "columns". This can
        # drastically reduce the number of bins to handle in some cases for better
        # performance. For 2-D or higher masks we fall back to making bins "empty" by
        # setting end=begin.
        if mask.ndim == 1:
            select = ~mask
            comps['begin'] = comps['begin'][select]
            comps['end'] = comps['end'][select]
        else:
            comps['end'] = where(mask, comps['begin'], comps['end'])
        return _cpp._bins_no_validate(**comps)  # type: ignore[no-any-return]
    else:
        return da.data
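

# Illustrative sketch (not part of the module; toy data, `sc` is scipp): with a
# 1-D mask the masked bins are dropped entirely rather than emptied.
#
#     import scipp as sc
#     events = sc.DataArray(
#         sc.ones(dims=['event'], shape=[4]),
#         coords={'x': sc.array(dims=['event'], values=[0.1, 0.4, 0.6, 0.9], unit='m')},
#     )
#     da = events.bin(x=2)
#     da.masks['bad'] = sc.array(dims=['x'], values=[False, True])
#     hidden = hide_masked(da, dim='x')
#     assert hidden.sizes['x'] == 1  # masked bin dropped; content buffer is reused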


def _with_bin_sizes(var: Variable | DataArray, sizes: Variable) -> Variable:
    end = cumsum(sizes)
    begin = end - sizes
    data = var if var.bins is None else var.bins.constituents['data']
    dim = var.dim if var.bins is None else var.bins.constituents['dim']
    return _cpp._bins_no_validate(data=data, dim=dim, begin=begin, end=end)  # type: ignore[no-any-return]
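

# Worked example (sketch): `cumsum` turns the requested bin sizes into begin/end
# index pairs. For sizes [2, 0, 3]:
#
#     end = cumsum(sizes)    # [2, 2, 5] (inclusive cumulative sum)
#     begin = end - sizes    # [0, 2, 2] (exclusive cumulative sum)
#
# Bin i covers the half-open range [begin[i], end[i]) of the content buffer, so an
# empty bin has begin == end.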


def _concat_bins(var: Variable, dim: Dims) -> Variable:
    # To concat bins, two things need to happen:
    # 1. Data needs to be written to a contiguous chunk.
    # 2. New bin begin/end indices need to be set up.
    # If the dims to concatenate are the *inner* dims, a call to `copy()` performs 1.
    # Otherwise, we first transpose and then `copy()`.
    # For step 2. we simply sum the (transposed) input bin sizes over the concat dims,
    # which `_with_bin_sizes` can use to compute new begin/end indices.
    changed_dims = list(concrete_dims(var, dim))
    unchanged_dims = [d for d in var.dims if d not in changed_dims]
    # TODO It would be possible to support a copy=False parameter, to skip the copy if
    # the copy would not result in any moving or reordering.
    out = var.transpose(unchanged_dims + changed_dims).copy()
    out_bins: Bins[Variable] = out.bins  # type: ignore[assignment]
    sizes = out_bins.size().sum(changed_dims)
    return _with_bin_sizes(out, sizes)
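

# Sketch of the effect (hypothetical shapes): concatenating along 'x' erases that
# dim and merges bin contents row by row.
#
#     # var: binned Variable with sizes {'y': 3, 'x': 4}
#     out = _concat_bins(var, dim='x')
#     # out: binned Variable with sizes {'y': 3}, where
#     # out.bins.size() == var.bins.size().sum('x')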


def _combine_bins(
    var: Variable,
    coords: dict[str, Variable],
    edges: Sequence[Variable],
    groups: Sequence[Variable],
    dim: Dims,
) -> Variable:
    from .binning import make_binned

    # Overview
    # --------
    # The purpose of this code is to combine existing bins, but in a more general
    # manner than `concat`, which combines all bins along a dimension. Here we operate
    # more like `groupby`, which combines selected subsets and creates a new output
    # dim.
    #
    # Approach
    # --------
    # The algorithm is conceptually similar to `_concat_bins`, but with an additional
    # step: calling `make_binned` for grouping within the erased dims. For the final
    # output binning, instead of summing the input bin sizes over all erased dims, we
    # sum only within the groups created by `make_binned`.

    # Preserve the subspace dim order of the input data, instead of the one given by
    # `dim`.
    concrete_dims_ = concrete_dims(var, dim)
    changed_dims = [d for d in var.dims if d in concrete_dims_]
    unchanged_dims = [d for d in var.dims if d not in changed_dims]
    changed_shape = [var.sizes[d] for d in changed_dims]
    unchanged_shape = [var.sizes[d] for d in unchanged_dims]
    changed_volume = prod(changed_shape)

    # Move the modified dims to innermost. Below, this enables us to keep the other
    # dims (listed in unchanged_dims) untouched by creating pseudo bins that wrap the
    # entire changed subspaces. make_binned below will thus only operate within each
    # pseudo bin, without mixing contents from different unchanged bins.
    var = var.transpose(unchanged_dims + changed_dims)
    var_bins: Bins[Variable] = var.bins  # type: ignore[assignment]
    params = DataArray(var_bins.size(), coords=coords)
    params.coords['begin'] = var_bins.constituents['begin'].copy()
    params.coords['end'] = var_bins.constituents['end'].copy()

    # Sizes and begin/end indices of the changed subspace.
    sub_sizes = index(changed_volume).broadcast(
        dims=unchanged_dims, shape=unchanged_shape
    )
    params = params.flatten(to=uuid.uuid4().hex)
    # Set up pseudo binning for the unchanged subspace. All further reordering (for
    # grouping and binning) will then occur *within* those pseudo bins (by splitting
    # them).
    params_data = _with_bin_sizes(params, sub_sizes)
    # Apply the desired binning/grouping to sizes and begin/end, splitting the pseudo
    # bins.
    params = make_binned(params_data, edges=edges, groups=groups)

    # Set up a view of the source content with the desired target bin order.
    source = _cpp._bins_no_validate(
        data=var_bins.constituents['data'],
        dim=var_bins.constituents['dim'],
        begin=params.bins.constituents['data'].coords['begin'],  # type: ignore[union-attr]
        end=params.bins.constituents['data'].coords['end'],  # type: ignore[union-attr]
    )
    # Call `copy()` to reorder the data. This relies on the underlying behavior of
    # `copy` for binned data: it computes a new contiguous and ordered mapping of bin
    # contents to the content buffer. The main purpose of that mechanism is to deal,
    # e.g., with copies of slices, but here we can leverage the same mechanism.
    # Then we call `_with_bin_sizes` to put the new indices in place, "merging" the
    # reordered input bins into the desired output bins.
    return _with_bin_sizes(
        source.copy(),
        sizes=params.data.bins.sum(),  # type: ignore[union-attr]
    )
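

# Illustrative sketch (hypothetical data and names): merging the four 'x' bins of a
# binned variable into two output groups selected by a per-bin label coord 'g'.
#
#     g = sc.array(dims=['x'], values=[0, 1, 0, 1])
#     out = _combine_bins(
#         var,  # binned Variable with dims ('y', 'x')
#         coords={'g': g},
#         edges=[],
#         groups=[sc.array(dims=['g'], values=[0, 1])],
#         dim='x',
#     )
#     # out has dims ('y', 'g'): 'x' is erased and bins with equal labels are merged.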


def combine_bins(
    da: DataArray, edges: Sequence[Variable], groups: Sequence[Variable], dim: Dims
) -> DataArray:
    if da.bins is None:
        raise ValueError("Input must be binned")
    masked = hide_masked(da, dim)
    if len(edges) == 0 and len(groups) == 0:
        data = _concat_bins(masked, dim=dim)
    else:
        names = [coord.dim for coord in itertools.chain(edges, groups)]
        coords = {name: da.coords[name] for name in names}
        data = _combine_bins(masked, coords=coords, edges=edges, groups=groups, dim=dim)
    out = rewrap_reduced_data(da, data, dim=dim)
    for coord in itertools.chain(edges, groups):
        out.coords[coord.dim] = coord
    return out
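

# Usage sketch (hypothetical `da`: a binned DataArray with bin dim 'x' and a per-bin
# coord 'label'):
#
#     grouped = combine_bins(
#         da, edges=[], groups=[sc.array(dims=['label'], values=[0, 1])], dim='x'
#     )
#     # 'x' is replaced by 'label', the group values become its coord, and masked
#     # input bins are excluded via `hide_masked`.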


_VarDa = TypeVar('_VarDa', Variable, DataArray)


def concat_bins(obj: _VarDa, dim: Dims = None) -> _VarDa:
    da = obj if isinstance(obj, DataArray) else DataArray(obj)  # type: ignore[arg-type, redundant-expr]
    out = combine_bins(da, edges=[], groups=[], dim=dim)
    return out if isinstance(obj, DataArray) else out.data  # type: ignore[redundant-expr]
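

# Usage sketch: works for Variables and DataArrays alike. With `da` binned over
# dims ('y', 'x'):
#
#     concat_bins(da, dim='x')       # DataArray with dims ('y',), bins merged along 'x'
#     concat_bins(da.data, dim='x')  # same merge, but Variable in -> Variable out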