# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
import scipp as sc
from scipp.scipy.interpolate import interp1d
from ess.reduce.uncertainty import UncertaintyBroadcastMode, broadcast_uncertainties
from .common import mask_range
from .logging import get_logger
from .types import (
BackgroundRun,
BackgroundSubtractedIofQ,
BackgroundSubtractedIofQxy,
CleanDirectBeam,
CleanMonitor,
CleanQ,
CleanQxy,
CleanSummedQ,
CleanSummedQxy,
DimsToKeep,
DirectBeam,
IofQ,
IofQPart,
IofQxy,
MonitorType,
NonBackgroundWavelengthRange,
QBins,
QxBins,
QyBins,
ReturnEvents,
RunType,
SampleRun,
ScatteringRunType,
WavelengthBins,
WavelengthMonitor,
)
[docs]
def preprocess_monitor_data(
monitor: WavelengthMonitor[RunType, MonitorType],
wavelength_bins: WavelengthBins,
non_background_range: NonBackgroundWavelengthRange,
uncertainties: UncertaintyBroadcastMode,
) -> CleanMonitor[RunType, MonitorType]:
"""
Prepare monitor data for computing the transmission fraction.
The input data are first converted to wavelength (if needed).
If a ``non_background_range`` is provided, it defines the region where data is
considered not to be background, and regions outside are background. A mean
background level will be computed from the background and will be subtracted from
the non-background counts.
Finally, if wavelength bins are provided, the data is rebinned to match the
requested binning.
Parameters
----------
monitor:
The monitor to be pre-processed.
wavelength_bins:
The binning in wavelength to use for the rebinning.
non_background_range:
The range of wavelengths that defines the data which does not constitute
background. Everything outside this range is treated as background counts.
uncertainties:
The mode for broadcasting uncertainties. See
:py:class:`ess.reduce.uncertainty.UncertaintyBroadcastMode` for details.
Returns
-------
:
The input monitors converted to wavelength, cleaned of background counts, and
rebinned to the requested wavelength binning.
"""
background = None
if non_background_range is not None:
mask = sc.DataArray(
data=sc.array(dims=[non_background_range.dim], values=[True]),
coords={non_background_range.dim: non_background_range},
)
background = mask_range(monitor, mask=mask).mean()
if monitor.bins is not None:
monitor = monitor.hist(wavelength=wavelength_bins)
else:
monitor = monitor.rebin(wavelength=wavelength_bins)
if background is not None:
monitor -= broadcast_uncertainties(
background, prototype=monitor, mode=uncertainties
)
return CleanMonitor(monitor)
[docs]
def resample_direct_beam(
direct_beam: DirectBeam, wavelength_bins: WavelengthBins
) -> CleanDirectBeam:
"""
If the wavelength binning of the direct beam function does not match the requested
``wavelength_bins``, perform a 1d interpolation of the function onto the bins.
Parameters
----------
direct_beam:
The DataArray containing the direct beam function (it should have a dimension
of wavelength).
wavelength_bins:
The binning in wavelength that the direct beam function should be resampled to.
Returns
-------
:
The direct beam function resampled to the requested resolution.
"""
if direct_beam is None:
return CleanDirectBeam(
sc.DataArray(
sc.ones(dims=wavelength_bins.dims, shape=[len(wavelength_bins) - 1]),
coords={'wavelength': wavelength_bins},
)
)
if sc.identical(direct_beam.coords['wavelength'], wavelength_bins):
return direct_beam
if direct_beam.variances is not None:
logger = get_logger('sans')
logger.warning(
'An interpolation is being performed on the direct_beam function. '
'The variances in the direct_beam function will be dropped.'
)
func = interp1d(
sc.values(direct_beam),
'wavelength',
fill_value="extrapolate",
bounds_error=False,
)
return CleanDirectBeam(func(wavelength_bins, midpoints=True))
[docs]
def bin_in_q(
data: CleanQ[ScatteringRunType, IofQPart],
q_bins: QBins,
dims_to_keep: DimsToKeep,
) -> CleanSummedQ[ScatteringRunType, IofQPart]:
"""
Merges data from all pixels into a single I(Q) spectrum:
* In the case of event data, events in all bins are concatenated
* In the case of dense data, counts in all spectra are summed
Parameters
----------
data:
A DataArray containing the data that is to be converted to Q.
q_bins:
The binning in Q to be used.
dims_to_keep:
Dimensions that should not be reduced and thus still be present in the final
I(Q) result (this is typically the layer dimension).
Returns
-------
:
The input data converted to Q and then summed over all detector pixels.
"""
out = _bin_in_q(data=data, edges={'Q': q_bins}, dims_to_keep=dims_to_keep)
return CleanSummedQ[ScatteringRunType, IofQPart](out)
[docs]
def bin_in_qxy(
data: CleanQxy[ScatteringRunType, IofQPart],
qx_bins: QxBins,
qy_bins: QyBins,
dims_to_keep: DimsToKeep,
) -> CleanSummedQxy[ScatteringRunType, IofQPart]:
"""
Merges data from all pixels into a single I(Q) spectrum:
* In the case of event data, events in all bins are concatenated
* In the case of dense data, counts in all spectra are summed
Parameters
----------
data:
A DataArray containing the data that is to be converted to Q.
qx_bins:
The binning in Qx to be used.
qy_bins:
The binning in Qy to be used.
dims_to_keep:
Dimensions that should not be reduced and thus still be present in the final
I(Q) result (this is typically the layer dimension).
Returns
-------
:
The input data converted to Qx and Qy and then summed over all detector pixels.
"""
# We make Qx the inner dim, such that plots naturally show Qx on the x-axis.
out = _bin_in_q(
data=data,
edges={'Qy': qy_bins, 'Qx': qx_bins},
dims_to_keep=dims_to_keep,
)
return CleanSummedQxy[ScatteringRunType, IofQPart](out)
def _bin_in_q(
data: sc.DataArray, edges: dict[str, sc.Variable], dims_to_keep: tuple[str, ...]
) -> sc.DataArray:
dims_to_reduce = set(data.dims) - {'wavelength'} - set(dims_to_keep or ())
return (data.hist if data.bins is None else data.bin)(**edges, dim=dims_to_reduce)
def _subtract_background(
sample: sc.DataArray,
background: sc.DataArray,
return_events: ReturnEvents,
) -> sc.DataArray:
if return_events and sample.bins is not None and background.bins is not None:
return sample.bins.concatenate(-background)
if sample.bins is not None:
sample = sample.bins.sum()
if background.bins is not None:
background = background.bins.sum()
return sample - background
[docs]
def subtract_background(
sample: IofQ[SampleRun],
background: IofQ[BackgroundRun],
return_events: ReturnEvents,
) -> BackgroundSubtractedIofQ:
return BackgroundSubtractedIofQ(
_subtract_background(
sample=sample, background=background, return_events=return_events
)
)
[docs]
def subtract_background_xy(
sample: IofQxy[SampleRun],
background: IofQxy[BackgroundRun],
return_events: ReturnEvents,
) -> BackgroundSubtractedIofQxy:
return BackgroundSubtractedIofQxy(
_subtract_background(
sample=sample, background=background, return_events=return_events
)
)
providers = (
preprocess_monitor_data,
resample_direct_beam,
bin_in_q,
bin_in_qxy,
subtract_background,
subtract_background_xy,
)