sciline.Pipeline

class sciline.Pipeline(providers=None, *, params=None, constraints=None)

A container for providers that can be assembled into a task graph.

__init__(providers=None, *, params=None, constraints=None)

Set up a Pipeline from a list of providers.

Parameters:
  • providers (Optional[Iterable[Callable[..., Any] | Provider]], default: None) – List of callable providers. Each provides its return value. Their arguments and return value must be annotated with type hints.

  • params (Optional[dict[type[Any], Any]], default: None) – Dictionary of concrete values to provide for types.

  • constraints (Optional[Mapping[TypeVar, Iterable[type]]], default: None) – Mapping of type variables to constraints for those type variables. For each entry, the corresponding type variable will be constrained to the given types. This overrides the constraints from the definition of the type variable. The new constraints must be a subset of the constraints in the type variable definition.

Methods

__init__([providers, params, constraints])

Set up a Pipeline from a list of providers.

bind_and_call(fns, /)

Call the given functions with arguments provided by the pipeline.

compute(tp[, reporter])

Compute result for the given keys.

copy()

get(keys, *[, scheduler, handler, max_depth])

Return a TaskGraph for the given keys.

insert(provider, /)

Insert a callable into the graph that provides its return value.

map(node_values)

Map the graph over given node values.

output_keys()

Returns the keys that are not inputs to any other providers.

reduce(*, func, **kwargs)

Reduce the outputs of a mapped graph into a single value and provider.

to_networkx()

visualize([tp, compact, mode, ...])

Return a graphviz Digraph object representing the graph for the given keys.

visualize_data_graph(**kwargs)

Attributes

index_names

Names of the indices (dimensions) of the graph.

indices

Names and values of the indices of the graph.

underlying_graph

The underlying NetworkX graph.

__getitem__(key)

Return the subgraph that computes the given key.

Return type:

TypeVar(T, bound= DataGraph)

__setitem__(key, value)

Provide a concrete value for a type.

Parameters:
  • key (type) – Type to provide a value for.

  • value (Union[DataGraph, Any]) – Concrete value to provide.

Return type:

None

bind_and_call(fns, /)

Call the given functions with arguments provided by the pipeline.

Parameters:

fns (Callable[..., Any] | Iterable[Callable[..., Any]]) –

Functions to call. The pipeline will provide all arguments based on the function’s type hints.

If this is a single callable, it is called directly. Otherwise, bind_and_call iterates over it and calls every function. In either case, Pipeline.compute() is called only once.

Returns:

Any – The return values of the functions in the same order as the functions. If only one function is passed, its return value is not wrapped in a tuple.

compute(tp, reporter=None, **kwargs)

Compute result for the given keys.

Equivalent to self.get(tp).compute().

Parameters:
  • tp (type | Iterable[type] | UnionType) – Type to compute the result for. Can be a single type or an iterable of types.

  • reporter (Reporter | None, default: None) – Optional reporter to track progress of this computation.

  • kwargs (Any) – Keyword arguments passed to the .get() method.

Return type:

Any

get(keys, *, scheduler=None, handler=None, max_depth=4)

Return a TaskGraph for the given keys.

Parameters:
  • keys (type | Iterable[type] | UnionType) – Type to compute the result for. Can be a single type or an iterable of types.

  • scheduler (Scheduler | None, default: None) – Optional scheduler to use for computing the result. If not given, a NaiveScheduler is used if dask is not installed, otherwise dask’s threaded scheduler is used.

  • handler (ErrorHandler | None, default: None) – Handler for unsatisfied requirements. If not provided, HandleAsBuildTimeException is used, which raises an exception. During development and debugging it can be helpful to use a handler that raises an exception only when the graph is computed. This can be achieved by passing HandleAsComputeTimeException as the handler.

  • max_depth (int, default: 4) – Maximum depth to show in the dependency tree when reporting errors.

Return type:

TaskGraph

property index_names: tuple[Hashable, ...]

Names of the indices (dimensions) of the graph.

property indices: dict[Hashable, Iterable[Hashable]]

Names and values of the indices of the graph.

insert(provider, /)

Insert a callable into the graph that provides its return value.

Parameters:

provider (Callable[..., Any] | Provider) – Either a callable that provides its return value, whose arguments and return value must be annotated with type hints, or a Provider object that has been constructed from such a callable.

Return type:

None

map(node_values)

Map the graph over given node values.

Creates a new graph where given nodes and their dependents are duplicated for each given value and values are assigned to the given nodes.

Parameters:

node_values (dict[type, Any]) – Dictionary mapping node keys to collections of values.

Returns:

TypeVar(T, bound= DataGraph) – A new graph with mapped nodes.

output_keys()

Returns the keys that are not inputs to any other providers.

Return type:

tuple[type, ...]

reduce(*, func, **kwargs)

Reduce the outputs of a mapped graph into a single value and provider.

Parameters:
  • func (Callable[..., Any]) – Function that takes the values to reduce and returns a single value. This function is passed as many arguments as there are values to reduce.

  • kwargs (Any) – Forwarded to cyclebane.Graph.reduce().

Returns:

TypeVar(T, bound= DataGraph) – A new graph with a new node that depends on all sink nodes of the input graph and returns the output of func.

property underlying_graph: DiGraph

The underlying NetworkX graph.

visualize(tp=None, compact=False, mode='data', cluster_generics=True, cluster_color='#f0f0ff', **kwargs)

Return a graphviz Digraph object representing the graph for the given keys.

Equivalent to self.get(tp).visualize().

Parameters:
  • tp (Union[type, Iterable[type], None], default: None) – Type to visualize the graph for. Can be a single type or an iterable of types.

  • compact (bool, default: False) – If True, parameter-table-dependent branches are collapsed into a single copy of the branch. Recommended for large graphs with long parameter tables.

  • mode (Literal['data', 'task', 'both'], default: 'data') – If ‘data’, only data nodes are shown. If ‘task’, only task nodes and input data nodes are shown. If ‘both’, all nodes are shown.

  • cluster_generics (bool, default: True) – If True, generic products are grouped into clusters.

  • cluster_color (str | None, default: '#f0f0ff') – Background color of clusters. If None, clusters are dotted.

  • kwargs (Any) – Keyword arguments passed to graphviz.Digraph.

Return type:

Digraph