Source code for ess.sans.workflow

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
from collections.abc import Hashable, Iterable

import pandas as pd
import sciline
import scipp as sc

from ess.reduce.parameter import parameter_mappers

from .types import (
    BackgroundRun,
    CleanSummedQ,
    Denominator,
    DetectorMasks,
    Filename,
    NeXusDetectorName,
    Numerator,
    PixelMaskFilename,
    SampleRun,
)


def _merge(*dicts: dict) -> dict:
    return {key: value for d in dicts for key, value in d.items()}


def merge_contributions(*data: sc.DataArray) -> sc.DataArray:
    """Merge several contributions into a single data array.

    A single contribution is returned as-is. Otherwise the inputs are combined
    with ``sc.reduce``: the bins are concatenated if the first input is binned,
    else the inputs are summed.
    """
    if len(data) != 1:
        combined = sc.reduce(data)
        # Only the first input is inspected to decide between the two modes.
        if data[0].bins is None:
            return combined.sum()
        return combined.bins.concat()
    return data[0]


def with_pixel_mask_filenames(
    workflow: sciline.Pipeline, masks: Iterable[str]
) -> sciline.Pipeline:
    """
    Return a copy of the workflow with the pixel mask filenames set.

    The ``DetectorMasks`` node is mapped over the given filenames and the
    per-file results are combined into a single dict again. The input workflow
    is left untouched.

    Parameters
    ----------
    workflow:
        Workflow to modify.
    masks:
        List or tuple of pixel mask filenames to set.
    """
    modified = workflow.copy()
    mask_table = pd.DataFrame({PixelMaskFilename: masks}).rename_axis('mask')
    mapped_masks = modified[DetectorMasks].map(mask_table)
    modified[DetectorMasks] = mapped_masks.reduce(index='mask', func=_merge)
    return modified
def with_banks(
    workflow: sciline.Pipeline,
    banks: Iterable[str],
    index: Iterable[Hashable] | None = None,
) -> sciline.Pipeline:
    """
    Return modified workflow with bank names set.

    Since banks typically have different Q-resolution the I(Q) of banks are not
    merged. That is, the resulting workflow will have separate outputs for each
    bank. Use :py:func:`sciline.compute_mapped` to compute results for all banks.

    Parameters
    ----------
    workflow:
        Workflow to modify.
    banks:
        Bank names to set. Any iterable is accepted, including one-shot
        iterators.
    index:
        Index to use for the DataFrame. If not provided (``None``), the bank
        names are used.
    """
    # Materialize once: the bank names are needed as the column values and, by
    # default, also as the index, which would exhaust a one-shot iterator.
    banks = list(banks)
    # Compare against None explicitly; truth-testing an array-like index (e.g.
    # a numpy array or pandas Index) raises ValueError.
    if index is None:
        index = banks
    return workflow.map(
        pd.DataFrame({NeXusDetectorName: banks}, index=index).rename_axis('bank')
    )
def _set_runs(
    pipeline: sciline.Pipeline, runs: Iterable[str], key: Hashable, axis_name: str
) -> sciline.Pipeline:
    """Return a copy of ``pipeline`` with the run filenames for ``key`` set.

    For both the numerator and the denominator, the ``CleanSummedQ`` node of
    ``key`` is mapped over ``runs`` and the per-run results are merged back
    into one with :py:func:`merge_contributions`.

    Parameters
    ----------
    pipeline:
        Pipeline to modify; it is copied, not changed in place.
    runs:
        Filenames to set as ``Filename[key]``.
    key:
        Run type used to parametrize ``Filename`` and ``CleanSummedQ``.
    axis_name:
        Name given to the index axis that is mapped over and reduced.
    """
    result = pipeline.copy()
    run_table = pd.DataFrame({Filename[key]: runs}).rename_axis(axis_name)
    for part in (Numerator, Denominator):
        target = CleanSummedQ[key, part]
        per_run = result[target].map(run_table)
        result[target] = per_run.reduce(index=axis_name, func=merge_contributions)
    return result
def with_sample_runs(
    workflow: sciline.Pipeline, runs: Iterable[str]
) -> sciline.Pipeline:
    """
    Return a copy of the workflow with the sample run filenames set.

    Parameters
    ----------
    workflow:
        Workflow to modify.
    runs:
        List or tuple of sample run filenames to set.
    """
    return _set_runs(
        pipeline=workflow, runs=runs, key=SampleRun, axis_name='sample_run'
    )
def with_background_runs(
    workflow: sciline.Pipeline, runs: Iterable[str]
) -> sciline.Pipeline:
    """
    Return a copy of the workflow with the background run filenames set.

    Parameters
    ----------
    workflow:
        Workflow to modify.
    runs:
        List or tuple of background run filenames to set.
    """
    return _set_runs(
        pipeline=workflow, runs=runs, key=BackgroundRun, axis_name='background_run'
    )
# Register each helper in ``parameter_mappers`` under the parameter key it sets.
for _param, _mapper in (
    (PixelMaskFilename, with_pixel_mask_filenames),
    (NeXusDetectorName, with_banks),
    (Filename[SampleRun], with_sample_runs),
    (Filename[BackgroundRun], with_background_runs),
):
    parameter_mappers[_param] = _mapper
# Do not leave the temporary loop names behind in the module namespace.
del _param, _mapper