# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
from collections.abc import Hashable, Iterable
import pandas as pd
import sciline
import scipp as sc
from ess.reduce.nexus.workflow import GenericNeXusWorkflow
from ess.reduce.parameter import parameter_mappers
from . import common, conversions, i_of_q, masking, normalization
from .types import (
BackgroundRun,
CleanSummedQ,
CorrectForGravity,
Denominator,
DetectorBankSizes,
DetectorMasks,
DimsToKeep,
EmptyBeamRun,
Filename,
Incident,
NeXusDetectorName,
Numerator,
PixelMaskFilename,
SampleRun,
TransformationPath,
Transmission,
TransmissionRun,
WavelengthBands,
WavelengthMask,
)
def _merge(*dicts: dict) -> dict:
return {key: value for d in dicts for key, value in d.items()}
def merge_contributions(*data: sc.DataArray) -> sc.DataArray:
if len(data) == 1:
return data[0]
reducer = sc.reduce(data)
return reducer.bins.concat() if data[0].bins is not None else reducer.sum()
[docs]
def with_pixel_mask_filenames(
workflow: sciline.Pipeline, masks: Iterable[str]
) -> sciline.Pipeline:
"""
Return modified workflow with pixel mask filenames set.
Parameters
----------
workflow:
Workflow to modify.
masks:
List or tuple of pixel mask filenames to set.
"""
workflow = workflow.copy()
workflow[DetectorMasks] = (
workflow[DetectorMasks]
.map(pd.DataFrame({PixelMaskFilename: masks}).rename_axis('mask'))
.reduce(index='mask', func=_merge)
)
return workflow
[docs]
def with_banks(
workflow: sciline.Pipeline,
banks: Iterable[str],
index: Iterable[Hashable] | None = None,
) -> sciline.Pipeline:
"""
Return modified workflow with bank names set.
Since banks typically have different Q-resolution the I(Q) of banks are not merged.
That is, the resulting workflow will have separate outputs for each bank. Use
:py:func:`sciline.compute_mapped` to compute results for all banks.
Parameters
----------
workflow:
Workflow to modify.
banks:
List or tuple of bank names to set.
index:
Index to use for the DataFrame. If not provided, the bank names are used.
"""
index = index or banks
return workflow.map(
pd.DataFrame({NeXusDetectorName: banks}, index=index).rename_axis('bank')
)
def _set_runs(
pipeline: sciline.Pipeline, runs: Iterable[str], key: Hashable, axis_name: str
) -> sciline.Pipeline:
pipeline = pipeline.copy()
runs = pd.DataFrame({Filename[key]: runs}).rename_axis(axis_name)
for part in (Numerator, Denominator):
pipeline[CleanSummedQ[key, part]] = (
pipeline[CleanSummedQ[key, part]]
.map(runs)
.reduce(index=axis_name, func=merge_contributions)
)
return pipeline
[docs]
def with_sample_runs(
workflow: sciline.Pipeline, runs: Iterable[str]
) -> sciline.Pipeline:
"""
Return modified workflow with sample run filenames set.
Parameters
----------
workflow:
Workflow to modify.
runs:
List or tuple of sample run filenames to set.
"""
return _set_runs(workflow, runs, SampleRun, 'sample_run')
[docs]
def with_background_runs(
workflow: sciline.Pipeline, runs: Iterable[str]
) -> sciline.Pipeline:
"""
Return modified workflow with background run filenames set.
Parameters
----------
workflow:
Workflow to modify.
runs:
List or tuple of background run filenames to set.
"""
return _set_runs(workflow, runs, BackgroundRun, 'background_run')
parameter_mappers[PixelMaskFilename] = with_pixel_mask_filenames
# TODO: for now, we leave the mapping over detector banks out, because we do not have a
# method to merge the I(Q) of different banks, and we thus cannot compute a single
# result from the workflow. So only a single detector bank can be processed at a time.
# parameter_mappers[NeXusDetectorName] = with_banks
parameter_mappers[Filename[SampleRun]] = with_sample_runs
parameter_mappers[Filename[BackgroundRun]] = with_background_runs
providers = (
*conversions.providers,
*i_of_q.providers,
*masking.providers,
*normalization.providers,
common.beam_center_to_detector_position_offset,
)
"""
List of providers for setting up a Sciline pipeline.
This provides a default workflow, including a beam-center estimation based on a
center-of-mass approach. Providers for loadings files are not included. Combine with
the providers for a specific instrument, such as :py:data:`esssans.sans2d.providers`
to setup a complete workflow.
"""
def SansWorkflow() -> sciline.Pipeline:
"""
Common base for SANS workflows.
Returns
-------
:
SANS workflow as a sciline.Pipeline
"""
workflow = GenericNeXusWorkflow(
run_types=(
SampleRun,
EmptyBeamRun,
BackgroundRun,
TransmissionRun[SampleRun],
TransmissionRun[BackgroundRun],
),
monitor_types=(Incident, Transmission),
)
for provider in providers:
workflow.insert(provider)
workflow[CorrectForGravity] = CorrectForGravity(False)
workflow[DetectorBankSizes] = DetectorBankSizes({})
workflow[DimsToKeep] = DimsToKeep(())
workflow[TransformationPath] = TransformationPath('transform')
workflow[WavelengthBands] = WavelengthBands(None)
workflow[WavelengthMask] = WavelengthMask(None)
return workflow