Source code for beamlime.workflow_protocols

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
from importlib.metadata import entry_points
from pathlib import Path
from typing import NewType, Protocol, runtime_checkable

from ess.reduce.nexus.json_nexus import JSONGroup

try:
    import scipp as sc
except ImportError as e:
    raise ImportError("Please install the scipp to use the ``DummyWorkflow``.") from e


WorkflowName = NewType('WorkflowName', str)
'''Name of the workflow plugin to use'''

WorkflowResult = dict[str, sc.DataArray]
"""Result of a workflow, a dictionary of scipp DataArrays."""


[docs] @runtime_checkable class LiveWorkflow(Protocol): def __init__(self, nexus_filename: Path) -> None: """Initialize the workflow with all static information. The only argument we need is the path to the nexus file or a file object that workflow can read from, since the file carrys all the static information. """ ... def __call__( self, nxevent_data: dict[str, JSONGroup], nxlog: dict[str, JSONGroup] ) -> WorkflowResult: """Call the workflow and return the computed results that can be visualized. Parameters ---------- nxevent_data: Dictionary of event data groups. It should contain empty events even if no events was received since the last call. nxlog: Dictionary of log data groups. It should only contain keys that we received since the last call. Returns ------- : A dictionary of objects that can be visualized. """ ...
[docs] class DummyLiveWorkflow: "Dummy workflow for testing purposes, returning random counts per event and log."
[docs] def __init__(self, nexus_filename: Path) -> None: import numpy as np self.nexus_filename = nexus_filename.as_posix() self.rng = np.random.default_rng() self.x = sc.array(dims=['x'], values=np.arange(10), unit='m')
def __call__( self, nxevent_data: dict[str, JSONGroup], nxlog: dict[str, JSONGroup], ) -> WorkflowResult: from itertools import chain return { f'random-counts\n{self.nexus_filename}\n{name}': sc.DataArray( data=sc.array(dims=['x'], values=self.rng.random(10), unit='counts'), coords={'x': self.x}, ) for name in chain(nxevent_data, nxlog) }
[docs] def provide_beamlime_workflow(workflow: WorkflowName) -> type[LiveWorkflow]: """Provide the workflow plugin class based on the name.""" workflow_plugins = entry_points(group='beamlime.workflow_plugin') return workflow_plugins[workflow].load()