Continue from intermediate results#

A common need is to continue a pipeline from an intermediate result computed earlier.

TLDR

# Pipeline: Input -> CleanData -> Result
data = pipeline.compute(CleanData)
pipeline[CleanData] = data
result = pipeline.compute(Result)

Setup#

Let's look at a situation where we have some “raw” data files and the workflow consists of three steps:

- loading the raw data
- cleaning the raw data
- computing the sum of the cleaned data

[1]:
from typing import NewType

Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Result = NewType('Result', float)

filesystem = {'raw.txt': list(map(str, range(10)))}

def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    data = filesystem[filename]
    return RawData(data)

def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))

def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))

[2]:
import sciline

pipeline = sciline.Pipeline(
    [load, clean, process],
    params={Filename: 'raw.txt'},
)
pipeline
[2]:
Name        Value     Source
----------  --------  --------------------------
CleanData             clean (__main__.clean)
Filename    raw.txt
RawData               load (__main__.load)
Result                process (__main__.process)

Setting intermediate results#

Given a pipeline, we may want to compute an intermediate result for inspection:

[3]:
data = pipeline.compute(CleanData)

If we later compute a result further down the pipeline (derived from CleanData), Sciline re-computes CleanData, which may be costly, since Sciline does not perform any caching:

[4]:
result = pipeline.compute(Result)  # re-computes CleanData
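To make the cost visible, here is a minimal sketch in plain Python (no Sciline; the call-counter dict is our own addition) showing that, without caching, each compute call re-executes the whole provider chain, so `clean` runs once per call:

```python
# Plain-Python sketch of the recomputation: every request re-runs
# the full chain of providers, so `clean` executes once per
# compute call.
calls = {'clean': 0}
filesystem = {'raw.txt': list(map(str, range(10)))}

def load(filename):
    return filesystem[filename]

def clean(raw_data):
    calls['clean'] += 1  # count invocations
    return [float(x) for x in raw_data]

def process(clean_data):
    return sum(clean_data)

data = clean(load('raw.txt'))             # like pipeline.compute(CleanData)
result = process(clean(load('raw.txt')))  # like pipeline.compute(Result)
print(calls['clean'], result)             # clean ran twice
```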

To avoid this, we can use Pipeline.__setitem__ to replace the provider of CleanData by the previously computed data:

[5]:
pipeline[CleanData] = data
result = pipeline.compute(Result)
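The effect can be sketched with a toy resolver (our own simplification, not Sciline's actual implementation): setting a key replaces its provider with a constant, so downstream computation no longer re-runs `clean`:

```python
# Toy sketch of the __setitem__ pattern (not Sciline internals):
# a "pipeline" maps each key to a provider; assigning a computed
# value swaps the provider for a constant, avoiding recomputation.
calls = {'clean': 0}
filesystem = {'raw.txt': list(map(str, range(10)))}

def clean_provider():
    calls['clean'] += 1  # count invocations
    return [float(x) for x in filesystem['raw.txt']]

providers = {
    'CleanData': clean_provider,
    'Result': lambda: sum(providers['CleanData']()),
}

data = providers['CleanData']()    # compute(CleanData): clean runs once

# pipeline[CleanData] = data  ->  replace the provider with a constant:
providers['CleanData'] = lambda: data

result = providers['Result']()     # compute(Result): clean is NOT re-run
print(calls['clean'], result)      # clean ran only once
```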