# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
import numpy as np
import sciline
import scipp as sc
import scippnexus as snx
from ess.powder.types import (
CalibratedDetector,
CalibrationData,
CalibrationFilename,
DetectorData,
Filename,
NeXusComponent,
NeXusDetectorName,
Position,
RunType,
)
from ess.reduce.nexus.types import CalibratedBeamline
from ess.reduce.nexus.workflow import GenericNeXusWorkflow
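
# Values of the ``det ID`` column in the GEANT4 CSV files that identify the
# DREAM detector banks.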
MANTLE_DETECTOR_ID = sc.index(7)
HIGH_RES_DETECTOR_ID = sc.index(8)
SANS_DETECTOR_ID = sc.index(9)
ENDCAPS_DETECTOR_IDS = tuple(map(sc.index, (3, 4, 5, 6)))
class AllRawDetectors(sciline.Scope[RunType, sc.DataGroup], sc.DataGroup):
"""Raw data for all detectors."""
def load_geant4_csv(file_path: Filename[RunType]) -> AllRawDetectors[RunType]:
"""Load a GEANT4 CSV file for DREAM.
Parameters
----------
file_path:
Indicates where to load data from.
One of:
- URL of a CSV or zipped CSV file.
- Path to a CSV or zipped CSV file on disk.
- File handle or buffer for reading text or binary data.
Returns
-------
:
A :class:`scipp.DataGroup` containing the loaded events.
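
    Examples
    --------
    A sketch of typical use; the file name is illustrative:

    >>> detectors = load_geant4_csv("DREAM_simulation.csv.zip")  # doctest: +SKIP
    >>> mantle = detectors["instrument"]["mantle"]  # doctest: +SKIP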
"""
events = _load_raw_events(file_path)
detectors = _split_detectors(events)
for det in detectors.values():
_adjust_coords(det)
detectors = _group(detectors)
return AllRawDetectors[RunType](
sc.DataGroup({"instrument": sc.DataGroup(detectors)})
)
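

# Sketch of the detector-extraction provider that the workflow at the bottom
# of this file inserts; it assumes the banks are stored by name under the
# ``instrument`` group, as built by ``load_geant4_csv``.
def extract_geant4_detector(
    detectors: AllRawDetectors[RunType], detector_name: NeXusDetectorName
) -> NeXusComponent[snx.NXdetector, RunType]:
    """Extract a single detector bank from loaded GEANT4 data."""
    return NeXusComponent[snx.NXdetector, RunType](
        detectors["instrument"][detector_name]
    )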
def get_calibrated_geant4_detector(
detector: NeXusComponent[snx.NXdetector, RunType],
) -> CalibratedDetector[RunType]:
"""
Replacement for :py:func:`ess.reduce.nexus.workflow.get_calibrated_detector`.
Since the Geant4 detectors already have computed positions as well as logical shape,
this just extracts the relevant event data.
"""
return detector['events'].copy(deep=False)
def _load_raw_events(file_path: str) -> sc.DataArray:
    # Load all CSV columns as coordinates; the bracketed part of each header
    # is parsed as the unit.
    table = sc.io.load_csv(
        file_path, sep="\t", header_parser="bracket", data_columns=[]
    )
    # For the endcaps, ``det ID`` also identifies the SUMO.
    table.coords["sumo"] = table.coords["det ID"]
    # Discard the simulated wavelength.
    table.coords.pop("lambda", None)
    table = table.rename_dims(row="event")
    # Every row is one detected neutron with weight 1.
    return sc.DataArray(
        sc.ones(sizes=table.sizes, with_variances=True, unit="counts"),
        coords=table.coords,
    )
def _adjust_coords(da: sc.DataArray) -> None:
da.coords["position"] = sc.spatial.as_vectors(
da.coords.pop("x_pos"), da.coords.pop("y_pos"), da.coords.pop("z_pos")
)
def _group(detectors: dict[str, sc.DataArray]) -> dict[str, sc.DataGroup]:
elements = ("module", "segment", "counter", "wire", "strip")
def group(key: str, da: sc.DataArray) -> sc.DataArray:
if key in ["high_resolution", "sans"]:
# Only the HR and SANS detectors have sectors.
res = da.group("sector", *elements)
        elif key in ["endcap_backward", "endcap_forward"]:
            # The endcaps have multiple SUMOs; all other banks contain only a
            # single SUMO, so it is not part of their logical shape.
            res = da.group("sumo", *elements)
        else:
            res = da.group(*elements)
        # Each voxel gets the mean position of its events; the per-event
        # grouping coordinates are no longer needed.
        res.coords["position"] = res.bins.coords.pop("position").bins.mean()
        res.bins.coords.pop("sector", None)
        res.bins.coords.pop("sumo", None)
        return res
return {key: sc.DataGroup(events=group(key, da)) for key, da in detectors.items()}
def _split_detectors(
data: sc.DataArray, detector_id_name: str = "det ID"
) -> dict[str, sc.DataArray]:
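    # Group all events by detector ID; one group per DREAM bank.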
groups = data.group(
sc.concat(
[
MANTLE_DETECTOR_ID,
HIGH_RES_DETECTOR_ID,
SANS_DETECTOR_ID,
*ENDCAPS_DETECTOR_IDS,
],
dim=detector_id_name,
)
)
detectors = {}
if (
mantle := _extract_detector(groups, detector_id_name, MANTLE_DETECTOR_ID)
) is not None:
detectors["mantle"] = mantle.copy()
if (
high_res := _extract_detector(groups, detector_id_name, HIGH_RES_DETECTOR_ID)
) is not None:
detectors["high_resolution"] = high_res.copy()
if (
sans := _extract_detector(groups, detector_id_name, SANS_DETECTOR_ID)
) is not None:
detectors["sans"] = sans.copy()
endcaps_list = [
det
for i in ENDCAPS_DETECTOR_IDS
if (det := _extract_detector(groups, detector_id_name, i)) is not None
]
if endcaps_list:
endcaps = sc.concat(endcaps_list, data.dim)
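        # Split the endcaps into backward (z < 0) and forward (z > 0) halves
        # based on the event positions.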
endcaps = endcaps.bin(
z_pos=sc.array(
dims=["z_pos"],
values=[-np.inf, 0.0, np.inf],
unit=endcaps.coords["z_pos"].unit,
)
)
detectors["endcap_backward"] = endcaps[0].bins.concat().value.copy()
detectors["endcap_forward"] = endcaps[1].bins.concat().value.copy()
return detectors
def _extract_detector(
detector_groups: sc.DataArray, detector_id_name: str, detector_id: sc.Variable
) -> sc.DataArray | None:
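    # Return the events of one bank, or None if the bank recorded no events.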
events = detector_groups[detector_id_name, detector_id].value
if len(events) == 0:
return None
return events
def geant4_load_calibration(filename: CalibrationFilename) -> CalibrationData:
    """Dummy calibration loader, needed to build a complete pipeline."""
    if filename is not None:
        raise NotImplementedError(
            "Calibration data loading is not implemented for DREAM GEANT4 data."
        )
    return CalibrationData(None)
def dummy_assemble_detector_data(
detector: CalibratedBeamline[RunType],
) -> DetectorData[RunType]:
"""Dummy assembly of detector data, detector already contains neutron data."""
return DetectorData[RunType](detector)
def dummy_source_position() -> Position[snx.NXsource, RunType]:
    """Placeholder source position; the GEANT4 CSV files do not provide one."""
    return Position[snx.NXsource, RunType](
        sc.vector([np.nan, np.nan, np.nan], unit="mm")
    )
def dummy_sample_position() -> Position[snx.NXsample, RunType]:
    """Placeholder sample position; the GEANT4 CSV files do not provide one."""
    return Position[snx.NXsample, RunType](
        sc.vector([np.nan, np.nan, np.nan], unit="mm")
    )
def LoadGeant4Workflow() -> sciline.Pipeline:
"""
Workflow for loading NeXus data.
"""
wf = GenericNeXusWorkflow()
wf.insert(extract_geant4_detector)
wf.insert(load_geant4_csv)
wf.insert(geant4_load_calibration)
wf.insert(get_calibrated_geant4_detector)
wf.insert(dummy_assemble_detector_data)
wf.insert(dummy_source_position)
wf.insert(dummy_sample_position)
return wf
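

# A minimal usage sketch; the run type ``SampleRun`` (assumed to be available
# from ``ess.powder.types``) and the file name are illustrative:
#
#     from ess.powder.types import SampleRun
#
#     wf = LoadGeant4Workflow()
#     wf[Filename[SampleRun]] = "DREAM_simulation.csv.zip"
#     wf[NeXusDetectorName] = "mantle"
#     wf[CalibrationFilename] = None
#     detector = wf.compute(DetectorData[SampleRun])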