# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
"""
Default parameters, providers and utility functions for the loki workflow.
"""
import sciline
import scipp as sc
from ess.reduce import nexus
from ess.sans import providers as sans_providers
from ..sans.common import gravity_vector
from ..sans.types import (
BeamCenter,
CalibratedDetector,
CorrectForGravity,
DetectorData,
DetectorEventData,
DetectorPixelShape,
DimsToKeep,
Incident,
LabFrameTransform,
MonitorEventData,
MonitorType,
NeXusDetector,
NeXusMonitor,
NeXusMonitorName,
NonBackgroundWavelengthRange,
PixelShapePath,
RawDetector,
RawMonitor,
RawMonitorData,
RawSample,
RawSource,
RunType,
SamplePosition,
ScatteringRunType,
SourcePosition,
TofData,
TofMonitor,
TransformationPath,
Transmission,
WavelengthBands,
WavelengthMask,
)
from .io import dummy_load_sample
[docs]
def default_parameters() -> dict:
return {
CorrectForGravity: False,
DimsToKeep: (),
NeXusMonitorName[Incident]: 'monitor_1',
NeXusMonitorName[Transmission]: 'monitor_2',
TransformationPath: 'transform',
PixelShapePath: 'pixel_shape',
NonBackgroundWavelengthRange: None,
WavelengthMask: None,
WavelengthBands: None,
}
[docs]
def LokiAtLarmorWorkflow() -> sciline.Pipeline:
"""
Workflow with default parameters for Loki test at Larmor.
This version of the Loki workflow:
- Uses ISIS XML files to define masks.
- Sets a dummy sample position [0,0,0] since files do not contain this information.
Returns
-------
:
Loki workflow as a sciline.Pipeline
"""
from ess.isissans.io import read_xml_detector_masking
from . import providers as loki_providers
params = default_parameters()
loki_providers = sans_providers + loki_providers
workflow = sciline.Pipeline(providers=loki_providers, params=params)
workflow.insert(read_xml_detector_masking)
# No sample information in the Loki@Larmor files, so we use a dummy sample provider
workflow.insert(dummy_load_sample)
return workflow
DETECTOR_BANK_RESHAPING = {
'larmor_detector': lambda x: x.fold(
dim='detector_number', sizes={'layer': 4, 'tube': 32, 'straw': 7, 'pixel': 512}
)
}
[docs]
def get_source_position(
raw_source: RawSource[RunType],
) -> SourcePosition[RunType]:
return SourcePosition[RunType](raw_source['position'])
[docs]
def get_sample_position(
raw_sample: RawSample[RunType],
) -> SamplePosition[RunType]:
return SamplePosition[RunType](raw_sample['position'])
[docs]
def get_detector_data(
detector: NeXusDetector[ScatteringRunType],
) -> RawDetector[ScatteringRunType]:
da = nexus.extract_detector_data(detector)
if (reshape := DETECTOR_BANK_RESHAPING.get(detector['detector_name'])) is not None:
da = reshape(da)
return RawDetector[ScatteringRunType](da)
[docs]
def calibrate_detector(
detector: RawDetector[ScatteringRunType],
beam_center: BeamCenter,
source_position: SourcePosition[ScatteringRunType],
sample_position: SamplePosition[ScatteringRunType],
) -> CalibratedDetector[ScatteringRunType]:
return CalibratedDetector[ScatteringRunType](
detector.assign_coords(
position=detector.coords['position'] - beam_center,
source_position=source_position,
sample_position=sample_position,
gravity=gravity_vector(),
)
)
[docs]
def get_monitor_data(
monitor: NeXusMonitor[RunType, MonitorType],
source_position: SourcePosition[RunType],
) -> RawMonitor[RunType, MonitorType]:
return RawMonitor[RunType, MonitorType](
nexus.extract_monitor_data(monitor).assign_coords(
position=monitor['position'], source_position=source_position
)
)
def _add_variances(da: sc.DataArray) -> sc.DataArray:
out = da.copy(deep=False)
if out.bins is not None:
content = out.bins.constituents['data']
if content.variances is None:
content.variances = content.values
return out
[docs]
def assemble_detector_data(
detector: CalibratedDetector[ScatteringRunType],
event_data: DetectorEventData[ScatteringRunType],
) -> DetectorData[ScatteringRunType]:
grouped = nexus.group_event_data(
event_data=event_data, detector_number=detector.coords['detector_number']
)
detector.data = grouped.data
return DetectorData[ScatteringRunType](_add_variances(da=detector))
[docs]
def assemble_monitor_data(
monitor_data: RawMonitor[RunType, MonitorType],
event_data: MonitorEventData[RunType, MonitorType],
) -> RawMonitorData[RunType, MonitorType]:
meta = monitor_data.drop_coords('event_time_zero')
da = event_data.assign_coords(meta.coords).assign_masks(meta.masks)
return RawMonitorData[RunType, MonitorType](_add_variances(da=da))
def _convert_to_tof(da: sc.DataArray) -> sc.DataArray:
da.bins.coords['tof'] = da.bins.coords.pop('event_time_offset')
if 'event_time_zero' in da.dims:
da = da.bins.concat('event_time_zero')
return da
[docs]
def data_to_tof(
da: DetectorData[ScatteringRunType],
) -> TofData[ScatteringRunType]:
return TofData[ScatteringRunType](_convert_to_tof(da))
[docs]
def monitor_to_tof(
da: RawMonitorData[RunType, MonitorType],
) -> TofMonitor[RunType, MonitorType]:
return TofMonitor[RunType, MonitorType](_convert_to_tof(da))
[docs]
def detector_pixel_shape(
detector: NeXusDetector[ScatteringRunType],
pixel_shape_path: PixelShapePath,
) -> DetectorPixelShape[ScatteringRunType]:
return DetectorPixelShape[ScatteringRunType](detector[pixel_shape_path])
providers = (
detector_pixel_shape,
detector_lab_frame_transform,
calibrate_detector,
get_detector_data,
get_monitor_data,
get_sample_position,
get_source_position,
assemble_detector_data,
assemble_monitor_data,
data_to_tof,
monitor_to_tof,
)