Source code for ess.reduce.time_of_flight.resample
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
import numpy as np
import scipp as sc
[docs]
def find_strictly_increasing_sections(var: sc.Variable) -> list[slice]:
"""
Find strictly increasing sections in a coordinate dimension (minimum length 2).
Parameters
----------
var:
The variable to analyze, which should be one-dimensional.
Returns
-------
sections:
Slice objects that can be used extract strictly increasing sections.
"""
values = var.values
finite = np.isfinite(values)
increasing = (np.sign(np.diff(values)) > 0) & finite[:-1] & finite[1:]
# 1 marks the start of an increasing section, -1 marks the end
transitions = np.diff(np.concatenate(([False], increasing, [False])).astype(int))
section_starts = np.where(transitions == 1)[0]
section_ends = np.where(transitions == -1)[0] + np.array(1)
return [
slice(start, end)
for start, end in zip(section_starts, section_ends, strict=True)
if end - start >= 2 # Ensure section has at least 2 points
]
[docs]
def get_min_max(
var: sc.Variable, *, dim: str, slices: list[slice]
) -> tuple[sc.Variable, sc.Variable]:
if not slices:
raise ValueError("No strictly increasing sections found.")
combined = sc.concat([var[dim, slice] for slice in slices], dim)
return combined.min(), combined.max()
[docs]
def make_regular_grid(
var: sc.Variable, *, dim: str, slices: list[slice]
) -> sc.Variable:
"""
Create a regular grid variable based on the min and max of the slices.
The grid is constructed such that it includes the minimum and maximum values
of the strictly increasing sections, with a step size equal to the difference
between the first two values of the section with the minimum start value (which is
not necessarily the first section).
"""
min_val, max_val = get_min_max(var, dim=dim, slices=slices)
first: sc.Variable | None = None
for s in slices:
first = var[dim, s]
if sc.identical(first[0], min_val):
break
if first is None:
# This should not happen if slices are correctly identified and passed from
# find_strictly_increasing_sections.
raise ValueError("Section is not strictly increasing.")
step = first[1] - first[0]
return sc.arange(
dim=dim,
start=min_val.value,
stop=max_val.value + step.value, # Ensure the last bin edge is included
step=step.value,
unit=step.unit,
dtype=step.dtype,
)
[docs]
def rebin_strictly_increasing(da: sc.DataArray, dim: str) -> sc.DataArray:
"""
Find strictly monotonic sections in a coordinate dimension and rebin the data array
into a regular grid based on these sections.
"""
# Ensure the dimension is named like the coordinate.
da = da.rename_dims({da.coords[dim].dim: dim})
slices = find_strictly_increasing_sections(da.coords[dim])
if len(slices) == 1:
return da[dim, slices[0]]
if not slices:
raise ValueError("No strictly increasing sections found.")
if da.coords[dim].dtype not in (sc.DType.float64, sc.DType.float32):
# rebin does not like integer coords.
da = da.assign_coords({dim: da.coords[dim].to(dtype='float64')})
# Slices refer to the indices in the coord, which are bin edges. For slicing data
# we need to stop at the last index minus one.
sections = [da[dim, section.start : section.stop - 1] for section in slices]
edges = make_regular_grid(da.coords[dim], dim=dim, slices=slices)
return sc.reduce([sc.rebin(section, {dim: edges}) for section in sections]).sum()