Replacing providers#
This example shows how to replace a provider in the pipeline using the Pipeline.insert method.
Setup#
Let's look at a situation where we have some “raw” data files and the workflow consists of three steps:
- loading the raw data
- cleaning the raw data
- computing a sum of the cleaned data.
[1]:
from typing import NewType

import sciline

Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
# The result is a sum, so it is a single number rather than a list.
Result = NewType('Result', float)

# Stand-in for files on disk: maps a file name to its lines.
filesystem = {'raw.txt': list(map(str, range(10)))}


def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    data = filesystem[filename]
    return RawData(data)


def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))


def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))


pipeline = sciline.Pipeline(
    [
        load,
        clean,
        process,
    ],
    params={
        Filename: 'raw.txt',
    },
)
pipeline
[1]:
Name | Value | Source |
---|---|---|
CleanData | | __main__.clean |
Filename | raw.txt | |
RawData | | __main__.load |
Result | | __main__.process |
Replacing a provider using Pipeline.insert#
Let’s say the clean provider doesn’t do all the preprocessing that we want it to do. We also want to keep only the odd or only the even numbers before processing:
[2]:
from typing import NewType

Target = NewType('Target', str)


def clean_and_remove_some(raw_data: RawData, target: Target) -> CleanData:
    """Convert from str and keep only the odd or only the even numbers."""
    if target == 'odd':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 1])
    if target == 'even':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 0])
    raise ValueError(f"Expected target 'odd' or 'even', got {target!r}")
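Because a provider is an ordinary function, it can be checked on its own before it goes into the pipeline. The following is a minimal sketch: it repeats the provider so that the block runs without Sciline, and the `sample` input and the `'prime'` target are made up purely for illustration.

```python
from typing import NewType

RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Target = NewType('Target', str)


def clean_and_remove_some(raw_data: RawData, target: Target) -> CleanData:
    """Same logic as the provider above, repeated so this block runs on its own."""
    if target == 'odd':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 1])
    if target == 'even':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 0])
    raise ValueError(f"Unknown target: {target!r}")


sample = RawData(['0', '1', '2', '3', '4'])  # made-up input for the check
kept_odd = clean_and_remove_some(sample, Target('odd'))
kept_even = clean_and_remove_some(sample, Target('even'))
print(kept_odd)   # [1.0, 3.0]
print(kept_even)  # [0.0, 2.0, 4.0]

# Any target other than 'odd' or 'even' is rejected.
try:
    clean_and_remove_some(sample, Target('prime'))
except ValueError:
    rejected = True
else:
    rejected = False
print(rejected)  # True
```

Note that the target names the numbers that are kept: with `'odd'`, the even numbers are dropped.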
To replace the old CleanData provider we need to use Pipeline.insert:
[3]:
pipeline.insert(clean_and_remove_some)
pipeline[Target] = 'odd'
[4]:
pipeline
[4]:
Name | Value | Source |
---|---|---|
CleanData | | __main__.clean_and_remove_some |
Filename | raw.txt | |
RawData | | __main__.load |
Result | | __main__.process |
Target | odd | |
Now if we request the Result, we see that the new provider will be used in the computation (Target now appears among the input keys):
[5]:
pipeline.get(Result)
[5]:
Output keys: Result
Scheduler: NaiveScheduler()
Input keys:
- Filename
- Target
[6]:
pipeline.compute(Result)
[6]:
25.0
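One way to picture why inserting clean_and_remove_some replaced clean rather than adding a second CleanData provider is a registry keyed by each provider's return type. The sketch below is a toy model built on that assumption, not Sciline's actual implementation; `ToyRegistry`, `provider_for`, and `clean_odd_only` are hypothetical names used only here.

```python
from typing import Callable, NewType, get_type_hints

RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)


class ToyRegistry:
    """Toy stand-in for a pipeline: at most one provider per return type."""

    def __init__(self) -> None:
        self._providers: dict = {}

    def insert(self, provider: Callable) -> None:
        # The key is the provider's annotated return type, so a second
        # provider with the same return type overwrites the first one.
        key = get_type_hints(provider)['return']
        self._providers[key] = provider

    def provider_for(self, key) -> Callable:
        return self._providers[key]


def clean(raw_data: RawData) -> CleanData:
    return CleanData([float(x) for x in raw_data])


def clean_odd_only(raw_data: RawData) -> CleanData:
    return CleanData([x for x in map(float, raw_data) if x % 2 == 1])


registry = ToyRegistry()
registry.insert(clean)
registry.insert(clean_odd_only)  # same return type, so this replaces `clean`

replaced = registry.provider_for(CleanData)
print(replaced.__name__)  # clean_odd_only
```

In this model the function name plays no role; only the return annotation decides which entry is overwritten.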
Adding a custom intermediate step#
Sometimes, instead of replacing an existing provider with a new one, we wish to simply insert a new custom step in the pipeline.
Say in our example, we wish to add an extra correction to our data between CleanData and Result.
[7]:
pipeline.visualize(Result)
[7]:
Of course, the ideal way to do this would be to change the input type of process and insert a provider that takes in CleanData and returns the new type.
However, we do not always have access to the provider code (for example, when pipelines are pre-built and imported from a library), so changing the input type of process is not always trivial or even possible.
To be able to experiment with additional steps in a notebook, a simple method is to compute the intermediate result, modify it in the notebook, and set the value back onto the pipeline.
In our case, we wish to experiment with computing the sum of the squares.
[8]:
# First compute the intermediate result
clean_data = pipeline.compute(CleanData)
clean_data
[8]:
[1.0, 3.0, 5.0, 7.0, 9.0]
[9]:
def square(values):
    return [x**2 for x in values]


# Transform the clean_data
data_squared = square(clean_data)
data_squared
[9]:
[1.0, 9.0, 25.0, 49.0, 81.0]
Before setting the new values back into the pipeline, it is highly recommended to make a copy of the pipeline, to avoid corrupting the original data and/or pipeline.
[10]:
# Make a copy of the workflow
new_pl = pipeline.copy()
# Pretend that CleanData is now the squared values
new_pl[CleanData] = data_squared
new_pl.compute(Result)
[10]:
165.0
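Both results in this example can be cross-checked without a pipeline. The sketch below assumes the same input as 'raw.txt' (the strings '0' to '9') and the 'odd' target, and repeats the arithmetic in plain Python.

```python
raw = [str(n) for n in range(10)]  # same contents as 'raw.txt' above
odd = [x for x in map(float, raw) if x % 2 == 1]  # what the 'odd' target keeps

plain_sum = sum(odd)                     # what process computes from CleanData
sum_of_squares = sum(x**2 for x in odd)  # what it computes after squaring

print(plain_sum)       # 25.0
print(sum_of_squares)  # 165.0
```

The two numbers match the values computed from the original pipeline and from the modified copy, respectively.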