# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
"""ORSO utilities for Amor."""
import numpy as np
import scipp as sc
from orsopy.fileio import base as orso_base
from orsopy.fileio import data_source as orso_data_source
from orsopy.fileio.orso import Column, Orso, OrsoDataset
from ..reflectometry.orso import (
OrsoDataSource,
OrsoInstrument,
OrsoIofQDataset,
OrsoReduction,
)
from ..reflectometry.types import QResolution, ReflectivityOverQ
[docs]
def build_orso_instrument(events: ReflectivityOverQ) -> OrsoInstrument:
"""Build ORSO instrument metadata from intermediate reduction results for Amor.
This assumes specular reflection and sets the incident angle equal to the computed
scattering angle.
"""
return OrsoInstrument(
orso_data_source.InstrumentSettings(
wavelength=orso_base.ValueRange(*_limits_of_coord(events, "wavelength")),
incident_angle=orso_base.ValueRange(*_limits_of_coord(events, "theta")),
polarization=None, # TODO how can we determine this from the inputs?
)
)
[docs]
def build_orso_iofq_dataset(
iofq: ReflectivityOverQ,
sigma_q: QResolution,
data_source: OrsoDataSource,
reduction: OrsoReduction,
) -> OrsoIofQDataset:
"""Build an ORSO dataset for reduced I-of-Q data and associated metadata."""
header = Orso(
data_source=data_source,
reduction=reduction,
columns=[
Column("Qz", "1/angstrom", "wavevector transfer"),
Column("R", None, "reflectivity"),
Column("sR", None, "standard deviation of reflectivity"),
Column(
"sQz",
"1/angstrom",
"standard deviation of wavevector transfer resolution",
),
],
)
iofq = iofq.hist()
qz = iofq.coords["Q"].to(unit="1/angstrom", copy=False)
if iofq.coords.is_edges("Q"):
qz = sc.midpoints(qz)
r = sc.values(iofq.data)
sr = sc.stddevs(iofq.data)
sqz = sigma_q.to(unit="1/angstrom", copy=False)
data = np.column_stack(tuple(map(_extract_values_array, (qz, r, sr, sqz))))
data = data[np.isfinite(data).all(axis=-1)]
return OrsoIofQDataset(OrsoDataset(header, data))
def _extract_values_array(var: sc.Variable) -> np.ndarray:
if var.variances is not None:
raise sc.VariancesError(
"ORT columns must not have variances. "
"Store the uncertainties as standard deviations in a separate column."
)
if var.ndim != 1:
raise sc.DimensionError(f"ORT columns must be one-dimensional, got {var.sizes}")
return var.values
def _limits_of_coord(data: sc.DataArray, name: str) -> tuple[float, float, str] | None:
if (coord := _get_coord(data, name)) is None:
return None
min_ = coord.min().value
max_ = coord.max().value
# Explicit conversions to float because orsopy does not like np.float* types.
return float(min_), float(max_), _ascii_unit(coord.unit)
def _get_coord(data: sc.DataArray, name: str) -> sc.Variable | None:
if name in data.coords:
return sc.DataArray(data=data.coords[name], masks=data.masks)
if (data.bins is not None) and (name in data.bins.coords):
# Note that .bins.concat() applies the top-level masks
events = data.bins.concat().value
return sc.DataArray(data=events.coords[name], masks=events.masks)
return None
def _ascii_unit(unit: sc.Unit) -> str:
unit = str(unit)
if unit == "Å":
return "angstrom"
return unit
providers = (build_orso_instrument, build_orso_iofq_dataset)