Source code for ess.powder.correction

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
"""Correction algorithms for powder diffraction."""

import enum

import sciline
import scipp as sc

from ess.reduce.uncertainty import broadcast_uncertainties

from ._util import event_or_outer_coord
from .types import (
    AccumulatedProtonCharge,
    CaveMonitor,
    DataWithScatteringCoordinates,
    FocussedDataDspacing,
    FocussedDataDspacingTwoTheta,
    IofDspacing,
    IofDspacingTwoTheta,
    NormalizedRunData,
    RunType,
    SampleRun,
    UncertaintyBroadcastMode,
    VanadiumRun,
    WavelengthMonitor,
)


[docs] def normalize_by_monitor_histogram( detector: DataWithScatteringCoordinates[RunType], *, monitor: WavelengthMonitor[RunType, CaveMonitor], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> NormalizedRunData[RunType]: """Normalize detector data by a histogrammed monitor. Parameters ---------- detector: Input event data in wavelength. monitor: A histogrammed monitor in wavelength. uncertainty_broadcast_mode: Choose how uncertainties of the monitor are broadcast to the sample data. Returns ------- : `detector` normalized by a monitor. """ _expect_monitor_covers_range_of_detector( detector=detector, monitor=monitor, dim="wavelength" ) norm = broadcast_uncertainties( monitor, prototype=detector, mode=uncertainty_broadcast_mode ) return detector.bins / sc.lookup(norm, dim="wavelength")
[docs] def normalize_by_monitor_integrated( detector: DataWithScatteringCoordinates[RunType], *, monitor: WavelengthMonitor[RunType, CaveMonitor], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> NormalizedRunData[RunType]: """Normalize detector data by an integrated monitor. The monitor is integrated according to .. math:: M = \\sum_{i=0}^{N-1}\\, m_i (x_{i+1} - x_i) I(x_i, x_{i+1}), where :math:`m_i` is the monitor intensity in bin :math:`i`, :math:`x_i` is the lower bin edge of bin :math:`i`, and :math:`I(x_i, x_{i+1})` selects bins that are within the range of the detector. Parameters ---------- detector: Input event data in wavelength. monitor: A histogrammed monitor in wavelength. uncertainty_broadcast_mode: Choose how uncertainties of the monitor are broadcast to the sample data. Returns ------- : `detector` normalized by a monitor. """ dim = monitor.dim if not monitor.coords.is_edges(dim): raise sc.CoordError( f"Monitor coordinate '{dim}' must be bin-edges to integrate the monitor." ) _expect_monitor_covers_range_of_detector( detector=detector, monitor=monitor, dim=dim ) # Clip `monitor` to the range of `detector`, where the bins at the boundary # may extend past the detector range (how label-based indexing works). det_coord = ( detector.coords[dim] if dim in detector.coords else detector.bins.coords[dim] ) lo = det_coord.min() hi = det_coord.max() monitor = monitor[dim, lo:hi] # Strictly limit `monitor` to the range of `detector`. edges = sc.concat([lo, monitor.coords[dim][1:-1], hi], dim=dim) monitor = sc.rebin(monitor, {dim: edges}) coord = monitor.coords[dim] norm = sc.sum(monitor.data * (coord[1:] - coord[:-1])) norm = broadcast_uncertainties( norm, prototype=detector, mode=uncertainty_broadcast_mode ) return NormalizedRunData[RunType](detector / norm)
def _expect_monitor_covers_range_of_detector( *, detector: sc.DataArray, monitor: sc.DataArray, dim: str ) -> None: det_coord = ( detector.coords[dim] if detector.bins is None else detector.bins.coords[dim] ) if ( monitor.coords[dim].min() > det_coord.min() or monitor.coords[dim].max() < det_coord.max() ): raise ValueError( "Cannot normalize by monitor: The wavelength range of the monitor is " "smaller than the range of the detector." ) def _normalize_by_vanadium( data: sc.DataArray, vanadium: sc.DataArray, uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> sc.DataArray: norm = vanadium.hist() norm = broadcast_uncertainties( norm, prototype=data, mode=uncertainty_broadcast_mode ) # Converting to unit 'one' because the division might produce a unit # with a large scale if the proton charges in data and vanadium were # measured with different units. return (data / norm).to(unit="one", copy=False)
[docs] def normalize_by_vanadium_dspacing( data: FocussedDataDspacing[SampleRun], vanadium: FocussedDataDspacing[VanadiumRun], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> IofDspacing: """ Normalize sample data by a vanadium measurement and return intensity vs d-spacing. Parameters ---------- data: Sample data. vanadium: Vanadium data. uncertainty_broadcast_mode: Choose how uncertainties of vanadium are broadcast to the sample data. Defaults to ``UncertaintyBroadcastMode.fail``. """ return IofDspacing( _normalize_by_vanadium(data, vanadium, uncertainty_broadcast_mode) )
[docs] def normalize_by_vanadium_dspacing_and_two_theta( data: FocussedDataDspacingTwoTheta[SampleRun], vanadium: FocussedDataDspacingTwoTheta[VanadiumRun], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> IofDspacingTwoTheta: """ Normalize sample data by a vanadium measurement and return intensity vs (d-spacing, 2theta). Parameters ---------- data: Sample data. vanadium: Vanadium data. uncertainty_broadcast_mode: Choose how uncertainties of vanadium are broadcast to the sample data. Defaults to ``UncertaintyBroadcastMode.fail``. """ return IofDspacingTwoTheta( _normalize_by_vanadium(data, vanadium, uncertainty_broadcast_mode) )
[docs] def normalize_by_proton_charge( data: DataWithScatteringCoordinates[RunType], proton_charge: AccumulatedProtonCharge[RunType], ) -> NormalizedRunData[RunType]: """Normalize data by an accumulated proton charge. Parameters ---------- data: Un-normalized data array as events or a histogram. proton_charge: Accumulated proton charge over the entire run. Returns ------- : ``data / proton_charge`` """ return NormalizedRunData[RunType](data / proton_charge)
[docs] def merge_calibration(*, into: sc.DataArray, calibration: sc.Dataset) -> sc.DataArray: """ Return a scipp.DataArray containing calibration metadata as coordinates. Parameters ---------- into: Base data and metadata for the returned object. calibration: Calibration parameters. Returns ------- : Copy of `into` with additional coordinates and masks from `calibration`. See Also -------- ess.snspowder.powgen.calibration.load_calibration """ for name, coord in calibration.coords.items(): if not sc.identical(into.coords[name], coord): raise ValueError( f"Coordinate {name} of calibration and target dataset do not agree." ) out = into.copy(deep=False) for name in ("difa", "difc", "tzero"): if name in out.coords: raise ValueError( f"Cannot add calibration parameter '{name}' to data, " "there already is metadata with the same name." ) out.coords[name] = calibration[name].data if "calibration" in out.masks: raise ValueError( "Cannot add calibration mask 'calibration' tp data, " "there already is a mask with the same name." ) out.masks["calibration"] = calibration["mask"].data return out
[docs] def apply_lorentz_correction(da: sc.DataArray) -> sc.DataArray: """Perform a Lorentz correction for ToF powder diffraction data. This function uses this definition: .. math:: L = d^4 \\sin\\theta where :math:`d` is d-spacing, :math:`\\theta` is half the scattering angle (note the definitions in https://scipp.github.io/scippneutron/user-guide/coordinate-transformations.html). The Lorentz factor as defined here is suitable for correcting time-of-flight data expressed in wavelength or d-spacing. It follows the definition used by GSAS-II, see page 140 of https://subversion.xray.aps.anl.gov/EXPGUI/gsas/all/GSAS%20Manual.pdf Parameters ---------- da: Input data with coordinates ``two_theta`` and ``dspacing``. Returns ------- : ``da`` multiplied by :math:`L`. Has the same dtype as ``da``. """ # The implementation is optimized under the assumption that two_theta # is small and dspacing and the data are large. out = _shallow_copy(da) dspacing = event_or_outer_coord(da, "dspacing") two_theta = event_or_outer_coord(da, "two_theta") theta = 0.5 * two_theta d4 = dspacing.broadcast(sizes=out.sizes) ** 4 if out.bins is None: out.data = d4.to(dtype=out.dtype, copy=False) out_data = out.data else: out.bins.data = d4.to(dtype=out.bins.dtype, copy=False) out_data = out.bins.data out_data *= sc.sin(theta, out=theta) out_data *= da.data if da.bins is None else da.bins.data return out
def _shallow_copy(da: sc.DataArray) -> sc.DataArray: # See https://github.com/scipp/scipp/issues/2773 out = da.copy(deep=False) if da.bins is not None: out.data = sc.bins(**da.bins.constituents) return out
[docs] class RunNormalization(enum.Enum): """Type of normalization applied to each run.""" monitor_histogram = enum.auto() monitor_integrated = enum.auto() proton_charge = enum.auto()
[docs] def insert_run_normalization( workflow: sciline.Pipeline, run_norm: RunNormalization ) -> None: """Insert providers for a specific normalization into a workflow.""" match run_norm: case RunNormalization.monitor_histogram: workflow.insert(normalize_by_monitor_histogram) case RunNormalization.monitor_integrated: workflow.insert(normalize_by_monitor_integrated) case RunNormalization.proton_charge: workflow.insert(normalize_by_proton_charge)
providers = ( normalize_by_proton_charge, normalize_by_vanadium_dspacing, normalize_by_vanadium_dspacing_and_two_theta, ) """Sciline providers for powder diffraction corrections."""