Replacing providers#
This example shows how to replace a provider in the pipeline using the Pipeline.insert method.
Setup#
Let’s look at a situation where we have some “raw” data files and the workflow consists of three steps:
* loading the raw data
* cleaning the raw data
* computing a sum of the cleaned data.
[1]:
from typing import NewType
import sciline
Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Result = NewType('Result', float)
filesystem = {'raw.txt': list(map(str, range(10)))}
def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    data = filesystem[filename]
    return RawData(data)


def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))


def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))


pipeline = sciline.Pipeline(
    [
        load,
        clean,
        process,
    ],
    params={
        Filename: 'raw.txt',
    },
)
pipeline
[1]:
Name | Value | Source |
---|---|---|
CleanData | | clean (__main__.clean) |
Filename | raw.txt | |
RawData | | load (__main__.load) |
Result | | process (__main__.process) |
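As a baseline for the replacement that follows, the three providers can also be chained by hand, without sciline. This is only a sketch of what the pipeline is expected to compute for Result at this point; the variable name baseline is an assumption introduced for illustration, and Result is declared as a float here because process returns a sum.

```python
from typing import NewType

Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Result = NewType('Result', float)

filesystem = {'raw.txt': list(map(str, range(10)))}


def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    return RawData(filesystem[filename])


def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))


def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))


# Chain the providers by hand, in the order their type hints imply:
# Filename -> RawData -> CleanData -> Result.
baseline = process(clean(load(Filename('raw.txt'))))
print(baseline)  # 45.0
```

With the original clean provider, every number from 0 to 9 contributes to the sum.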
Replacing a provider using Pipeline.insert#
Let’s say the clean provider doesn’t do all the preprocessing that we want it to do. We also want to keep only the odd or only the even numbers, removing the rest, before processing:
[2]:
from typing import NewType
Target = NewType('Target', str)
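def clean_and_remove_some(raw_data: RawData, target: Target) -> CleanData:
    """Convert from str and keep only the odd or only the even numbers."""
    if target == 'odd':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 1])
    if target == 'even':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 0])
    raise ValueError(f"target must be 'odd' or 'even', got {target!r}")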
To replace the old CleanData provider we need to use Pipeline.insert:
[3]:
pipeline.insert(clean_and_remove_some)
pipeline[Target] = 'odd'
[4]:
pipeline
[4]:
Name | Value | Source |
---|---|---|
CleanData | | clean_and_remove_some (__main__.clean_and_remove_some) |
Filename | raw.txt | |
RawData | | load (__main__.load) |
Result | | process (__main__.process) |
Target | odd | |
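The table lists a single provider for CleanData: the newly inserted function has taken the place of clean. One way to picture this is a registry keyed by each provider's return type. The sketch below is a plain-Python illustration of that idea only; ToyRegistry and clean_odd_only are hypothetical names, and this is not sciline's actual implementation.

```python
from typing import Callable, NewType, get_type_hints

RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)


class ToyRegistry:
    """Hypothetical stand-in for a pipeline: one provider per return type."""

    def __init__(self) -> None:
        self.providers: dict = {}

    def insert(self, provider: Callable) -> None:
        # Key the provider by its annotated return type. An existing
        # provider registered for the same type is overwritten.
        return_type = get_type_hints(provider)['return']
        self.providers[return_type] = provider


def clean(raw_data: RawData) -> CleanData:
    return CleanData(list(map(float, raw_data)))


def clean_odd_only(raw_data: RawData) -> CleanData:
    return CleanData([n for n in map(float, raw_data) if n % 2 == 1])


registry = ToyRegistry()
registry.insert(clean)
registry.insert(clean_odd_only)  # same return type, so it replaces clean

print(registry.providers[CleanData].__name__)  # clean_odd_only
print(len(registry.providers))  # 1
```

Under this picture, the return type annotation decides which provider gets replaced, so the new function must be annotated with the same type as the one it supersedes.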
Now if we select the Result, we see that the new provider will be used in the computation:
[5]:
pipeline.get(Result)
[5]:
Output keys: Result
Scheduler: NaiveScheduler()
Input keys:
- Filename
- Target
[6]:
pipeline.compute(Result)
[6]:
25.0
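The value 25.0 can be checked by hand, independently of sciline: with Target set to 'odd', only 1, 3, 5, 7 and 9 survive the cleaning step. The sketch below repeats that arithmetic in plain Python and also shows the sum one would expect if Target were 'even' instead; the variable names are assumptions introduced for illustration.

```python
# Same raw content as filesystem['raw.txt'] in the setup.
raw = list(map(str, range(10)))
values = [float(s) for s in raw]

# Sum of the numbers kept when the target is 'odd' and when it is 'even'.
odd_sum = sum(n for n in values if n % 2 == 1)
even_sum = sum(n for n in values if n % 2 == 0)

print(odd_sum)   # 25.0
print(even_sum)  # 20.0
```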