ess.reduce.streaming.StreamProcessor

class ess.reduce.streaming.StreamProcessor(base_workflow, *, dynamic_keys, target_keys, accumulators)

Wrap a base workflow for streaming processing of chunks.

Note that this class cannot determine from the input keys whether the workflow is valid for streamed processing. In particular, it is the user's responsibility to ensure that the workflow is "linear" with respect to the dynamic keys up to the accumulation keys.
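The linearity requirement can be illustrated with a standalone toy (this is not `ess.reduce` code, just an assumption-free NumPy sketch): accumulating per-chunk results only matches processing all data at once when the steps between the dynamic key and the accumulation key are linear.

```python
import numpy as np

def scale(x):
    # Linear step: scale(a + b) == scale(a) + scale(b)
    return 2.0 * x

def square_sum(x):
    # Nonlinear step: square_sum(a + b) != square_sum(a) + square_sum(b)
    return np.sum(x) ** 2

data = np.arange(6.0)
chunks = [data[:3], data[3:]]

# Linear step: summing per-chunk results matches the full computation.
streamed = sum(np.sum(scale(c)) for c in chunks)
full = np.sum(scale(data))
assert streamed == full  # both 30.0

# Nonlinear step: chunk-wise accumulation gives a different (wrong) answer.
streamed_bad = sum(square_sum(c) for c in chunks)  # 9 + 144 = 153
full_bad = square_sum(data)  # 15**2 = 225
assert streamed_bad != full_bad
```

A workflow that, say, normalizes before accumulating would behave like the nonlinear case, which is why the check is left to the user.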

__init__(base_workflow, *, dynamic_keys, target_keys, accumulators)

Create a stream processor.

Parameters:
  • base_workflow (Pipeline) – Workflow to be used for processing chunks.

  • dynamic_keys (tuple[type, ...]) – Keys that are expected to be updated with each chunk.

  • target_keys (tuple[type, ...]) – Keys to be computed and returned.

  • accumulators (dict[type, Accumulator | Callable[..., Accumulator]] | tuple[type, ...]) – Keys at which to accumulate values, together with their accumulators. If a tuple is passed, EternalAccumulator is used for all keys. Otherwise, a dict mapping keys to accumulator instances can be passed. If a dict value is a callable, base_workflow.bind_and_call(value) is used to make an instance.
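As a minimal sketch of what an accumulator does (a standalone toy mirroring the sum-forever behavior described for EternalAccumulator, not the real implementation, and the `push`/`value` names here are assumptions): it keeps a running value across chunks and never resets.

```python
class ToyEternalAccumulator:
    """Toy accumulator: sums every pushed value and never resets."""

    def __init__(self):
        self._value = None

    def push(self, value):
        # Accumulate by addition; the first push initializes the value.
        self._value = value if self._value is None else self._value + value

    @property
    def value(self):
        return self._value

acc = ToyEternalAccumulator()
for chunk in (1.0, 2.0, 3.5):
    acc.push(chunk)
assert acc.value == 6.5
```

Passing a tuple of keys would give every key this sum-style behavior, while a dict lets each key use its own accumulator (or a callable producing one).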

Methods

__init__(base_workflow, *, dynamic_keys, ...)

Create a stream processor.

add_chunk(chunks)
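To show the call pattern add_chunk supports, here is a hedged end-to-end toy (a standalone sketch, not the real StreamProcessor; the `process` and `accumulate` parameters and the `"Total"` key are made up for illustration): each call feeds new dynamic values, updates the accumulated state, and returns the current target values.

```python
class ToyStreamProcessor:
    """Toy stand-in for a stream processor: process each chunk, accumulate, return targets."""

    def __init__(self, process, accumulate):
        self._process = process        # maps a chunk to a value to accumulate
        self._accumulate = accumulate  # combines the running total with a new value
        self._total = None

    def add_chunk(self, chunk):
        value = self._process(chunk)
        self._total = value if self._total is None else self._accumulate(self._total, value)
        # Return the current value of the (single, hypothetical) target key.
        return {"Total": self._total}

proc = ToyStreamProcessor(process=sum, accumulate=lambda a, b: a + b)
assert proc.add_chunk([1, 2]) == {"Total": 3}
assert proc.add_chunk([3, 4]) == {"Total": 10}
```

In the real class, the chunks argument supplies new values for the dynamic keys, and the returned results correspond to the configured target keys.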