ess.reduce.streaming.StreamProcessor

class ess.reduce.streaming.StreamProcessor(base_workflow, *, dynamic_keys, context_keys=(), target_keys, accumulators, allow_bypass=False)

Wrap a base workflow for streaming processing of chunks.

Note that this class cannot determine if the workflow is valid for streamed processing based on the input keys. In particular, it is the responsibility of the user to ensure that the workflow is “linear” with respect to the dynamic keys up to the accumulation keys.

Similarly, the stream processor cannot determine from the workflow structure whether context updates are compatible with the accumulated data. Accumulators are not cleared automatically. This is best illustrated with an example:

  • If the context is the detector rotation angle, and we accumulate I(Q) (or a prerequisite of I(Q)), then updating the detector angle context is compatible with previous data, assuming Q for each new chunk is computed based on the angle.

  • If the context is the sample temperature, and we accumulate I(Q), then updating the temperature context is not compatible with previous data. Accumulating I(Q, T) could be compatible in this case.

Since the correctness cannot be determined from the workflow structure, we recommend implementing processing steps in a way that catches such problems. For example, adding the temperature as a coordinate to the I(Q) data array should allow the accumulator to raise automatically if the temperature changes.
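For instance, scipp raises when data arrays with mismatched coordinates are added, so an accumulator that sums data arrays catches a temperature change automatically. A minimal sketch (the 'temperature' coordinate name is illustrative):

    import scipp as sc

    # Two chunks of reduced data, each tagged with the sample temperature.
    chunk1 = sc.DataArray(
        sc.scalar(1.0, unit='counts'),
        coords={'temperature': sc.scalar(300.0, unit='K')},
    )
    chunk2 = sc.DataArray(
        sc.scalar(2.0, unit='counts'),
        coords={'temperature': sc.scalar(310.0, unit='K')},
    )
    chunk1 + chunk2  # raises, since the 'temperature' coordinates differ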

__init__(base_workflow, *, dynamic_keys, context_keys=(), target_keys, accumulators, allow_bypass=False)

Create a stream processor.

Parameters:
  • base_workflow (Pipeline) – Workflow to be used for processing chunks.

  • dynamic_keys (tuple[type, ...]) – Keys that are expected to be updated with each chunk. These keys cannot depend on each other or on context_keys.

  • context_keys (tuple[type, ...], default: ()) – Keys that define context for processing chunks and may change occasionally. These keys cannot overlap with dynamic_keys, and cannot depend on each other or on dynamic_keys.

  • target_keys (tuple[type, ...]) – Keys to be computed and returned.

  • accumulators (dict[type, Accumulator | Callable[..., Accumulator]] | tuple[type, ...]) – Keys at which to accumulate values, together with their accumulators. If a tuple is passed, EternalAccumulator is used for all keys. Otherwise, a dict mapping keys to accumulator instances can be passed. If a dict value is a callable, base_workflow.bind_and_call(value) is used to create an instance.

  • allow_bypass (bool, default: False) – If True, allow bypassing accumulators for keys that are not in the accumulators dict. This is useful for dynamic keys that are not “terminated” in any accumulator. USE WITH CARE! This will lead to incorrect results unless the values for these keys are valid for all chunks included in the final accumulators at the point where finalize() is called.
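Example:

For illustration, a minimal sketch of streaming chunks through a trivial workflow. The domain types (RawChunk, ScaleFactor, ScaledChunk) and the provider are hypothetical, and we assume that EternalAccumulator keeps a running total of the values pushed to it:

    from typing import NewType

    import sciline
    import scipp as sc

    from ess.reduce.streaming import StreamProcessor

    # Hypothetical domain types for this sketch.
    RawChunk = NewType('RawChunk', sc.DataArray)
    ScaleFactor = NewType('ScaleFactor', float)
    ScaledChunk = NewType('ScaledChunk', sc.DataArray)

    def scale(raw: RawChunk, factor: ScaleFactor) -> ScaledChunk:
        # Trivial provider standing in for a real reduction step.
        return ScaledChunk(raw * factor)

    base = sciline.Pipeline([scale], params={ScaleFactor: 2.0})

    processor = StreamProcessor(
        base,
        dynamic_keys=(RawChunk,),
        target_keys=(ScaledChunk,),
        accumulators=(ScaledChunk,),  # tuple: EternalAccumulator for all keys
    )

    for value in (1.0, 2.0, 3.0):
        chunk = sc.DataArray(sc.scalar(value, unit='counts'))
        processor.accumulate({RawChunk: chunk})

    results = processor.finalize()
    print(results[ScaledChunk])  # accumulated over all three chunks

The sketches in the method documentation below reuse this setup.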

Methods

__init__(base_workflow, *, dynamic_keys[, ...])

Create a stream processor.

accumulate(chunks)

Accumulate values from chunks without finalizing the result.

add_chunk(chunks)

Legacy interface for accumulating values from chunks and finalizing the result.

clear()

Clear all accumulators, resetting them to their initial state.

finalize()

Get the final result by computing the target keys based on accumulated values.

set_context(context)

Set the context for processing chunks.

visualize([compact, mode, cluster_generics, ...])

Visualize the workflow with node classification styling.

accumulate(chunks)

Accumulate values from chunks without finalizing the result.

Parameters:

chunks (dict[type, Any]) – Chunks to be processed.

Raises:

ValueError – If non-dynamic keys are provided in chunks, or if the accumulator computation requires dynamic keys that are not provided in chunks.

Return type:

None
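For example, passing a key that was not declared in dynamic_keys raises, reusing the hypothetical setup from the class example:

    # ScaleFactor is a static parameter of the workflow, not a dynamic key:
    processor.accumulate({ScaleFactor: 3.0})  # raises ValueError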

add_chunk(chunks)

Legacy interface for accumulating values from chunks and finalizing the result.

It is recommended to use accumulate() and finalize() instead.

Parameters:

chunks (dict[type, Any]) – Chunks to be processed.

Returns:

dict[type, Any] – Finalized result.
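Conceptually, a single add_chunk call behaves like accumulate followed by finalize (hypothetical setup as in the class example):

    result = processor.add_chunk({RawChunk: chunk})
    # behaves like:
    processor.accumulate({RawChunk: chunk})
    result = processor.finalize()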

clear()

Clear all accumulators, resetting them to their initial state.

This is useful for restarting a streaming computation without creating a new StreamProcessor instance.

Return type:

None
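A sketch of restarting between two measurement runs, reusing the processor from the class example (run1_chunks and run2_chunks are hypothetical iterables of chunks):

    for chunk in run1_chunks:
        processor.accumulate({RawChunk: chunk})
    run1_result = processor.finalize()

    processor.clear()  # reset all accumulators before the next run

    for chunk in run2_chunks:
        processor.accumulate({RawChunk: chunk})
    run2_result = processor.finalize()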

finalize()

Get the final result by computing the target keys based on accumulated values.

Returns:

dict[type, Any] – Finalized result.
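The returned dict is keyed by the target types, with one entry per key in target_keys (hypothetical setup as in the class example):

    results = processor.finalize()
    scaled = results[ScaledChunk]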

set_context(context)

Set the context for processing chunks.

Parameters:

context (dict[type, Any]) – Context to be set.

Return type:

None
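Continuing the detector-rotation example from the class docstring: assuming a hypothetical DetectorAngle type was passed in context_keys, and chunk_at_5_deg and chunk_at_10_deg are hypothetical chunks, a context update between chunks might look like this:

    processor.set_context({DetectorAngle: sc.scalar(5.0, unit='deg')})
    processor.accumulate({RawChunk: chunk_at_5_deg})

    processor.set_context({DetectorAngle: sc.scalar(10.0, unit='deg')})
    processor.accumulate({RawChunk: chunk_at_10_deg})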

visualize(compact=False, mode='data', cluster_generics=True, cluster_color='#f0f0ff', show_legend=True, show_static_dependencies=True, **kwargs)

Visualize the workflow with node classification styling.

This post-processes sciline’s visualization to add styling that highlights:

  • Static nodes (gray): Pre-computed once, dependencies of cached nodes

  • Static cached nodes (gray, thick border): Pre-computed and cached

  • Dynamic keys (green, thick border): Input entry points for chunks

  • Dynamic nodes (light green): Recomputed for each chunk

  • Context keys (blue, thick border): Input entry points for context

  • Context-dependent nodes (light blue): Cached until context changes

  • Accumulator keys (orange cylinder): Aggregation points

  • Finalize nodes (plum): Computed from accumulators during finalize

  • Target keys (double border): Final outputs

Parameters:
  • compact (bool, default: False) – If True, parameter-table-dependent branches are collapsed.

  • mode (Literal['data', 'task', 'both'], default: 'data') – ‘data’ shows only data nodes, ‘task’ shows task nodes, ‘both’ shows all.

  • cluster_generics (bool, default: True) – If True, generic products are grouped into clusters.

  • cluster_color (str | None, default: '#f0f0ff') – Background color of clusters. If None, clusters are dotted.

  • show_legend (bool, default: True) – If True, add a legend explaining the node styles.

  • show_static_dependencies (bool, default: True) – If True (default), show all static nodes including dependencies of cached nodes. If False, hide the ancestors of cached nodes to simplify the graph.

  • **kwargs (Any) – Additional arguments passed to graphviz.Digraph.

Returns:

Digraph – A graphviz.Digraph with styled nodes.
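In a Jupyter notebook the returned graphviz.Digraph renders inline; it can also be written to a file. A sketch, reusing the processor from the class example:

    graph = processor.visualize(mode='both', show_legend=True)
    graph.render('stream_processor', format='png')  # writes stream_processor.png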