# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
"""This module provides tools for running workflows in a streaming fashion."""
from abc import ABC, abstractmethod
from collections.abc import Callable
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
import networkx as nx
import sciline
import scipp as sc
if TYPE_CHECKING:
import graphviz
T = TypeVar('T')
def maybe_hist(value: T) -> T:
"""
    Convert a value to a histogram if it contains binned (event) data.

    Values that are not scipp Variables or DataArrays are returned unchanged.
    This is the default pre-processing used by accumulators.
Parameters
----------
value:
Value to be converted to a histogram.
Returns
-------
:
        Histogrammed value, or the input unchanged if it is not binned.
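
    Examples
    --------
    A minimal sketch of the pass-through behaviour for dense (non-binned) data:

    >>> import scipp as sc
    >>> dense = sc.array(dims=['x'], values=[1.0, 2.0, 3.0])
    >>> maybe_hist(dense) is dense
    True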
"""
if not isinstance(value, sc.Variable | sc.DataArray):
return value
return value if value.bins is None else value.hist()
class Accumulator(ABC, Generic[T]):
"""
Abstract base class for accumulators.
Accumulators are used to accumulate values over multiple chunks.
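
    Examples
    --------
    A minimal sketch of a custom accumulator that keeps only the latest value
    (the class below is illustrative and not part of this module):

    >>> class LatestValueAccumulator(Accumulator):
    ...     def __init__(self, **kwargs):
    ...         super().__init__(**kwargs)
    ...         self._latest = None
    ...     @property
    ...     def is_empty(self):
    ...         return self._latest is None
    ...     def _do_push(self, value):
    ...         self._latest = value
    ...     def _get_value(self):
    ...         return self._latest
    ...     def clear(self):
    ...         self._latest = None
    >>> acc = LatestValueAccumulator()
    >>> acc.push(1.5)
    >>> acc.value
    1.5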
"""
def __init__(self, preprocess: Callable[[T], T] | None = maybe_hist) -> None:
"""
Parameters
----------
preprocess:
            Preprocessing function applied to pushed values prior to accumulation.
            If None, values are accumulated without preprocessing.
"""
self._preprocess = preprocess
def push(self, value: T) -> None:
"""
Push a value to the accumulator.
Parameters
----------
value:
Value to be pushed to the accumulator.
"""
if self._preprocess is not None:
value = self._preprocess(value)
self._do_push(value)
@abstractmethod
def _do_push(self, value: T) -> None: ...
@property
def is_empty(self) -> bool:
"""
Check if the accumulator is empty.
Returns
-------
:
True if the accumulator is empty, False otherwise.
"""
return False
@property
def value(self) -> T:
"""
Get the accumulated value.
Returns
-------
:
Accumulated value.
Raises
------
ValueError
If the accumulator is empty.
"""
if self.is_empty:
raise ValueError("Cannot get value from empty accumulator")
return self._get_value()
@abstractmethod
def _get_value(self) -> T:
"""Return the accumulated value, assuming it exists."""
@abstractmethod
def clear(self) -> None:
"""
Clear the accumulator, resetting it to its initial state.
"""
def on_finalize(self) -> None:
"""
        Called after :py:meth:`StreamProcessor.finalize` has retrieved the
        accumulated value.
Override this method to perform custom cleanup after each finalize cycle.
The default implementation does nothing.
"""
class EternalAccumulator(Accumulator[T]):
"""
    Simple accumulator that sums pushed values into a running total.
Does not support event data.
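
    Examples
    --------
    A sketch using plain numbers for brevity (scipp objects work the same way):

    >>> acc = EternalAccumulator()
    >>> acc.push(1.0)
    >>> acc.push(2.0)
    >>> acc.value
    3.0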
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._value: T | None = None
@property
def is_empty(self) -> bool:
return self._value is None
def _get_value(self) -> T:
return deepcopy(self._value)
def _do_push(self, value: T) -> None:
if self._value is None:
self._value = deepcopy(value)
else:
self._value += value
def clear(self) -> None:
"""Clear the accumulated value."""
self._value = None
class MeanAccumulator(EternalAccumulator[T]):
"""
Accumulator that computes the mean of pushed values.
Does not support event data.
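
    Examples
    --------
    A sketch using plain numbers for brevity:

    >>> acc = MeanAccumulator()
    >>> acc.push(1.0)
    >>> acc.push(3.0)
    >>> acc.value
    2.0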
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._count = 0
def _do_push(self, value: T) -> None:
super()._do_push(value)
self._count += 1
def _get_value(self) -> T:
return super()._get_value() / self._count
def clear(self) -> None:
"""Clear the accumulated value and count."""
super().clear()
self._count = 0
class RollingAccumulator(Accumulator[T]):
"""
Accumulator that adds pushed values to a rolling window.
Does not support event data.
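
    Examples
    --------
    A sketch with scalar scipp variables; only the last ``window`` pushes
    contribute to the value:

    >>> import scipp as sc
    >>> acc = RollingAccumulator(window=2)
    >>> for v in (1.0, 2.0, 3.0):
    ...     acc.push(sc.scalar(v))
    >>> acc.value.value
    5.0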
"""
def __init__(self, window: int = 10, **kwargs: Any) -> None:
"""
Parameters
----------
window:
Size of the rolling window.
"""
super().__init__(**kwargs)
self._window = window
self._values: list[T] = []
@property
def is_empty(self) -> bool:
return len(self._values) == 0
def _get_value(self) -> T:
# Naive and potentially slow implementation if values and/or window are large!
return sc.reduce(self._values).sum()
def _do_push(self, value: T) -> None:
self._values.append(value)
if len(self._values) > self._window:
self._values.pop(0)
def clear(self) -> None:
"""Clear the accumulated values."""
self._values = []
class MinAccumulator(Accumulator):
"""Keeps the minimum value seen so far.
Only supports scalar values.
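
    Examples
    --------
    A sketch with scalar scipp variables:

    >>> import scipp as sc
    >>> acc = MinAccumulator()
    >>> acc.push(sc.scalar(2.0))
    >>> acc.push(sc.scalar(1.0))
    >>> acc.value.value
    1.0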
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._cur_min: sc.Variable | None = None
def _do_push(self, value: sc.Variable) -> None:
if self._cur_min is None:
self._cur_min = value
else:
self._cur_min = min(self._cur_min, value)
@property
def is_empty(self) -> bool:
"""Check if the accumulator has collected a minimum value."""
return self._cur_min is None
def _get_value(self) -> Any:
return self._cur_min
def clear(self) -> None:
"""Clear the accumulated minimum value."""
self._cur_min = None
class MaxAccumulator(Accumulator):
"""Keeps the maximum value seen so far.
Only supports scalar values.
"""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._cur_max: sc.Variable | None = None
@property
def is_empty(self) -> bool:
"""Check if the accumulator has collected a maximum value."""
return self._cur_max is None
def _do_push(self, value: sc.Variable) -> None:
if self._cur_max is None:
self._cur_max = value
else:
self._cur_max = max(self._cur_max, value)
def _get_value(self) -> sc.Variable | None:
return self._cur_max
def clear(self) -> None:
"""Clear the accumulated maximum value."""
self._cur_max = None
class StreamProcessor:
"""
Wrap a base workflow for streaming processing of chunks.
    Note that this class cannot determine from the input keys whether the workflow
    is valid for streamed processing. In particular, it is the responsibility of
    the user to ensure that the workflow is "linear" with respect to the dynamic
    keys up to the accumulation keys.
Similarly, the stream processor cannot determine from the workflow structure whether
context updates are compatible with the accumulated data. Accumulators are not
cleared automatically. This is best illustrated with an example:
- If the context is the detector rotation angle, and we accumulate I(Q) (or a
prerequisite of I(Q)), then updating the detector angle context is compatible with
previous data, assuming Q for each new chunk is computed based on the angle.
- If the context is the sample temperature, and we accumulate I(Q), then updating
the temperature context is not compatible with previous data. Accumulating I(Q, T)
could be compatible in this case.
    Since correctness cannot be determined from the workflow structure, we recommend
    implementing processing steps in a way that catches such problems. For example,
    adding the temperature as a coordinate of the I(Q) data array allows the
    accumulator to raise automatically if the temperature changes.
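
    Examples
    --------
    A minimal sketch with illustrative providers and domain types (``Raw``,
    ``Cleaned``, and ``Result`` are hypothetical names, not part of this module):

    >>> from typing import NewType
    >>> import sciline
    >>> Raw = NewType('Raw', int)
    >>> Cleaned = NewType('Cleaned', int)
    >>> Result = NewType('Result', int)
    >>> def clean(raw: Raw) -> Cleaned:
    ...     return Cleaned(raw * 2)
    >>> def reduce_result(cleaned: Cleaned) -> Result:
    ...     return Result(cleaned)
    >>> workflow = sciline.Pipeline([clean, reduce_result])
    >>> processor = StreamProcessor(
    ...     workflow,
    ...     dynamic_keys=(Raw,),
    ...     target_keys=(Result,),
    ...     accumulators=(Cleaned,),
    ... )
    >>> processor.accumulate({Raw: 3})
    >>> processor.accumulate({Raw: 4})
    >>> processor.finalize()[Result]
    14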
"""
def __init__(
self,
base_workflow: sciline.Pipeline,
*,
dynamic_keys: tuple[sciline.typing.Key, ...],
context_keys: tuple[sciline.typing.Key, ...] = (),
target_keys: tuple[sciline.typing.Key, ...],
accumulators: dict[sciline.typing.Key, Accumulator | Callable[..., Accumulator]]
| tuple[sciline.typing.Key, ...],
allow_bypass: bool = False,
) -> None:
"""
Create a stream processor.
Parameters
----------
base_workflow:
Workflow to be used for processing chunks.
dynamic_keys:
Keys that are expected to be updated with each chunk. These keys cannot
depend on each other or on context_keys.
context_keys:
Keys that define context for processing chunks and may change occasionally.
These keys cannot overlap with dynamic_keys or depend on each other or on
dynamic_keys.
target_keys:
Keys to be computed and returned.
accumulators:
Keys at which to accumulate values and their accumulators. If a tuple is
passed, :py:class:`EternalAccumulator` is used for all keys. Otherwise, a
dict mapping keys to accumulator instances can be passed. If a dict value is
a callable, base_workflow.bind_and_call(value) is used to make an instance.
        allow_bypass:
            If True, allow bypassing accumulators for keys that are not in the
            accumulators dict. This is useful for dynamic keys that are not
            "terminated" in any accumulator. USE WITH CARE! This will lead to
            incorrect results unless the values for these keys are valid for all
            chunks contained in the final accumulators at the point where
            :py:meth:`finalize` is called.
"""
self._dynamic_keys = set(dynamic_keys)
self._context_keys = set(context_keys)
# Validate that dynamic and context keys do not overlap
overlap = self._dynamic_keys & self._context_keys
if overlap:
raise ValueError(f"Keys cannot be both dynamic and context: {overlap}")
# Check dynamic/context keys don't depend on other dynamic/context keys
graph = base_workflow.underlying_graph
special_keys = self._dynamic_keys | self._context_keys
for key in special_keys:
if key not in graph:
continue
ancestors = nx.ancestors(graph, key)
special_ancestors = ancestors & special_keys
downstream = 'Dynamic' if key in self._dynamic_keys else 'Context'
if special_ancestors:
raise ValueError(
f"{downstream} key '{key}' depends on other dynamic/context keys: "
f"{special_ancestors}. This is not supported."
)
workflow = sciline.Pipeline()
for key in target_keys:
workflow[key] = base_workflow[key]
for key in dynamic_keys:
workflow[key] = None # hack to prune branches
for key in context_keys:
workflow[key] = None
# Store for visualization (copy in case caller modifies base_workflow later)
self._base_workflow_for_viz = base_workflow.copy()
# Find and pre-compute static nodes as far down the graph as possible
nodes = _find_descendants(workflow, dynamic_keys + context_keys)
last_static = _find_parents(workflow, nodes) - nodes
for key, value in base_workflow.compute(last_static).items():
workflow[key] = value
self._cached_keys = last_static # Store for visualization
# Nodes that may need updating on context change but should be cached otherwise.
dynamic_nodes = _find_descendants(workflow, dynamic_keys)
# Nodes as far "down" in the graph as possible, right before the dynamic nodes.
# This also includes target keys that are not dynamic but context-dependent.
context_to_cache = (
(_find_parents(workflow, dynamic_nodes) | set(target_keys)) - dynamic_nodes
) & _find_descendants(workflow, context_keys)
graph = workflow.underlying_graph
self._context_key_to_cached_context_nodes_map = {
context_key: ({context_key} | nx.descendants(graph, context_key))
& context_to_cache
for context_key in self._context_keys
if context_key in graph
}
self._context_workflow = workflow.copy()
self._process_chunk_workflow = workflow.copy()
self._finalize_workflow = workflow.copy()
self._accumulators = (
accumulators
if isinstance(accumulators, dict)
else {key: EternalAccumulator() for key in accumulators}
)
# Map each accumulator to its dependent dynamic keys
self._accumulator_dependencies = {
acc_key: nx.ancestors(graph, acc_key) & self._dynamic_keys
for acc_key in self._accumulators
if acc_key in graph
}
# Depending on the target_keys, some accumulators can be unused and should not
# be computed when adding a chunk.
self._accumulators = {
key: value for key, value in self._accumulators.items() if key in graph
}
# Create accumulators unless instances were passed. This allows for initializing
# accumulators with arguments that depend on the workflow such as bin edges,
# which would otherwise be hard to obtain.
self._accumulators = {
key: value
if isinstance(value, Accumulator)
else base_workflow.bind_and_call(value)
for key, value in self._accumulators.items()
}
self._target_keys = target_keys
self._allow_bypass = allow_bypass
def set_context(self, context: dict[sciline.typing.Key, Any]) -> None:
"""
Set the context for processing chunks.
Parameters
----------
context:
Context to be set.
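
        Examples
        --------
        A sketch, assuming ``Temperature`` was passed as one of the
        ``context_keys`` when constructing the processor (hypothetical name):

        >>> processor.set_context({Temperature: sc.scalar(300.0, unit='K')})  # doctest: +SKIP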
"""
needs_recompute = set()
for key in context:
if key not in self._context_keys:
raise ValueError(f"Key '{key}' is not a context key")
needs_recompute |= self._context_key_to_cached_context_nodes_map[key]
for key, value in context.items():
self._context_workflow[key] = value
# Propagate context values to finalize workflow so providers that depend
# on context keys receive the updated values during finalize().
self._finalize_workflow[key] = value
results = self._context_workflow.compute(needs_recompute)
for key, value in results.items():
if key in self._target_keys:
# Context-dependent key is direct target, independent of dynamic nodes.
self._finalize_workflow[key] = value
else:
self._process_chunk_workflow[key] = value
def add_chunk(
self, chunks: dict[sciline.typing.Key, Any]
) -> dict[sciline.typing.Key, Any]:
"""
Legacy interface for accumulating values from chunks and finalizing the result.
It is recommended to use :py:meth:`accumulate` and :py:meth:`finalize` instead.
Parameters
----------
chunks:
Chunks to be processed.
Returns
-------
:
Finalized result.
"""
self.accumulate(chunks)
return self.finalize()
def accumulate(self, chunks: dict[sciline.typing.Key, Any]) -> None:
"""
Accumulate values from chunks without finalizing the result.
Parameters
----------
chunks:
Chunks to be processed.
Raises
------
ValueError
If non-dynamic keys are provided in chunks.
If accumulator computation requires dynamic keys not provided in chunks.
"""
non_dynamic = set(chunks) - self._dynamic_keys
if non_dynamic:
raise ValueError(
f"Can only update dynamic keys. Got non-dynamic keys: {non_dynamic}"
)
accumulators_to_update = []
for acc_key, deps in self._accumulator_dependencies.items():
if deps.isdisjoint(chunks.keys()):
continue
if not deps.issubset(chunks.keys()):
raise ValueError(
f"Accumulator '{acc_key}' requires dynamic keys "
f"{deps - chunks.keys()} not provided in the current chunk."
)
accumulators_to_update.append(acc_key)
for key, value in chunks.items():
self._process_chunk_workflow[key] = value
            # There can be dynamic keys that do not "terminate" in any accumulator.
            # In that case, we must ensure their values are available when computing
            # the target keys in finalize().
if self._allow_bypass:
self._finalize_workflow[key] = value
to_accumulate = self._process_chunk_workflow.compute(accumulators_to_update)
for key, processed in to_accumulate.items():
self._accumulators[key].push(processed)
def finalize(self) -> dict[sciline.typing.Key, Any]:
"""
Get the final result by computing the target keys based on accumulated values.
Returns
-------
:
Finalized result.
"""
for key in self._accumulators:
self._finalize_workflow[key] = self._accumulators[key].value
result = self._finalize_workflow.compute(self._target_keys)
for acc in self._accumulators.values():
acc.on_finalize()
return result
def clear(self) -> None:
"""
Clear all accumulators, resetting them to their initial state.
This is useful for restarting a streaming computation without
creating a new StreamProcessor instance.
"""
for accumulator in self._accumulators.values():
accumulator.clear()
def _get_viz_workflow(self) -> sciline.Pipeline:
"""Create the workflow used for visualization."""
viz_workflow = sciline.Pipeline()
for key in self._target_keys:
viz_workflow[key] = self._base_workflow_for_viz[key]
viz_graph = viz_workflow.underlying_graph
for key in self._dynamic_keys:
if key in viz_graph:
viz_workflow[key] = None
for key in self._context_keys:
if key in viz_graph:
viz_workflow[key] = None
return viz_workflow
def _classify_nodes(self, graph: nx.DiGraph) -> dict[str, set[Any]]:
"""
Classify all nodes in the graph for visualization.
Node categories:
- static: Pre-computed once, not dependent on dynamic or context keys
- cached_keys: Subset of static nodes that are actually cached (last_static)
- dynamic_keys: Input entry points for chunk data
- dynamic_nodes: Downstream of dynamic keys, recomputed per chunk
(excludes nodes downstream of accumulators, which are computed in finalize)
- context_keys: Input entry points for context data
- context_dependent: Downstream of context keys but not dynamic keys
- accumulator_keys: Where values are aggregated across chunks
- target_keys: Final outputs computed in finalize()
- finalize_nodes: Downstream of accumulators, computed in finalize()
"""
all_nodes = set(graph.nodes)
accumulator_keys = set(self._accumulators.keys())
target_keys = set(self._target_keys)
# Compute derived classifications
dynamic_descendants = _find_descendants(graph, self._dynamic_keys)
context_descendants = _find_descendants(graph, self._context_keys)
# Nodes downstream of accumulators are computed in finalize(), not per-chunk
accumulator_descendants = _find_descendants(graph, accumulator_keys)
finalize_nodes = accumulator_descendants - accumulator_keys
# Dynamic nodes: downstream of dynamic keys but NOT downstream of accumulators
# These are recomputed for each chunk
dynamic_nodes = (
dynamic_descendants - self._dynamic_keys - accumulator_descendants
)
# Context-dependent nodes: downstream of context but not of dynamic
context_dependent = (
context_descendants - dynamic_descendants - self._context_keys
)
# Static nodes: not dependent on dynamic or context
static_nodes = all_nodes - dynamic_descendants - context_descendants
return {
'static': static_nodes,
'cached_keys': self._cached_keys & all_nodes,
'dynamic_keys': self._dynamic_keys & all_nodes,
'dynamic_nodes': dynamic_nodes,
'context_keys': self._context_keys & all_nodes,
'context_dependent': context_dependent,
'accumulator_keys': accumulator_keys & all_nodes,
'target_keys': target_keys & all_nodes,
'finalize_nodes': finalize_nodes,
}
def visualize(
self,
compact: bool = False,
mode: Literal['data', 'task', 'both'] = 'data',
cluster_generics: bool = True,
cluster_color: str | None = '#f0f0ff',
show_legend: bool = True,
show_static_dependencies: bool = True,
**kwargs: Any,
) -> 'graphviz.Digraph':
"""
Visualize the workflow with node classification styling.
This post-processes sciline's visualization to add styling that highlights:
- Static nodes (gray): Pre-computed once, dependencies of cached nodes
- Static cached nodes (gray, thick border): Pre-computed and cached
- Dynamic keys (green, thick border): Input entry points for chunks
- Dynamic nodes (light green): Recomputed for each chunk
- Context keys (blue, thick border): Input entry points for context
- Context-dependent nodes (light blue): Cached until context changes
- Accumulator keys (orange cylinder): Aggregation points
- Finalize nodes (plum): Computed from accumulators during finalize
- Target keys (double border): Final outputs
Parameters
----------
compact:
If True, parameter-table-dependent branches are collapsed.
mode:
'data' shows only data nodes, 'task' shows task nodes, 'both' shows all.
cluster_generics:
If True, generic products are grouped into clusters.
cluster_color:
Background color of clusters. If None, clusters are dotted.
show_legend:
If True, add a legend explaining the node styles.
show_static_dependencies:
If True (default), show all static nodes including dependencies of cached
nodes. If False, hide the ancestors of cached nodes to simplify the graph.
**kwargs:
Additional arguments passed to graphviz.Digraph.
Returns
-------
:
A graphviz.Digraph with styled nodes.
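
        Examples
        --------
        A sketch of rendering the styled graph to a file (requires graphviz):

        >>> dot = processor.visualize(mode='data')  # doctest: +SKIP
        >>> dot.render('stream_workflow', format='svg')  # doctest: +SKIP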
"""
viz_workflow = self._get_viz_workflow()
if not show_static_dependencies:
# Create a pruned workflow that hides ancestors of cached nodes
viz_workflow = viz_workflow.copy()
for key in self._cached_keys:
if key in viz_workflow.underlying_graph:
viz_workflow[key] = None
graph = viz_workflow.underlying_graph
dot = viz_workflow.visualize(
compact=compact,
mode=mode,
cluster_generics=cluster_generics,
cluster_color=cluster_color,
**kwargs,
)
classifications = self._classify_nodes(graph)
# Build a mapping from formatted names to original keys
key_to_formatted: dict[Any, str] = {}
for key in graph.nodes:
key_to_formatted[key] = _format_key_for_graphviz(key)
# Apply styles by re-adding nodes with updated attributes
for key, formatted_name in key_to_formatted.items():
style = _get_node_style(key, classifications)
if style:
dot.node(formatted_name, **style)
# Add legend
if show_legend:
_add_legend(dot, show_static_dependencies=show_static_dependencies)
return dot
def _find_descendants(
source: sciline.Pipeline | nx.DiGraph,
keys: set[sciline.typing.Key] | tuple[sciline.typing.Key, ...],
) -> set[sciline.typing.Key]:
"""Find all descendants of any key in keys, including the keys themselves."""
graph = source.underlying_graph if hasattr(source, 'underlying_graph') else source
keys_set = set(keys)
descendants = set()
for key in keys_set:
if key in graph:
descendants |= nx.descendants(graph, key)
return descendants | (keys_set & set(graph.nodes))
def _find_parents(
workflow: sciline.Pipeline, keys: tuple[sciline.typing.Key, ...]
) -> set[sciline.typing.Key]:
graph = workflow.underlying_graph
parents = set()
for key in keys:
parents |= set(graph.predecessors(key))
return parents
# =============================================================================
# Visualization helpers
# =============================================================================
# Style definitions for each node category
# Priority order for overlapping categories (higher = takes precedence for fill)
_VIZ_STYLES = {
'static': {
'fillcolor': '#e8e8e8', # Gray
'style': 'filled',
'priority': 0,
},
'cached_keys': {
'fillcolor': '#e8e8e8', # Gray (same as static)
'style': 'filled',
'penwidth': '2.5', # Thick border to distinguish from dependencies
'priority': 1,
},
'context_dependent': {
'fillcolor': '#d4e8f4', # Light blue
'style': 'filled',
'priority': 2,
},
'context_keys': {
'fillcolor': '#87CEEB', # Sky blue
'style': 'filled',
'penwidth': '2.5',
'color': 'black', # Override sciline's red for unsatisfied
'fontcolor': 'black',
'priority': 3,
},
'dynamic_nodes': {
'fillcolor': '#d4f4d4', # Light green
'style': 'filled',
'priority': 4,
},
'dynamic_keys': {
'fillcolor': '#90EE90', # Light green (stronger)
'style': 'filled',
'penwidth': '2.5',
'color': 'black', # Override sciline's red for unsatisfied
'fontcolor': 'black',
'priority': 5,
},
'accumulator_keys': {
'fillcolor': '#FFB347', # Orange
'style': 'filled',
'shape': 'cylinder',
'priority': 6,
},
'finalize_nodes': {
'fillcolor': '#DDA0DD', # Plum (more distinct from cluster color)
'style': 'filled',
'priority': 7,
},
'target_keys': {
'peripheries': '2', # Double border
'priority': 8,
},
}
def _format_key_for_graphviz(key: Any) -> str:
"""Format a key to match sciline's node naming convention."""
from sciline.visualize import _format_type
return _format_type(key).name
def _get_node_style(key: Any, classifications: dict[str, set[Any]]) -> dict[str, str]:
"""
Determine the style for a node based on its classifications.
A node can belong to multiple categories. We combine styles with
higher priority categories taking precedence for conflicting attributes.
"""
applicable = []
for category, keys in classifications.items():
if key in keys:
applicable.append((_VIZ_STYLES[category]['priority'], category))
if not applicable:
return {}
# Sort by priority and merge styles
applicable.sort()
merged: dict[str, str] = {}
for _, category in applicable:
style = _VIZ_STYLES[category].copy()
style.pop('priority')
merged.update(style)
return merged
def _add_legend(
dot: 'graphviz.Digraph', *, show_static_dependencies: bool = True
) -> None:
"""Add a legend subgraph explaining the node styles."""
with dot.subgraph(name='cluster_legend') as legend:
legend.attr(label='Legend', fontsize='14', style='rounded')
legend.attr('node', shape='rectangle', width='1.5', height='0.3')
# Track first node for edge chaining
prev_node = None
if show_static_dependencies:
legend.node(
'legend_static',
'Static',
fillcolor='#e8e8e8',
style='filled',
)
prev_node = 'legend_static'
legend.node(
'legend_cached',
'Static (cached)',
fillcolor='#e8e8e8',
style='filled',
penwidth='2.5',
)
if prev_node:
legend.edge(prev_node, 'legend_cached', style='invis')
prev_node = 'legend_cached'
legend.node(
'legend_context_key',
'Context key (input)',
fillcolor='#87CEEB',
style='filled',
penwidth='2.5',
)
legend.edge(prev_node, 'legend_context_key', style='invis')
prev_node = 'legend_context_key'
legend.node(
'legend_context_dep',
'Context-dependent',
fillcolor='#d4e8f4',
style='filled',
)
legend.edge(prev_node, 'legend_context_dep', style='invis')
prev_node = 'legend_context_dep'
legend.node(
'legend_dynamic_key',
'Dynamic key (input)',
fillcolor='#90EE90',
style='filled',
penwidth='2.5',
)
legend.edge(prev_node, 'legend_dynamic_key', style='invis')
prev_node = 'legend_dynamic_key'
legend.node(
'legend_dynamic_node',
'Dynamic (per chunk)',
fillcolor='#d4f4d4',
style='filled',
)
legend.edge(prev_node, 'legend_dynamic_node', style='invis')
prev_node = 'legend_dynamic_node'
legend.node(
'legend_accumulator',
'Accumulator',
fillcolor='#FFB347',
style='filled',
shape='cylinder',
)
legend.edge(prev_node, 'legend_accumulator', style='invis')
prev_node = 'legend_accumulator'
legend.node(
'legend_finalize',
'Finalize (from accum.)',
fillcolor='#DDA0DD',
style='filled',
)
legend.edge(prev_node, 'legend_finalize', style='invis')
prev_node = 'legend_finalize'
legend.node(
'legend_target',
'Target (output)',
peripheries='2',
)
legend.edge(prev_node, 'legend_target', style='invis')