Source code for ess.powder.correction

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
"""Correction algorithms for powder diffraction."""

from typing import Any

import scipp as sc
from scippneutron.conversion.graph import beamline, tof

from ess.reduce.uncertainty import broadcast_uncertainties

from ._util import event_or_outer_coord
from .logging import get_logger
from .smoothing import lowpass
from .types import (
    AccumulatedProtonCharge,
    FilteredData,
    FocussedDataDspacing,
    FocussedDataDspacingTwoTheta,
    IofDspacing,
    IofDspacingTwoTheta,
    NormalizedByProtonCharge,
    RunType,
    SampleRun,
    UncertaintyBroadcastMode,
    VanadiumRun,
)


[docs] def normalize_by_monitor( data: sc.DataArray, *, monitor: sc.DataArray, wavelength_edges: sc.Variable | None = None, smooth_args: dict[str, Any] | None = None, ) -> sc.DataArray: """ Normalize event data by a monitor. The input is converted to wavelength if it does not already contain wavelengths. Parameters ---------- data: Input event data. monitor: A histogrammed monitor. wavelength_edges: If given, rebin the monitor with these edges. smooth_args: If given, the monitor histogram is smoothed with :func:`ess.powder.lowpass` before dividing into `data`. `smooth_args` is passed as keyword arguments to :func:`ess.powder.lowpass`. If ``None``, the monitor is not smoothed. Returns ------- : `data` normalized by a monitor. """ if "wavelength" not in monitor.coords: monitor = monitor.transform_coords( "wavelength", graph={**beamline.beamline(scatter=False), **tof.elastic("tof")}, keep_inputs=False, keep_intermediate=False, keep_aliases=False, ) if wavelength_edges is not None: monitor = monitor.rebin(wavelength=wavelength_edges) if smooth_args is not None: get_logger().info( "Smoothing monitor for normalization using " "ess.powder.smoothing.lowpass with %s.", smooth_args, ) monitor = lowpass(monitor, dim="wavelength", **smooth_args) return data.bins / sc.lookup(func=monitor, dim="wavelength")
def _normalize_by_vanadium( data: sc.DataArray, vanadium: sc.DataArray, uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> sc.DataArray: norm = vanadium.hist() norm = broadcast_uncertainties( norm, prototype=data, mode=uncertainty_broadcast_mode ) # Converting to unit 'one' because the division might produce a unit # with a large scale if the proton charges in data and vanadium were # measured with different units. return (data / norm).to(unit="one", copy=False)
[docs] def normalize_by_vanadium_dspacing( data: FocussedDataDspacing[SampleRun], vanadium: FocussedDataDspacing[VanadiumRun], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> IofDspacing: """ Normalize sample data by a vanadium measurement and return intensity vs d-spacing. Parameters ---------- data: Sample data. vanadium: Vanadium data. uncertainty_broadcast_mode: Choose how uncertainties of vanadium are broadcast to the sample data. Defaults to ``UncertaintyBroadcastMode.fail``. """ return IofDspacing( _normalize_by_vanadium(data, vanadium, uncertainty_broadcast_mode) )
[docs] def normalize_by_vanadium_dspacing_and_two_theta( data: FocussedDataDspacingTwoTheta[SampleRun], vanadium: FocussedDataDspacingTwoTheta[VanadiumRun], uncertainty_broadcast_mode: UncertaintyBroadcastMode, ) -> IofDspacingTwoTheta: """ Normalize sample data by a vanadium measurement and return intensity vs (d-spacing, 2theta). Parameters ---------- data: Sample data. vanadium: Vanadium data. uncertainty_broadcast_mode: Choose how uncertainties of vanadium are broadcast to the sample data. Defaults to ``UncertaintyBroadcastMode.fail``. """ return IofDspacingTwoTheta( _normalize_by_vanadium(data, vanadium, uncertainty_broadcast_mode) )
[docs] def normalize_by_proton_charge( data: FilteredData[RunType], proton_charge: AccumulatedProtonCharge[RunType] ) -> NormalizedByProtonCharge[RunType]: """Normalize data by an accumulated proton charge. Parameters ---------- data: Un-normalized data array as events or a histogram. proton_charge: Accumulated proton charge over the entire run. Returns ------- : ``data / proton_charge`` """ return NormalizedByProtonCharge[RunType](data / proton_charge)
[docs] def merge_calibration(*, into: sc.DataArray, calibration: sc.Dataset) -> sc.DataArray: """ Return a scipp.DataArray containing calibration metadata as coordinates. Parameters ---------- into: Base data and metadata for the returned object. calibration: Calibration parameters. Returns ------- : Copy of `into` with additional coordinates and masks from `calibration`. See Also -------- ess.snspowder.powgen.calibration.load_calibration """ for name, coord in calibration.coords.items(): if not sc.identical(into.coords[name], coord): raise ValueError( f"Coordinate {name} of calibration and target dataset do not agree." ) out = into.copy(deep=False) for name in ("difa", "difc", "tzero"): if name in out.coords: raise ValueError( f"Cannot add calibration parameter '{name}' to data, " "there already is metadata with the same name." ) out.coords[name] = calibration[name].data if "calibration" in out.masks: raise ValueError( "Cannot add calibration mask 'calibration' tp data, " "there already is a mask with the same name." ) out.masks["calibration"] = calibration["mask"].data return out
[docs] def apply_lorentz_correction(da: sc.DataArray) -> sc.DataArray: """Perform a Lorentz correction for ToF powder diffraction data. This function uses this definition: .. math:: L = d^4 \\sin\\theta where :math:`d` is d-spacing, :math:`\\theta` is half the scattering angle (note the definitions in https://scipp.github.io/scippneutron/user-guide/coordinate-transformations.html). The Lorentz factor as defined here is suitable for correcting time-of-flight data expressed in wavelength or d-spacing. It follows the definition used by GSAS-II, see page 140 of https://subversion.xray.aps.anl.gov/EXPGUI/gsas/all/GSAS%20Manual.pdf Parameters ---------- da: Input data with coordinates ``two_theta`` and ``dspacing``. Returns ------- : ``da`` multiplied by :math:`L`. Has the same dtype as ``da``. """ # The implementation is optimized under the assumption that two_theta # is small and dspacing and the data are large. out = _shallow_copy(da) dspacing = event_or_outer_coord(da, "dspacing") two_theta = event_or_outer_coord(da, "two_theta") theta = 0.5 * two_theta d4 = dspacing.broadcast(sizes=out.sizes) ** 4 if out.bins is None: out.data = d4.to(dtype=out.dtype, copy=False) out_data = out.data else: out.bins.data = d4.to(dtype=out.bins.dtype, copy=False) out_data = out.bins.data out_data *= sc.sin(theta, out=theta) out_data *= da.data if da.bins is None else da.bins.data return out
def _shallow_copy(da: sc.DataArray) -> sc.DataArray: # See https://github.com/scipp/scipp/issues/2773 out = da.copy(deep=False) if da.bins is not None: out.data = sc.bins(**da.bins.constituents) return out providers = ( normalize_by_proton_charge, normalize_by_vanadium_dspacing, normalize_by_vanadium_dspacing_and_two_theta, ) """Sciline providers for powder diffraction corrections."""