# Rewrite of Sciline’s Pipeline as a Data Graph
## Introduction
There has been a series of issues and discussions about Sciline’s `Pipeline` class and its implementation:

- Detect unused parameters #43.
- More helpful error messages when pipeline fails to build or compute? #74.
- Get missing params from a pipeline #83.
- Support for graph operations #107.
- Supporting different file handle types is too difficult #140.
- A new approach for “parameter tables” #141.
- Pruning for repeated workflow calls #148.
## Current implementation
`sciline.Pipeline` is a box that can be filled with providers (a provider is a callable that can compute a value) as well as values. Providers can provide generic types. The concrete types and values that such providers compute are determined later, when the pipeline is built, based on which instances of the generic outputs are requested (by other providers or by the user when building the pipeline).
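For illustration, a minimal sketch of such a generic provider (the function is hypothetical; in the current implementation the concrete `T` is only pinned down at build time):

```python
from typing import TypeVar

T = TypeVar('T')

# Hypothetical generic provider: which concrete T (and thus which
# list[T] output) this computes is undefined until the pipeline is
# built and a specific instance, e.g. list[int], is requested.
def duplicate(x: T) -> list[T]:
    return [x, x]
```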
Parameter tables and a special `sciline.Series` type are supported to create task graphs with duplicate branches and “reduction” or grouping operations. The pipeline is built by calling `build` on it, which returns a `sciline.TaskGraph`. Most of the complexity is handled in this step.
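As a rough sketch of this mechanism (type names are made up for illustration, and the exact API, assumed here to be `ParamTable` and `set_param_table`, may differ between Sciline versions):

```python
from typing import NewType

import sciline

RunID = NewType('RunID', int)
Param = NewType('Param', float)
Result = NewType('Result', str)

def to_result(p: Param) -> Result:
    return Result(str(p))

# One Result is computed per row of the parameter table; Series
# collects all of them for a "reduction" or grouping step.
def combine(results: sciline.Series[RunID, Result]) -> str:
    return '+'.join(results.values())

pipeline = sciline.Pipeline([to_result, combine])
pipeline.set_param_table(sciline.ParamTable(RunID, {Param: [0.1, 1.0, 10.0]}))
```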
The presence of generic providers as well as parameter tables makes the implementation of the pipeline quite complex. It implies that internally a pipeline is not representable as a graph: (1) generics lead to a task-graph structure that is in principle undefined until the pipeline is built, and (2) parameter tables lead to implicit duplication of task-graph branches, which means that if `Pipeline` used a graph representation internally, adding or replacing providers would conflict with the duplicated structure.
## Proposal
The key idea of this proposal is to introduce `sciline.DataGraph`, a directed acyclic graph (DAG) that can roughly be thought of as a graph representation of the pipeline. The data graph describes dependencies between data, defined via the type hints of providers. Providers (or values) are stored as node data.
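For illustration only (this is not Sciline’s actual internal representation), the same idea can be expressed directly with NetworkX, with one node per data type and the provider or value attached as node data:

```python
import networkx as nx

def f1() -> float:
    return 1.0

def f2(a: float, b: str) -> int:
    return int(a) + len(b)

# One node per data type; providers or values are stored as node data.
graph = nx.DiGraph()
graph.add_node(float, provider=f1)
graph.add_node(str, value='abcde')
graph.add_node(int, provider=f2)
# Edges express data dependencies, derived from the type hints of f2.
graph.add_edge(float, int)
graph.add_edge(str, int)
```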
As the support for generic providers was a hindrance in the current implementation, we propose to restrict this to generic return types with constraints. This means that such a provider defines a known set of outputs, and the data graph can thus be updated with multiple nodes, each with the same provider.
The support for parameter tables would be replaced by `map` and `reduce` operations on the data graph.
Whether `Pipeline` will be kept as a wrapper around `DataGraph` or whether `DataGraph` will be the main interface is not yet clear. This has been prototyped in the `cyclebane` library. Whether this would be integrated into or used by Sciline is not yet clear.
## Note on chosen implementation
To keep the existing `Pipeline` interface, the new functionality has been added in the `DataGraph` class, which has been made a base class of `Pipeline`. `DataGraph` is implemented as a wrapper for `cyclebane.Graph`; Cyclebane is a new and generic support library based on NetworkX.
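Schematically, the relationship looks like the following simplified sketch (not the actual Sciline code; it assumes `cyclebane.Graph` is constructed from a NetworkX graph):

```python
import networkx as nx
import cyclebane

class DataGraph:
    """Wraps a cyclebane.Graph built from the providers' type hints."""

    def __init__(self, nx_graph: nx.DiGraph) -> None:
        self._cbgraph = cyclebane.Graph(nx_graph)

class Pipeline(DataGraph):
    """Keeps the pre-existing user-facing interface."""
```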
## Example 1: Basic DataGraph
[1]:
```python
import sciline

def f1() -> float:
    return 1.0

def f2(a: float, b: str) -> int:
    return int(a) + len(b)

def f3(a: int) -> list[int]:
    return list(range(a))

data_graph = sciline.Pipeline([f1, f3, f2])
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
```
[1]: *(data-graph visualization)*
We can add a value for `str` using `__setitem__`, build a `sciline.TaskGraph`, and compute the result. Since `f1` provides `1.0` and `len('abcde')` is `5`, `f2` returns `6`, so `f3` yields `list(range(6))`:
[2]:
```python
data_graph[str] = 'abcde'
task_graph = data_graph.get(list[int])
task_graph.compute()
```
[2]:
[0, 1, 2, 3, 4, 5]
[3]:
```python
task_graph.visualize(graph_attr={'rankdir': 'LR'})
```
[3]: *(task-graph visualization)*
## Example 2: DataGraph with generic provider
[4]:
```python
from typing import TypeVar

import sciline

T = TypeVar('T', int, float)  # The constraints are mandatory now!

def make_list(length: T) -> list[T]:
    return [length, length + length]

def make_dict(key: list[int], value: list[float]) -> dict[int, float]:
    return dict(zip(key, value, strict=True))

data_graph = sciline.Pipeline([make_list, make_dict])
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
```
[4]: *(data-graph visualization)*
[5]:
```python
data_graph[int] = 3
data_graph[float] = 1.2
data_graph.get(dict[int, float]).compute()
```
[5]:
{3: 1.2, 6: 2.4}
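Here `make_list` runs once per constraint: `make_list(3)` yields `[3, 6]` for `list[int]`, and `make_list(1.2)` yields `[1.2, 2.4]` for `list[float]`; zipping the two lists gives the dictionary above.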
## Example 3: DataGraph with map and reduce
[6]:
```python
import sciline

def f1(x: float) -> str:
    return str(x)

def f2(x: str) -> int:
    return len(x)

def f3(a: int) -> list[int]:
    return list(range(a))

data_graph = sciline.Pipeline([f1, f2, f3])
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
```
[6]: *(data-graph visualization)*
[7]:
```python
import pandas as pd

params = pd.DataFrame({float: [0.1, 1.0, 10.0]})
params
```
[7]:
|   | `<class 'float'>` |
|---|---|
| 0 | 0.1 |
| 1 | 1.0 |
| 2 | 10.0 |
[8]:
```python
def concat_strings(*strings: str) -> str:
    return '+'.join(strings)

data_graph[str] = data_graph[str].map(params).reduce(func=concat_strings)
data_graph.visualize_data_graph(graph_attr={'rankdir': 'LR'})
```
[8]: *(data-graph visualization)*
[9]:
```python
tg = data_graph.get(list[int])
tg.visualize(graph_attr={'rankdir': 'LR'})
```
[9]: *(task-graph visualization)*
[10]:
```python
tg.compute()
```
[10]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
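The three mapped branches produce the strings `'0.1'`, `'1.0'`, and `'10.0'`; `concat_strings` joins them into `'0.1+1.0+10.0'`, a string of length 12, so `f2` returns `12` and `f3` yields `list(range(12))`.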
## Criticism
The `map` and `reduce` operations somewhat break out of the core idea of Sciline. They introduce an intermediate state between declarative and imperative programming (as in Sciline and Dask, respectively). The example above may be re-imagined as something along the lines of:
```python
# Assuming with_value returns a copy of the graph with the value set
branches = map(data_graph[str].with_value, params[float])
# Not actually `dask.delayed`, but you get the idea
data_graph[str] = dask.delayed(concat_strings)(branches)
```
The graph could then be optimized to remove duplicate nodes (those that are part of `data_graph[str]` but not descendants of `float`).
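For comparison, here is a minimal runnable sketch of the same reduction expressed imperatively with actual `dask.delayed` (illustrative only; this is not how Sciline is or would be implemented):

```python
import dask

@dask.delayed
def concat_strings(*strings: str) -> str:
    return '+'.join(strings)

# Build one delayed branch per parameter value, then reduce imperatively.
branches = [dask.delayed(str)(x) for x in [0.1, 1.0, 10.0]]
result = concat_strings(*branches)
print(result.compute())  # '0.1+1.0+10.0'
```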