Source code for ess.amor.load

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
import scipp as sc

from ess.reduce import nexus

from ..reflectometry.load import load_nx
from ..reflectometry.types import (
    DetectorRotation,
    Filename,
    LoadedNeXusDetector,
    NeXusDetectorName,
    RawDetectorData,
    ReducibleDetectorData,
    RunType,
    SampleRotation,
)
from .geometry import Detector, pixel_coordinate_in_lab_frame
from .types import (
    Chopper1Position,
    Chopper2Position,
    ChopperFrequency,
    ChopperPhase,
    RawChopper,
)


[docs] def load_detector( file_path: Filename[RunType], detector_name: NeXusDetectorName[RunType] ) -> LoadedNeXusDetector[RunType]: return nexus.load_detector(file_path=file_path, detector_name=detector_name)
[docs] def load_events( detector: LoadedNeXusDetector[RunType], detector_rotation: DetectorRotation[RunType] ) -> RawDetectorData[RunType]: detector_numbers = sc.arange( "event_id", start=1, stop=(Detector.nBlades * Detector.nWires * Detector.nStripes).value + 1, unit=None, dtype="int32", ) data = ( nexus.extract_detector_data(detector) .bins.constituents["data"] .group(detector_numbers) .fold( "event_id", sizes={ "blade": Detector.nBlades, "wire": Detector.nWires, "stripe": Detector.nStripes, }, ) ) # Recent versions of scippnexus no longer add variances for events by default, so # we add them here if they are missing. if data.bins.constituents["data"].data.variances is None: data.bins.constituents["data"].data.variances = data.bins.constituents[ "data" ].data.values pixel_inds = sc.array(dims=data.dims, values=data.coords["event_id"].values - 1) position, angle_from_center_of_beam = pixel_coordinate_in_lab_frame( pixelID=pixel_inds, nu=detector_rotation ) data.coords["position"] = position.to(unit="m", copy=False) data.coords["angle_from_center_of_beam"] = angle_from_center_of_beam return RawDetectorData[RunType](data)
[docs] def compute_tof( data: RawDetectorData[RunType], phase: ChopperPhase[RunType], frequency: ChopperFrequency[RunType], ) -> ReducibleDetectorData[RunType]: data.bins.coords["tof"] = data.bins.coords.pop("event_time_offset").to( unit="ns", dtype="float64", copy=False ) tof_unit = data.bins.coords["tof"].bins.unit tau = sc.to_unit(1 / (2 * frequency), tof_unit) tof_offset = tau * phase / (180.0 * sc.units.deg) event_time_offset = data.bins.coords["tof"] minimum = -tof_offset frame_bound = tau - tof_offset maximum = 2 * tau - tof_offset offset = sc.where( (minimum < event_time_offset) & (event_time_offset < frame_bound), tof_offset, sc.where( (frame_bound < event_time_offset) & (event_time_offset < maximum), tof_offset - tau, 0.0 * tof_unit, ), ) data.bins.masks["outside_of_pulse"] = (minimum > event_time_offset) | ( event_time_offset > maximum ) data.bins.coords["tof"] += offset data.bins.coords["tof"] -= ( data.coords["angle_from_center_of_beam"].to(unit="deg") / (180.0 * sc.units.deg) ) * tau return ReducibleDetectorData[RunType](data)
[docs] def amor_chopper(f: Filename[RunType]) -> RawChopper[RunType]: return next(load_nx(f, "NXentry/NXinstrument/NXdisk_chopper"))
[docs] def load_amor_chopper_1_position(ch: RawChopper[RunType]) -> Chopper1Position[RunType]: # We know the value has unit 'mm' return sc.vector([0, 0, ch["distance"] - ch["pair_separation"] / 2], unit="mm").to( unit="m" )
[docs] def load_amor_chopper_2_position(ch: RawChopper[RunType]) -> Chopper2Position[RunType]: # We know the value has unit 'mm' return sc.vector([0, 0, ch["distance"] + ch["pair_separation"] / 2], unit="mm").to( unit="m" )
[docs] def load_amor_ch_phase(ch: RawChopper[RunType]) -> ChopperPhase[RunType]: p = ch["phase"]["value"].coords["average_value"].value if getattr(p, "unit", None): return p raise ValueError("No unit was found for the chopper phase")
[docs] def load_amor_ch_frequency(ch: RawChopper[RunType]) -> ChopperFrequency[RunType]: f = ch["rotation_speed"]["value"].coords["average_value"] if getattr(f, "unit", None): return f raise ValueError("No unit was found for the chopper frequency")
[docs] def load_amor_sample_rotation(fp: Filename[RunType]) -> SampleRotation[RunType]: (mu,) = load_nx(fp, "NXentry/NXinstrument/master_parameters/mu") # Jochens Amor code reads the first value of this log # see https://github.com/jochenstahn/amor/blob/140e3192ddb7e7f28acee87e2acaee65ce1332aa/libeos/file_reader.py#L272 # noqa: E501 # might have to change if this field ever becomes truly time-dependent return sc.scalar(mu['value'].data['dim_1', 0]['time', 0].value, unit='deg')
[docs] def load_amor_detector_rotation(fp: Filename[RunType]) -> DetectorRotation[RunType]: (nu,) = load_nx(fp, "NXentry/NXinstrument/master_parameters/nu") # Jochens Amor code reads the first value of this log # see https://github.com/jochenstahn/amor/blob/140e3192ddb7e7f28acee87e2acaee65ce1332aa/libeos/file_reader.py#L272 # noqa: E501 # might have to change if this field ever becomes truly time-dependent return sc.scalar(nu['value'].data['dim_1', 0]['time', 0].value, unit='deg')
providers = ( load_detector, load_events, compute_tof, load_amor_ch_frequency, load_amor_ch_phase, load_amor_chopper_1_position, load_amor_chopper_2_position, load_amor_sample_rotation, load_amor_detector_rotation, amor_chopper, )