Source code for ess.powder.correction

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
"""Correction algorithms for powder diffraction."""

import enum
from collections.abc import Mapping, Sequence
from typing import TypeVar

import sciline
import scipp as sc

import ess.reduce
from ess.reduce.uncertainty import broadcast_uncertainties

from ._util import event_or_outer_coord
from .types import (
    AccumulatedProtonCharge,
    CaveMonitor,
    CorrectedDspacing,
    EmptyCanRun,
    EmptyCanSubtractedIofDspacing,
    EmptyCanSubtractedIofDspacingTwoTheta,
    FocussedDataDspacing,
    FocussedDataDspacingTwoTheta,
    IntensityDspacing,
    IntensityDspacingTwoTheta,
    NormalizedDspacing,
    RunType,
    SampleRun,
    UncertaintyBroadcastMode,
    VanadiumRun,
    WavelengthMonitor,
)

_T = TypeVar("_T")


[docs] def normalize_by_monitor_histogram( detector: CorrectedDspacing[RunType], *, monitor: WavelengthMonitor[RunType, CaveMonitor], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> NormalizedDspacing[RunType]: """Normalize detector data by a histogrammed monitor. The detector is normalized according to .. math:: d_i^\\text{Norm} = \\frac{d_i}{m_i} \\Delta \\lambda_i Parameters ---------- detector: Input event data in wavelength. monitor: A histogrammed monitor in wavelength. uncertainty_broadcast_mode: Choose how uncertainties of the monitor are broadcast to the sample data. Returns ------- : `detector` normalized by a monitor. See also -------- ess.reduce.normalization.normalize_by_monitor_histogram: For details and the actual implementation. """ detector, skip_range_check = _mask_out_of_monitor_range_data(detector, monitor) return NormalizedDspacing[RunType]( ess.reduce.normalization.normalize_by_monitor_histogram( detector=detector, monitor=monitor, uncertainty_broadcast_mode=uncertainty_broadcast_mode, skip_range_check=skip_range_check, ) )
[docs] def normalize_by_monitor_integrated( detector: CorrectedDspacing[RunType], *, monitor: WavelengthMonitor[RunType, CaveMonitor], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> NormalizedDspacing[RunType]: """Normalize detector data by an integrated monitor. The detector is normalized according to .. math:: d_i^\\text{Norm} = \\frac{d_i}{\\sum_j\\, m_j} Note that this is not a true integral but only a sum over monitor events. Parameters ---------- detector: Input event data in wavelength. monitor: A histogrammed monitor in wavelength. uncertainty_broadcast_mode: Choose how uncertainties of the monitor are broadcast to the sample data. Returns ------- : `detector` normalized by a monitor. See also -------- ess.reduce.normalization.normalize_by_monitor_integrated: For details and the actual implementation. """ detector, skip_range_check = _mask_out_of_monitor_range_data(detector, monitor) return NormalizedDspacing[RunType]( ess.reduce.normalization.normalize_by_monitor_integrated( detector=detector, monitor=monitor, uncertainty_broadcast_mode=uncertainty_broadcast_mode, skip_range_check=skip_range_check, ) )
def _mask_out_of_monitor_range_data( detector: sc.DataArray, monitor: sc.DataArray ) -> tuple[sc.DataArray, bool]: if (coord := detector.coords.get("wavelength")) is not None: if detector.bins is None and "wavelength" not in coord.dims: # The detector was histogrammed early, and the wavelength was reconstructed # from d-spacing and 2theta, see focus_data_dspacing_and_two_theta. # This introduces unphysical wavelength bins that we need to mask. # # The detector wavelength coord is bin-edges in d-spacing, but we need # the mask to not be bin-edges, so compute the mask on the left and # right d-spacing edges and `or` them together. mon_coord = monitor.coords["wavelength"] mon_lo, mon_hi = mon_coord.min(), mon_coord.max() left, right = coord['dspacing', :-1], coord['dspacing', 1:] out_of_range = left < mon_lo out_of_range |= left > mon_hi out_of_range |= right < mon_lo out_of_range |= right > mon_hi if sc.any(out_of_range): return detector.assign_masks(out_of_wavelength_range=out_of_range), True return detector, False def _filter_keys(mapping: Mapping[str, _T], keys: Sequence[str]) -> dict[str, _T]: return {key: value for key, value in mapping.items() if key in keys} def _histogram_vanadium(vanadium: sc.DataArray, sample: sc.DataArray) -> sc.DataArray: target_coords = _filter_keys(sample.coords, ("dspacing", "two_theta")) if "two_theta" in target_coords and "two_theta" not in vanadium.bins.coords: # Need to provide a two_theta event coord so we can histogram. if sc.identical(vanadium.coords["two_theta"], target_coords["two_theta"]): # Skip the expensive event coord if possible: return vanadium.hist(dspacing=sample.coords["dspacing"]) return vanadium.bins.assign_coords( two_theta=sc.bins_like(vanadium, vanadium.coords["two_theta"]) ).hist(target_coords) return vanadium.hist(target_coords) def _normalize_by_vanadium( data: sc.DataArray, vanadium: sc.DataArray, uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> sc.DataArray: norm = ( _histogram_vanadium(vanadium, sample=data) if vanadium.is_binned else vanadium.rebin(_filter_keys(data.coords, ("dspacing", "two_theta"))) ) norm = broadcast_uncertainties( norm, prototype=data, mode=uncertainty_broadcast_mode ) # Convert the unit such that the end-result has unit 'one' because the division # might otherwise produce a unit with a large scale if the proton charges in data # and vanadium were measured with different units. norm = norm.to(unit=data.unit, copy=False) normed = data / norm mask = norm.data == sc.scalar(0.0, unit=norm.unit) if mask.any(): normed.masks['zero_vanadium'] = mask return normed _RunTypeNoVanadium = TypeVar("_RunTypeNoVanadium", SampleRun, EmptyCanRun)
[docs] def normalize_by_vanadium_dspacing( data: FocussedDataDspacing[_RunTypeNoVanadium], vanadium: FocussedDataDspacing[VanadiumRun], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> IntensityDspacing[_RunTypeNoVanadium]: """Normalize sample data binned in d-spacing by a vanadium measurement. If the vanadium data is binned, it gets :func:`histogrammed <scipp.hist>` to the same bins as ``data``. If it is not binned, it gets :func:`rebinned <scipp.rebin>` to the same coordinates as ``data``. Then, the result is computed as .. code-block:: python data / vanadium And any bins where vanadium is zero are masked out with a mask called "zero_vanadium". Parameters ---------- data: Sample data. vanadium: Vanadium data. uncertainty_broadcast_mode: Choose how uncertainties of vanadium are broadcast to the sample data. Defaults to ``UncertaintyBroadcastMode.fail``. Returns ------- : ``data / vanadium``. May contain a mask "zero_vanadium" which is ``True`` for bins where vanadium is zero. See Also -------- normalize_by_vanadium_dspacing_and_two_theta: Normalization for 2d data binned in d-spacing and :math`2\\theta`. """ return IntensityDspacing( _normalize_by_vanadium(data, vanadium, uncertainty_broadcast_mode) )
[docs] def normalize_by_vanadium_dspacing_and_two_theta( data: FocussedDataDspacingTwoTheta[_RunTypeNoVanadium], vanadium: FocussedDataDspacingTwoTheta[VanadiumRun], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> IntensityDspacingTwoTheta[_RunTypeNoVanadium]: """Normalize sample data binned in (d-spacing, 2theta) by a vanadium measurement. If the vanadium data is binned, it gets :func:`histogrammed <scipp.hist>` to the same bins as ``data``. If it is not binned, it gets :func:`rebinned <scipp.rebin>` to the same coordinates as ``data``. Then, the result is computed as .. code-block:: python data / vanadium And any bins where vanadium is zero are masked out with a mask called "zero_vanadium". Parameters ---------- data: Sample data. vanadium: Vanadium data. uncertainty_broadcast_mode: Choose how uncertainties of vanadium are broadcast to the sample data. Defaults to ``UncertaintyBroadcastMode.fail``. Returns ------- : ``data / vanadium``. May contain a mask "zero_vanadium" which is ``True`` for bins where vanadium is zero. See Also -------- normalize_by_vanadium_dspacing: Normalization for 1d data binned in d-spacing. """ return IntensityDspacingTwoTheta( _normalize_by_vanadium(data, vanadium, uncertainty_broadcast_mode) )
[docs] def normalize_by_proton_charge( data: CorrectedDspacing[RunType], proton_charge: AccumulatedProtonCharge[RunType], ) -> NormalizedDspacing[RunType]: """Normalize data by an accumulated proton charge. Parameters ---------- data: Un-normalized data array as events or a histogram. proton_charge: Accumulated proton charge over the entire run. Returns ------- : ``data / proton_charge`` """ return NormalizedDspacing[RunType](data / proton_charge)
[docs] def merge_calibration(*, into: sc.DataArray, calibration: sc.Dataset) -> sc.DataArray: """ Return a scipp.DataArray containing calibration metadata as coordinates. Parameters ---------- into: Base data and metadata for the returned object. calibration: Calibration parameters. Returns ------- : Copy of `into` with additional coordinates and masks from `calibration`. See Also -------- ess.snspowder.powgen.calibration.load_calibration """ for name, coord in calibration.coords.items(): if not sc.identical(into.coords[name], coord): raise ValueError( f"Coordinate {name} of calibration and target dataset do not agree." ) out = into.copy(deep=False) for name in ("difa", "difc", "tzero"): if name in out.coords: raise ValueError( f"Cannot add calibration parameter '{name}' to data, " "there already is metadata with the same name." ) out.coords[name] = calibration[name].data if "calibration" in out.masks: raise ValueError( "Cannot add calibration mask 'calibration' tp data, " "there already is a mask with the same name." ) out.masks["calibration"] = calibration["mask"].data return out
[docs] def apply_lorentz_correction(da: sc.DataArray) -> sc.DataArray: """Perform a Lorentz correction for ToF powder diffraction data. This function uses this definition: .. math:: L = d^4 \\sin\\theta where :math:`d` is d-spacing, :math:`\\theta` is half the scattering angle (note the definitions in https://scipp.github.io/scippneutron/user-guide/coordinate-transformations.html). The Lorentz factor as defined here is suitable for correcting time-of-flight data expressed in wavelength or d-spacing. It follows the definition used by GSAS-II, see page 140 of https://subversion.xray.aps.anl.gov/EXPGUI/gsas/all/GSAS%20Manual.pdf Parameters ---------- da: Input data with coordinates ``two_theta`` and ``dspacing``. Returns ------- : ``da`` multiplied by :math:`L`. Has the same dtype as ``da``. """ dspacing = event_or_outer_coord(da, "dspacing") two_theta = event_or_outer_coord(da, "two_theta") sin_theta = sc.sin(0.5 * two_theta) d4 = dspacing**4 out = da * sin_theta.to(dtype=da.bins.dtype if da.bins else da.dtype, copy=False) out *= d4.to(dtype=da.bins.dtype if da.bins else da.dtype, copy=False) return out
[docs] def subtract_empty_can( data: IntensityDspacing[SampleRun], background: IntensityDspacing[EmptyCanRun], ) -> EmptyCanSubtractedIofDspacing[SampleRun]: return EmptyCanSubtractedIofDspacing(data.bins.concatenate(-background))
[docs] def subtract_empty_can_two_theta( data: IntensityDspacingTwoTheta[SampleRun], background: IntensityDspacingTwoTheta[EmptyCanRun], ) -> EmptyCanSubtractedIofDspacingTwoTheta[SampleRun]: return EmptyCanSubtractedIofDspacingTwoTheta(data.bins.concatenate(-background))
[docs] class RunNormalization(enum.Enum): """Type of normalization applied to each run.""" monitor_histogram = enum.auto() monitor_integrated = enum.auto() proton_charge = enum.auto()
[docs] def insert_run_normalization( workflow: sciline.Pipeline, run_norm: RunNormalization ) -> None: """Insert providers for a specific normalization into a workflow.""" match run_norm: case RunNormalization.monitor_histogram: workflow.insert(normalize_by_monitor_histogram) case RunNormalization.monitor_integrated: workflow.insert(normalize_by_monitor_integrated) case RunNormalization.proton_charge: workflow.insert(normalize_by_proton_charge)
providers = ( normalize_by_proton_charge, normalize_by_vanadium_dspacing, normalize_by_vanadium_dspacing_and_two_theta, subtract_empty_can, subtract_empty_can_two_theta, ) """Sciline providers for powder diffraction corrections."""