ess.reduce.streaming.StreamProcessor

class ess.reduce.streaming.StreamProcessor(base_workflow, *, dynamic_keys, context_keys=(), target_keys, accumulators, allow_bypass=False)[source]

Wrap a base workflow for streaming processing of chunks.

Note that this class cannot determine from the input keys whether the workflow is valid for streamed processing. In particular, it is the responsibility of the user to ensure that the workflow is “linear” with respect to the dynamic keys up to the accumulation keys.

Similarly, the stream processor cannot determine from the workflow structure whether context updates are compatible with the accumulated data. Accumulators are not cleared automatically. This is best illustrated with an example:

  • If the context is the detector rotation angle, and we accumulate I(Q) (or a prerequisite of I(Q)), then updating the detector angle context is compatible with previous data, assuming Q for each new chunk is computed based on the angle.

  • If the context is the sample temperature, and we accumulate I(Q), then updating the temperature context is not compatible with previous data. Accumulating I(Q, T) could be compatible in this case.

Since correctness cannot be determined from the workflow structure, we recommend implementing processing steps in a way that catches such problems. For example, adding the temperature as a coordinate to the I(Q) data array should allow the accumulator to raise automatically if the temperature changes.

__init__(base_workflow, *, dynamic_keys, context_keys=(), target_keys, accumulators, allow_bypass=False)[source]

Create a stream processor.

Parameters:
  • base_workflow (Pipeline) – Workflow to be used for processing chunks.

  • dynamic_keys (tuple[type, ...]) – Keys that are expected to be updated with each chunk. These keys cannot depend on each other or on context_keys.

  • context_keys (tuple[type, ...], default: ()) – Keys that define context for processing chunks and may change occasionally. These keys cannot overlap with dynamic_keys or depend on each other or on dynamic_keys.

  • target_keys (tuple[type, ...]) – Keys to be computed and returned.

  • accumulators (dict[type, Accumulator | Callable[..., Accumulator]] | tuple[type, ...]) – Keys at which values are accumulated, together with their accumulators. If a tuple is passed, an EternalAccumulator is used for each key (as in the sketch after this parameter list). Otherwise, pass a dict mapping keys to accumulator instances. If a dict value is a callable, base_workflow.bind_and_call(value) is used to create the instance.

  • allow_bypass (bool, default: False) – If True, allow bypassing accumulators for keys that are not in the accumulators dict. This is useful for dynamic keys that are not “terminated” in any accumulator. USE WITH CARE! This will lead to incorrect results unless the values for these keys are valid for all chunks contained in the final accumulators at the point where finalize() is called.
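
A minimal sketch of constructing a stream processor, assuming a small sciline workflow. The domain types (RawData, CleanData, IofQ) and providers are made up for illustration; only StreamProcessor, the tuple form of accumulators (which uses EternalAccumulator), and the parameters documented above are taken from this page.

    from typing import NewType

    import sciline
    import scipp as sc

    from ess.reduce.streaming import StreamProcessor

    # Hypothetical domain types for this sketch.
    RawData = NewType('RawData', sc.Variable)      # dynamic key, new value per chunk
    CleanData = NewType('CleanData', sc.Variable)  # accumulated across chunks
    IofQ = NewType('IofQ', sc.Variable)            # target key, computed on finalize

    def clean(raw: RawData) -> CleanData:
        # Per-chunk step: "linear" in the dynamic key up to the accumulation key.
        return CleanData(raw * 2.0)

    def iofq(data: CleanData) -> IofQ:
        # Final step, applied to the accumulated CleanData.
        return IofQ(data / 10.0)

    base_workflow = sciline.Pipeline([clean, iofq])
    processor = StreamProcessor(
        base_workflow,
        dynamic_keys=(RawData,),
        target_keys=(IofQ,),
        # Tuple form: an EternalAccumulator is used for each listed key.
        accumulators=(CleanData,),
    )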

Methods

  • __init__(base_workflow, *, dynamic_keys[, ...]) – Create a stream processor.

  • accumulate(chunks) – Accumulate values from chunks without finalizing the result.

  • add_chunk(chunks) – Legacy interface for accumulating values from chunks and finalizing the result.

  • clear() – Clear all accumulators, resetting them to their initial state.

  • finalize() – Get the final result by computing the target keys based on accumulated values.

  • set_context(context) – Set the context for processing chunks.

accumulate(chunks)[source]

Accumulate values from chunks without finalizing the result.

Parameters:

chunks (dict[type, Any]) – Chunks to be processed.

Raises:

ValueError – If non-dynamic keys are provided in chunks, or if accumulator computation requires dynamic keys that are not provided in chunks.

Return type:

None
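
Continuing the sketch from the constructor above (hypothetical names), chunks are passed as a dict keyed by the dynamic key types:

    # Feed a few chunks; only dynamic keys may appear in the dict.
    for value in (1.0, 2.0, 3.0):
        processor.accumulate({RawData: sc.scalar(value, unit='counts')})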

add_chunk(chunks)[source]

Legacy interface for accumulating values from chunks and finalizing the result.

It is recommended to use accumulate() and finalize() instead.

Parameters:

chunks (dict[type, Any]) – Chunks to be processed.

Returns:

dict[type, Any] – Finalized result.

clear()[source]

Clear all accumulators, resetting them to their initial state.

This is useful for restarting a streaming computation without creating a new StreamProcessor instance.

Return type:

None
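
For example, to restart the streaming computation from the sketch above without rebuilding the processor:

    processor.clear()  # accumulators are reset to their initial state
    processor.accumulate({RawData: sc.scalar(4.0, unit='counts')})  # start a fresh run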

finalize()[source]

Get the final result by computing the target keys based on accumulated values.

Returns:

dict[type, Any] – Finalized result.
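
Continuing the sketch, the result is a dict keyed by the target key types:

    results = processor.finalize()
    iofq_result = results[IofQ]  # computed from the accumulated CleanData
    # Accumulators are not cleared, so further accumulate() calls continue the run.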

set_context(context)[source]

Set the context for processing chunks.

Parameters:

context (dict[type, Any]) – Context to be set.

Return type:

None
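
A sketch of updating context, assuming a hypothetical DetectorAngle key that was passed via context_keys when the processor was created:

    DetectorAngle = NewType('DetectorAngle', sc.Variable)  # hypothetical context key

    # Requires context_keys=(DetectorAngle,) at construction; see the class
    # docstring above for when such an update is compatible with accumulated data.
    processor.set_context({DetectorAngle: sc.scalar(45.0, unit='deg')})
    processor.accumulate({RawData: sc.scalar(5.0, unit='counts')})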