Source code for ess.powder.masking
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
"""
Masking functions for the powder workflow.
"""
from collections.abc import Iterable
import numpy as np
import sciline
import scipp as sc
from .types import (
DataWithScatteringCoordinates,
MaskedData,
MaskedDetectorIDs,
PixelMaskFilename,
RunType,
TofMask,
TwoThetaMask,
WavelengthMask,
)
def read_pixel_masks(filename: PixelMaskFilename) -> MaskedDetectorIDs:
"""Read a pixel mask from a Scipp hdf5 file.
Parameters
----------
filename:
Path to the hdf5 file.
"""
return MaskedDetectorIDs({filename: sc.io.load_hdf5(filename)})
def apply_masks(
data: DataWithScatteringCoordinates[RunType],
masked_pixel_ids: MaskedDetectorIDs,
tof_mask_func: TofMask,
wavelength_mask_func: WavelengthMask,
two_theta_mask_func: TwoThetaMask,
) -> MaskedData[RunType]:
""" """
out = data.copy(deep=False)
if len(masked_pixel_ids) > 0:
key = (
set(out.coords.keys()) & {"detector_number", "detector_id", "spectrum"}
).pop()
ids = out.coords[key]
for name, masked in masked_pixel_ids.items():
mask = sc.zeros(sizes=ids.sizes, dtype="bool")
mask.values[np.isin(ids.values, masked.values)] = True
out.masks[name] = mask
for dim, mask in {
"tof": tof_mask_func,
"wavelength": wavelength_mask_func,
"two_theta": two_theta_mask_func,
}.items():
if mask is not None:
if dim in out.bins.coords:
out.bins.masks[dim] = mask(out.bins.coords[dim])
else:
out.masks[dim] = mask(out.coords[dim])
return MaskedData[RunType](out)
providers = (read_pixel_masks, apply_masks)
def _merge(*dicts: dict) -> dict:
return {key: value for d in dicts for key, value in d.items()}
[docs]
def with_pixel_mask_filenames(
workflow: sciline.Pipeline, masks: Iterable[str]
) -> sciline.Pipeline:
"""
Return modified workflow with pixel mask filenames set.
Parameters
----------
workflow:
Workflow to modify.
masks:
List or tuple of pixel mask filenames to set.
"""
workflow = workflow.copy()
# Workaround bug in Cyclebane, which does not allow empty maps
if len(masks) == 0:
workflow[MaskedDetectorIDs] = MaskedDetectorIDs({})
return workflow
workflow[MaskedDetectorIDs] = (
workflow[MaskedDetectorIDs].map({PixelMaskFilename: masks}).reduce(func=_merge)
)
return workflow