Continue from intermediate results#
A common need is to continue a pipeline from an intermediate result computed earlier.
TLDR
# Pipeline: Input -> CleanData -> Result
data = pipeline.compute(CleanData)
pipeline[CleanData] = data
result = pipeline.compute(Result)
Setup#
Let's look at a situation where we have some "raw" data files and the workflow consists of three steps:

* loading the raw data
* cleaning the raw data
* computing the sum of the cleaned data
[1]:
from typing import NewType
Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Result = NewType('Result', float)

filesystem = {'raw.txt': list(map(str, range(10)))}


def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    data = filesystem[filename]
    return RawData(data)


def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))


def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))
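Before wiring these steps into a pipeline, it may help to see what they compute when chained by hand. This is a plain-Python sketch of the same three steps, without Sciline:

```python
# The three workflow steps chained manually, without Sciline.
filesystem = {'raw.txt': [str(i) for i in range(10)]}

raw = filesystem['raw.txt']            # load: read strings from "disk"
clean_data = [float(x) for x in raw]   # clean: convert str -> float
result = sum(clean_data)               # process: sum the cleaned values
print(result)  # 45.0
```

Sciline automates exactly this chaining: it infers from the type annotations that `load` feeds `clean`, which feeds `process`.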
[2]:
import sciline
pipeline = sciline.Pipeline(
    [load, clean, process],
    params={Filename: 'raw.txt'},
)
pipeline
[2]:
Name | Value | Source |
---|---|---|
CleanData | | clean (__main__.clean) |
Filename | raw.txt | |
RawData | | load (__main__.load) |
Result | | process (__main__.process) |
Setting intermediate results#
Given a pipeline, we may want to compute an intermediate result for inspection:
[3]:
data = pipeline.compute(CleanData)
If later on we wish to compute a result further down the pipeline (derived from CleanData), this would cause potentially costly re-computation of CleanData, since Sciline does not perform any caching:
[4]:
result = pipeline.compute(Result) # re-computes CleanData
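To see why the second `compute` call is costly, consider this plain-Python sketch (not Sciline's actual machinery): each `compute`-style call walks the dependency chain and re-runs every provider it needs, so without a cache the `clean` step runs once per call. The `compute_clean`/`compute_result` helpers here are hypothetical stand-ins for the pipeline's behavior:

```python
# Count how often the clean step runs across two independent computations.
calls = {'clean': 0}

def load():
    return [str(i) for i in range(10)]

def clean(raw):
    calls['clean'] += 1
    return [float(x) for x in raw]

def process(data):
    return sum(data)

def compute_clean():
    # analogous to pipeline.compute(CleanData)
    return clean(load())

def compute_result():
    # analogous to pipeline.compute(Result): no cache, so clean runs again
    return process(clean(load()))

data = compute_clean()
result = compute_result()
print(calls['clean'])  # 2
```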
To avoid this, we can use Pipeline.__setitem__ to replace the provider of CleanData with the previously computed data:
[5]:
pipeline[CleanData] = data
result = pipeline.compute(Result)
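In the same plain-Python sketch as above (again, not Sciline's internals), storing the intermediate value and reading it back makes the upstream steps unnecessary, so `clean` runs only once in total:

```python
# Storing the intermediate result means downstream computation reuses it.
calls = {'clean': 0}

def clean(raw):
    calls['clean'] += 1
    return [float(x) for x in raw]

store = {}
# analogous to: pipeline[CleanData] = data
store['CleanData'] = clean([str(i) for i in range(10)])

def compute_result():
    # the stored value stands in for the provider, so clean is not re-run
    return sum(store['CleanData'])

result = compute_result()
print(calls['clean'], result)  # 1 45.0
```

This is the essence of Pipeline.__setitem__: the stored value shadows the original provider for all downstream computations.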