ess.reduce.streaming.StreamProcessor
- class ess.reduce.streaming.StreamProcessor(base_workflow, *, dynamic_keys, target_keys, accumulators, allow_bypass=False)
Wrap a base workflow for streaming processing of chunks.
Note that this class cannot determine from the input keys whether the workflow is valid for streamed processing. In particular, it is the user's responsibility to ensure that the workflow is “linear” with respect to the dynamic keys up to the accumulation keys. In other words, processing chunks separately and accumulating the results must give the same values as processing all data at once.
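To see why linearity matters, here is a minimal sketch in plain Python, independent of ess.reduce. A per-chunk step whose outputs are summed, as an accumulator would, reproduces the one-shot result only if the step is linear in the chunk. The functions `scale_counts`, `normalize_by_total`, `streamed_total` and `full_total` are hypothetical stand-ins for the workflow steps between the dynamic keys and the accumulation keys.

```python
# Hypothetical per-chunk steps; not part of ess.reduce.

def scale_counts(chunk: list[float]) -> list[float]:
    """Linear step: multiply every event weight by a fixed factor."""
    return [2.0 * x for x in chunk]


def normalize_by_total(chunk: list[float]) -> list[float]:
    """Nonlinear step: divide by the total of *this* chunk only."""
    total = sum(chunk)
    return [x / total for x in chunk]


def streamed_total(step, chunks) -> float:
    """Apply `step` to each chunk separately and sum the results."""
    return sum(sum(step(chunk)) for chunk in chunks)


def full_total(step, chunks) -> float:
    """Apply `step` once to all chunks concatenated."""
    everything = [x for chunk in chunks for x in chunk]
    return sum(step(everything))


chunks = [[1.0, 3.0], [4.0]]

# Linear step: streaming matches the one-shot result.
print(streamed_total(scale_counts, chunks), full_total(scale_counts, chunks))  # 16.0 16.0
# Nonlinear step: streaming and one-shot results differ.
print(streamed_total(normalize_by_total, chunks), full_total(normalize_by_total, chunks))  # 2.0 1.0
```

The second step is not “linear” in the sense of the note above. Since the processor cannot detect this, streaming such a workflow would return a value that differs from processing all data at once.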
- __init__(base_workflow, *, dynamic_keys, target_keys, accumulators, allow_bypass=False)
Create a stream processor.
- Parameters:
  - base_workflow (Pipeline) – Workflow used to process each chunk.
  - dynamic_keys (tuple[type, ...]) – Keys that are expected to be updated with each chunk.
  - target_keys (tuple[type, ...]) – Keys to be computed and returned.
  - accumulators (dict[type, Accumulator | Callable[..., Accumulator]] | tuple[type, ...]) – Keys at which to accumulate values, and their accumulators. If a tuple is passed, EternalAccumulator is used for all keys. Otherwise, a dict mapping keys to accumulator instances can be passed. If a dict value is a callable, base_workflow.bind_and_call(value) is used to create an instance.
  - allow_bypass (bool, default: False) – If True, allow bypassing accumulators for keys that are not in the accumulators dict. This is useful for dynamic keys that are not “terminated” in any accumulator. USE WITH CARE! This leads to incorrect results unless the values for these keys are valid for all chunks included in the final accumulators at the point where finalize() is called.
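The rules for the `accumulators` argument can be modeled in a few lines of plain Python. This is a hypothetical sketch, not the ess.reduce implementation: `SumAccumulator` only stands in for `EternalAccumulator` (it sums every pushed value and never forgets), `resolve_accumulators` is an invented helper, keys are strings instead of types, and `make_instance` stands in for `base_workflow.bind_and_call`, which would supply a callable's arguments from the workflow.

```python
from typing import Callable, Union


class SumAccumulator:
    """Hypothetical stand-in for an 'eternal' accumulator: sums everything pushed."""

    def __init__(self) -> None:
        self._total = 0.0

    def push(self, value: float) -> None:
        self._total += value

    @property
    def value(self) -> float:
        return self._total


AccSpec = Union[SumAccumulator, Callable[..., SumAccumulator]]


def resolve_accumulators(
    accumulators: Union[dict[str, AccSpec], tuple[str, ...]],
    make_instance: Callable[[Callable[..., SumAccumulator]], SumAccumulator],
) -> dict[str, SumAccumulator]:
    """Mirror the documented rules for the `accumulators` parameter."""
    if isinstance(accumulators, tuple):
        # Tuple of keys: the same default accumulator type is used for every key.
        return {key: SumAccumulator() for key in accumulators}
    resolved = {}
    for key, spec in accumulators.items():
        # Dict values are either ready-made instances or callables that build one.
        resolved[key] = spec if isinstance(spec, SumAccumulator) else make_instance(spec)
    return resolved


by_tuple = resolve_accumulators(("Counts",), make_instance=lambda f: f())
by_dict = resolve_accumulators(
    {"Counts": SumAccumulator(), "Monitor": SumAccumulator},  # instance and callable
    make_instance=lambda f: f(),
)
for acc in by_dict.values():
    acc.push(1.5)
    acc.push(2.5)
print({key: acc.value for key, acc in by_dict.items()})  # {'Counts': 4.0, 'Monitor': 4.0}
```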
Methods
- __init__(base_workflow, *, dynamic_keys, ...) – Create a stream processor.
- accumulate(chunks) – Accumulate values from chunks without finalizing the result.
- add_chunk(chunks) – Legacy interface for accumulating values from chunks and finalizing the result.
- finalize() – Get the final result by computing the target keys based on accumulated values.
- add_chunk(chunks)
Legacy interface for accumulating values from chunks and finalizing the result.
It is recommended to use accumulate() and finalize() instead.
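The recommended calling pattern can be illustrated with a deliberately simplified, hypothetical model. `ToyStreamProcessor` is not the ess.reduce class: the sciline workflow is replaced by two plain functions, `per_chunk` (steps up to the accumulator) and `final` (steps after it), and the key names `"events"` and `"result"` are invented. The sketch shows that `accumulate` only updates accumulated state, `finalize` computes the target from everything seen so far, and `add_chunk` amounts to the two calls in sequence.

```python
from typing import Callable


class ToyStreamProcessor:
    """Hypothetical, simplified model of the accumulate/finalize pattern."""

    def __init__(self, per_chunk: Callable[[list[float]], float],
                 final: Callable[[float], float]) -> None:
        self._per_chunk = per_chunk  # stands in for workflow steps up to the accumulator
        self._final = final          # stands in for workflow steps after the accumulator
        self._accumulated = 0.0      # running sum, never reset

    def accumulate(self, chunks: dict[str, list[float]]) -> None:
        # Process the new chunk and add it to the running total; no target computed.
        self._accumulated += self._per_chunk(chunks["events"])

    def finalize(self) -> dict[str, float]:
        # Compute the target from everything accumulated so far.
        return {"result": self._final(self._accumulated)}

    def add_chunk(self, chunks: dict[str, list[float]]) -> dict[str, float]:
        # Legacy-style convenience: accumulate, then finalize immediately.
        self.accumulate(chunks)
        return self.finalize()


stream = [{"events": [1.0, 2.0]}, {"events": [3.0]}, {"events": [4.0, 5.0]}]

# Recommended: accumulate every chunk, finalize only when a result is needed.
proc = ToyStreamProcessor(per_chunk=sum, final=lambda total: total / 10)
for chunk in stream:
    proc.accumulate(chunk)
print(proc.finalize())  # {'result': 1.5}

# Legacy: every add_chunk call also pays for a finalize step.
legacy = ToyStreamProcessor(per_chunk=sum, final=lambda total: total / 10)
results = [legacy.add_chunk(chunk) for chunk in stream]
print(results[-1])  # {'result': 1.5}
```

Both paths end at the same value. A plausible benefit of the split interface is that an application can ingest many chunks and run the final computation only when it actually needs an updated result.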