Rewrite of Sciline’s Pipeline as a Data Graph#

Introduction#

There has been a series of issues and discussions about Sciline’s Pipeline class and its implementation.

  • Detect unused parameters #43.

  • More helpful error messages when pipeline fails to build or compute? #74.

  • Get missing params from a pipeline #83.

  • Support for graph operations #107.

  • Supporting different file handle types is too difficult #140.

  • A new approach for “parameter tables” #141.

  • Pruning for repeated workflow calls #148.

Current implementation#

  • sciline.Pipeline is a box that can be filled with providers (a provider is a callable that can compute a value) as well as values.

  • Providers can provide generic types. The concrete types and values that such providers compute are determined later, when the pipeline is built, based on which instances of the generic outputs are requested (by other providers or by the user when building the pipeline).

  • Parameter tables and a special sciline.Series type are supported to create task graphs with duplicate branches and “reduction” or grouping operations.

  • The pipeline is built by calling build on it, which returns a sciline.TaskGraph. Most of the complexity is handled in this step.

The presence of generic providers as well as parameter tables makes the implementation of the pipeline quite complex. It implies that internally a pipeline is not representable as a graph, because (1) generics lead to a task-graph structure that is in principle undefined until the pipeline is built, and (2) parameter tables lead to implicit duplication of task-graph branches. If Pipeline used a graph representation internally, adding or replacing providers would therefore conflict with the duplicated structure.

Proposal#

The key idea of this proposal is to introduce sciline.DataGraph, a directed acyclic graph (DAG), which can roughly be thought of as a graph representation of the pipeline. The data graph describes dependencies between data, defined via the type hints of providers. Providers (or values) are stored as node data.
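A minimal sketch of this idea, using plain NetworkX rather than Sciline's actual implementation: the type hints of a provider define the edges of the data graph, and the provider itself is stored as node data. The helper `add_provider` is hypothetical and only illustrates the principle.

```python
import networkx as nx
from typing import get_type_hints


def add_provider(graph: nx.DiGraph, provider) -> None:
    """Insert a provider into a data graph, deriving edges from its type hints."""
    hints = get_type_hints(provider)
    output = hints.pop('return')
    # The provider is stored as node data on the node it produces.
    graph.add_node(output, provider=provider)
    # Each parameter type becomes an incoming edge (a data dependency).
    for dependency in hints.values():
        graph.add_edge(dependency, output)


def f2(a: float, b: str) -> int:
    return int(a) + len(b)


graph = nx.DiGraph()
add_provider(graph, f2)
set(graph.predecessors(int))  # the dependencies of int: {float, str}
```

Note that nodes are keyed by data type, not by provider: this is what makes replacing a provider a simple node-data update rather than a structural change.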

As the support for generic providers was a hindrance in the current implementation, we propose to restrict it to generic return types with constraints. Such a provider then defines a known, finite set of outputs, and the data graph can thus be updated with multiple nodes, each sharing the same provider.

The support for parameter tables would be replaced by using map and reduce operations on the data graph.

  1. Whether Pipeline will be kept as a wrapper around DataGraph or whether DataGraph will be the main interface is not yet clear.

  2. This has been prototyped in the cyclebane library. Whether this would be integrated into or used by Sciline is not yet clear.

Note on chosen implementation#

Keeping the existing Pipeline interface, the new functionality has been added in the DataGraph class, which has been made a base class of Pipeline. DataGraph is implemented as a wrapper for cyclebane.Graph, a new and generic support library based on NetworkX.
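The layering described above can be sketched as follows. This is an illustrative outline only: the class names `CyclebaneGraph` (standing in for cyclebane.Graph) and the attribute `_cbgraph` are hypothetical, not Sciline's actual internals.

```python
import networkx as nx


class CyclebaneGraph:
    """Stand-in for cyclebane.Graph: a generic graph layer over NetworkX."""

    def __init__(self, graph: nx.DiGraph) -> None:
        self._graph = graph

    def map(self, values):
        """Generic map operation (duplicate branches per value); omitted here."""
        raise NotImplementedError

    def reduce(self, func):
        """Generic reduce operation over mapped branches; omitted here."""
        raise NotImplementedError


class DataGraph:
    """Sciline layer: wraps the generic graph, adds type-hint semantics."""

    def __init__(self, providers) -> None:
        self._cbgraph = CyclebaneGraph(nx.DiGraph())


class Pipeline(DataGraph):
    """Existing user-facing interface, kept as a subclass of DataGraph."""
```

The design choice here is composition at the bottom (DataGraph wraps a cyclebane graph) and inheritance at the top (Pipeline extends DataGraph), so the existing Pipeline API keeps working while the graph machinery lives in a generic, Sciline-agnostic library.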

Example 1: Basic DataGraph#

[1]:
import sciline


def f1() -> float:
    return 1.0


def f2(a: float, b: str) -> int:
    return int(a) + len(b)


def f3(a: int) -> list[int]:
    return list(range(a))


data_graph = sciline.Pipeline([f1, f3, f2])
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
[1]:
../../_images/developer_architecture-and-design_rewrite_3_0.svg

We can add a value for str using __setitem__, build a sciline.TaskGraph, and compute the result:

[2]:
data_graph[str] = 'abcde'
task_graph = data_graph.get(list[int])
task_graph.compute()
[2]:
[0, 1, 2, 3, 4, 5]
[3]:
task_graph.visualize(graph_attr={'rankdir': 'LR'})
[3]:
../../_images/developer_architecture-and-design_rewrite_6_0.svg

Example 2: DataGraph with generic provider#

[4]:
from typing import TypeVar
import sciline

T = TypeVar('T', int, float)  # The constraints are mandatory now!


def make_list(length: T) -> list[T]:
    return [length, length + length]


def make_dict(key: list[int], value: list[float]) -> dict[int, float]:
    return dict(zip(key, value, strict=True))


data_graph = sciline.Pipeline([make_list, make_dict])
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
[4]:
../../_images/developer_architecture-and-design_rewrite_8_0.svg
[5]:
data_graph[int] = 3
data_graph[float] = 1.2
data_graph.get(dict[int, float]).compute()
[5]:
{3: 1.2, 6: 2.4}

Example 3: DataGraph with map and reduce#

[6]:
import sciline


def f1(x: float) -> str:
    return str(x)


def f2(x: str) -> int:
    return len(x)


def f3(a: int) -> list[int]:
    return list(range(a))


data_graph = sciline.Pipeline([f1, f2, f3])
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
[6]:
../../_images/developer_architecture-and-design_rewrite_11_0.svg
[7]:
import pandas as pd

params = pd.DataFrame({float: [0.1, 1.0, 10.0]})
params
[7]:
   <class 'float'>
0              0.1
1              1.0
2             10.0
[8]:
def concat_strings(*strings: str) -> str:
    return '+'.join(strings)


data_graph[str] = data_graph[str].map(params).reduce(func=concat_strings)
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
[8]:
../../_images/developer_architecture-and-design_rewrite_13_0.svg
[9]:
tg = data_graph.get(list[int])
tg.visualize(graph_attr={'rankdir': 'LR'})
[9]:
../../_images/developer_architecture-and-design_rewrite_14_0.svg
[10]:
tg.compute()
[10]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

Criticism#

The map and reduce operations somewhat break with the core idea of Sciline: they introduce an intermediate state between declarative programming (as in Sciline) and imperative programming (as in Dask). The example above may be re-imagined as something along the lines of

# Assuming with_value returns a copy of the graph with the value set
branches = map(data_graph[str].with_value, params[float])
# Not actually `dask.delayed`, but you get the idea
data_graph[str] = dask.delayed(concat_strings)(branches)

The graph could then be optimized to remove duplicate nodes (nodes that are part of data_graph[str] but not descendants of float).
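The deduplication step can be sketched with plain NetworkX. The node names (`p_i` for mapped parameter copies, `u_i` for nodes that do not depend on the parameter, `c_i` for consumers) are hypothetical, and this is not Sciline code: a naive map duplicates every branch, and merging the `u_i` copies back into one node is the optimization described above.

```python
import networkx as nx

# Naive mapping over three parameter values duplicates the whole branch:
# each copy i has a mapped parameter p_i, an unrelated input u_i, and a
# consumer c_i depending on both.
g = nx.DiGraph()
for i in range(3):
    g.add_edge(f'p_{i}', f'c_{i}')  # depends on the mapped value: keep copies
    g.add_edge(f'u_{i}', f'c_{i}')  # identical in every copy: redundant

# Merge the duplicates that are not descendants of any mapped value by
# relabeling every u_<i> to a single node 'u' (relabel_nodes merges nodes
# that map to the same name).
g = nx.relabel_nodes(g, {f'u_{i}': 'u' for i in range(3)})

sorted(g.predecessors('c_0'))  # → ['p_0', 'u']
```

After the merge the graph has one shared `u` node feeding all three consumers, while the `p_i` branches remain distinct, which is exactly the structure the map/reduce approach is meant to produce.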