ess.reduce.streaming.StreamProcessor

class ess.reduce.streaming.StreamProcessor(base_workflow, *, dynamic_keys, target_keys, accumulators, allow_bypass=False)[source]

Wrap a base workflow for streaming processing of chunks.

Note that this class cannot determine from the input keys whether the workflow is valid for streamed processing. In particular, it is the user's responsibility to ensure that the workflow is “linear” with respect to the dynamic keys up to the accumulation keys.

__init__(base_workflow, *, dynamic_keys, target_keys, accumulators, allow_bypass=False)[source]

Create a stream processor.

Parameters:
  • base_workflow (Pipeline) – Workflow to be used for processing chunks.

  • dynamic_keys (tuple[type, ...]) – Keys that are expected to be updated with each chunk.

  • target_keys (tuple[type, ...]) – Keys to be computed and returned.

  • accumulators (dict[type, Accumulator | Callable[..., Accumulator]] | tuple[type, ...]) – Keys at which to accumulate values, and their accumulators. If a tuple is passed, EternalAccumulator is used for all keys. Otherwise, pass a dict mapping keys to accumulator instances; if a dict value is a callable, base_workflow.bind_and_call(value) is used to create the instance.

  • allow_bypass (bool, default: False) – If True, allow bypassing accumulators for keys that are not in the accumulators dict. This is useful for dynamic keys that are not “terminated” in any accumulator. USE WITH CARE! This will lead to incorrect results unless the values for these keys are valid for all chunks included in the final accumulators at the point where finalize() is called.
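To illustrate what accumulation over dynamic keys means, here is a minimal, self-contained sketch of the streaming pattern. It uses plain-Python stand-ins (the class EternalSum and function process_chunk are hypothetical, not part of ess.reduce): each chunk flows through the "linear" part of the workflow and is pushed into an accumulator, and the target is computed from the accumulated value only at the end.

```python
class EternalSum:
    """Hypothetical stand-in for an EternalAccumulator: running sum over all chunks."""

    def __init__(self):
        self.value = None

    def push(self, chunk):
        # Accumulate without ever discarding earlier chunks.
        self.value = chunk if self.value is None else self.value + chunk


def process_chunk(raw):
    """Stand-in for the linear per-chunk part of the workflow (dynamic key -> accumulation key)."""
    return raw * 2


acc = EternalSum()
for raw in [1, 2, 3]:  # the dynamic key is updated with each chunk
    acc.push(process_chunk(raw))  # accumulate per-chunk results
result = acc.value  # finalize: compute targets from the accumulated value
```

The real StreamProcessor generalizes this loop to arbitrary sciline workflows with multiple dynamic keys, accumulation keys, and target keys.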

Methods

__init__(base_workflow, *, dynamic_keys, ...)

Create a stream processor.

accumulate(chunks)

Accumulate values from chunks without finalizing the result.

add_chunk(chunks)

Legacy interface for accumulating values from chunks and finalizing the result.

finalize()

Get the final result by computing the target keys based on accumulated values.

accumulate(chunks)[source]

Accumulate values from chunks without finalizing the result.

Parameters:

chunks (dict[type, Any]) – Chunks to be processed.

Return type:

None

add_chunk(chunks)[source]

Legacy interface for accumulating values from chunks and finalizing the result.

It is recommended to use accumulate() and finalize() instead.

Parameters:

chunks (dict[type, Any]) – Chunks to be processed.

Returns:

dict[type, Any] – Finalized result.
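The relationship between the legacy and the recommended interface can be sketched with a minimal, self-contained stand-in (MiniProcessor is hypothetical, not the real implementation): add_chunk() is simply accumulate() followed by finalize(), so calling accumulate() per chunk and finalize() once at the end avoids recomputing the targets on every chunk.

```python
class MiniProcessor:
    """Hypothetical sketch of the method relationship, not the real StreamProcessor."""

    def __init__(self):
        self._total = 0

    def accumulate(self, chunks):
        # Fold each chunk's values into the accumulated state; returns None.
        self._total += sum(chunks.values())

    def finalize(self):
        # Compute the target keys from the accumulated state.
        return {'Total': self._total}

    def add_chunk(self, chunks):
        # Legacy interface: accumulate and finalize in one call.
        self.accumulate(chunks)
        return self.finalize()


p = MiniProcessor()
p.accumulate({'Counts': 3})
p.accumulate({'Counts': 4})
result = p.finalize()
```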

finalize()[source]

Get the final result by computing the target keys based on accumulated values.

Returns:

dict[type, Any] – Finalized result.