Source code for ess.reflectometry.workflow

from collections.abc import Hashable, Sequence
from itertools import chain

import pandas as pd
import sciline
import scipp as sc

from ess.amor.types import RawChopper
from ess.reflectometry.orso import (
    OrsoExperiment,
    OrsoOwner,
    OrsoSample,
    OrsoSampleFilenames,
)
from ess.reflectometry.types import (
    Filename,
    FootprintCorrectedData,
    RunType,
    SampleRotation,
    SampleRun,
)


def _concatenate_event_lists(*das):
    return (
        sc.reduce(das)
        .bins.concat()
        .assign_coords(
            {
                name: das[0].coords[name]
                for name in ('position', 'sample_rotation', 'detector_rotation')
            }
        )
    )


def _any_value(x, *_):
    return x


def _concatenate_lists(*x):
    return list(chain(*x))


[docs] def with_filenames( workflow, runtype: Hashable, runs: Sequence[Filename[RunType]] ) -> sciline.Pipeline: '''Sets a number of :code:`Filename[runtype]` simultaneously. The events from all listed files are concatenated in the workflow. Arguments ---------- workflow: the workflow to copy and add the filenames to runtype: the kind of runtype to add the files as. Example: :code:`SampleRun` or :code:`ReferenceRun`. runs: the list of filenames to map over Returns --------- A copy of the original workflow mapping over the provided files. ''' axis_name = f'{str(runtype).lower()}_runs' df = pd.DataFrame({Filename[runtype]: runs}).rename_axis(axis_name) wf = workflow.copy() mapped = wf.map(df) wf[FootprintCorrectedData[runtype]] = mapped[ FootprintCorrectedData[runtype] ].reduce(index=axis_name, func=_concatenate_event_lists) wf[RawChopper[runtype]] = mapped[RawChopper[runtype]].reduce( index=axis_name, func=_any_value ) wf[SampleRotation[runtype]] = mapped[SampleRotation[runtype]].reduce( index=axis_name, func=_any_value ) if runtype is SampleRun: wf[OrsoSample] = mapped[OrsoSample].reduce(index=axis_name, func=_any_value) wf[OrsoExperiment] = mapped[OrsoExperiment].reduce( index=axis_name, func=_any_value ) wf[OrsoOwner] = mapped[OrsoOwner].reduce(index=axis_name, func=lambda x, *_: x) wf[OrsoSampleFilenames] = mapped[OrsoSampleFilenames].reduce( # When we don't map over filenames # each OrsoSampleFilenames is a list with a single entry. index=axis_name, func=_concatenate_lists, ) return wf