Replacing providers

This example shows how to replace a provider in the pipeline using the Pipeline.insert method.

Setup

Let's look at a situation where we have some “raw” data files and the workflow consists of three steps:

  • loading the raw data

  • cleaning the raw data

  • computing a sum of the cleaned data.

[1]:
from typing import NewType
import sciline

Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Result = NewType('Result', float)

filesystem = {'raw.txt': list(map(str, range(10)))}


def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    data = filesystem[filename]
    return RawData(data)


def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))


def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))


pipeline = sciline.Pipeline(
    [
        load,
        clean,
        process,
    ],
    params={
        Filename: 'raw.txt',
    },
)
pipeline
[1]:
Name       Value    Source
CleanData           clean __main__.clean
Filename   raw.txt
RawData             load __main__.load
Result              process __main__.process

Replacing a provider using Pipeline.insert

Let’s say the clean provider doesn’t do all the preprocessing that we want: we also want to remove either the odd or the even numbers before processing.

[2]:
from typing import NewType

Target = NewType('Target', str)


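def clean_and_remove_some(raw_data: RawData, target: Target) -> CleanData:
    """Convert from str and keep only the odd or the even numbers."""
    values = map(float, raw_data)
    if target == 'odd':
        return CleanData([n for n in values if n % 2 == 1])
    if target == 'even':
        return CleanData([n for n in values if n % 2 == 0])
    raise ValueError(f"target must be 'odd' or 'even', got {target!r}")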

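To replace the old CleanData provider, we use Pipeline.insert. Because the new function also returns CleanData, it takes the place of clean. The new Target parameter must be set as well: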

[3]:
pipeline.insert(clean_and_remove_some)
pipeline[Target] = 'odd'
[4]:
pipeline
[4]:
Name       Value    Source
CleanData           clean_and_remove_some __main__.clean_and_remove_some
Filename   raw.txt
RawData             load __main__.load
Result              process __main__.process
Target     odd
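
The replacement shown above relies on each output type having a single provider. The following is a minimal sketch of that idea using only the standard library. It is a toy model, not sciline's implementation: the `ToyRegistry` class, its `provider_for` method, and the `clean_odd_only` function are hypothetical names introduced for illustration.

```python
from typing import Callable, NewType, get_type_hints

RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)


class ToyRegistry:
    """Hypothetical stand-in for a pipeline: one provider per return type."""

    def __init__(self) -> None:
        self._providers: dict = {}

    def insert(self, provider: Callable) -> None:
        # Key the provider by its return annotation. An existing entry
        # for the same type is overwritten, which is the "replace" step.
        key = get_type_hints(provider)['return']
        self._providers[key] = provider

    def provider_for(self, key) -> Callable:
        return self._providers[key]


def clean(raw_data: RawData) -> CleanData:
    return CleanData(list(map(float, raw_data)))


def clean_odd_only(raw_data: RawData) -> CleanData:
    return CleanData([n for n in map(float, raw_data) if n % 2 == 1])


registry = ToyRegistry()
registry.insert(clean)
registry.insert(clean_odd_only)  # same return type, so it replaces `clean`

print(registry.provider_for(CleanData).__name__)  # clean_odd_only
```

In this toy model, as in the example above, no explicit "remove" step is needed: the return annotation alone decides which entry is overwritten.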

Now, if we request Result, we see that the new provider is used in the computation:

[5]:
pipeline.get(Result)
[5]:
Output keys:
Result
Scheduler:
NaiveScheduler()
Input keys:
  • Filename
  • Target
[6]:
pipeline.compute(Result)
[6]:
25.0
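
One way to sanity-check this value is to call the providers directly, in the order the pipeline would run them. The sketch below repeats the definitions from this page so that it runs on its own, without sciline. The helper `run_by_hand` is a hypothetical name introduced here.

```python
from typing import NewType

Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Target = NewType('Target', str)

filesystem = {'raw.txt': list(map(str, range(10)))}


def load(filename: Filename) -> RawData:
    return RawData(filesystem[filename])


def clean_and_remove_some(raw_data: RawData, target: Target) -> CleanData:
    values = map(float, raw_data)
    if target == 'odd':
        return CleanData([n for n in values if n % 2 == 1])
    if target == 'even':
        return CleanData([n for n in values if n % 2 == 0])
    raise ValueError(f"target must be 'odd' or 'even', got {target!r}")


def process(clean_data: CleanData) -> float:
    return sum(clean_data)


def run_by_hand(target: str) -> float:
    """Hypothetical helper: chain the providers without a pipeline."""
    raw = load(Filename('raw.txt'))
    cleaned = clean_and_remove_some(raw, Target(target))
    return process(cleaned)


print(run_by_hand('odd'))   # 25.0  (1 + 3 + 5 + 7 + 9)
print(run_by_hand('even'))  # 20.0  (0 + 2 + 4 + 6 + 8)
```

The 'odd' case matches the pipeline output above. The 'even' case shows what the same chain would give for the other Target value.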

Adding a custom intermediate step

Sometimes, instead of replacing an existing provider with a new one, we wish to simply insert a new custom step in the pipeline.

Say that, in our example, we wish to add an extra correction to our data between CleanData and Result.

[7]:
pipeline.visualize(Result)
[7]:
../_images/recipes_replacing-providers_13_0.svg

Of course, the ideal way to do this would be to change the input type of process, and insert a provider that takes in CleanData and returns the new type.

However, we do not always have access to the provider code (for example, when pipelines are pre-built and imported from a library), so changing the input type of process may be difficult or impossible.

To be able to experiment with additional steps in a notebook, a simple method is to compute the intermediate result, modify it in the notebook, and set the value back onto the pipeline.

In our case, we wish to experiment with computing the sum of the squares.

[8]:
# First compute the intermediate result
clean_data = pipeline.compute(CleanData)
clean_data
[8]:
[1.0, 3.0, 5.0, 7.0, 9.0]
[9]:
def square(values):
    return [x**2 for x in values]

# Transform the clean_data
data_squared = square(clean_data)
data_squared
[9]:
[1.0, 9.0, 25.0, 49.0, 81.0]

Before setting the new values back into the pipeline, it is highly recommended to make a copy of the pipeline, so that the original pipeline and its data stay untouched.

[10]:
# Make a copy of the pipeline
new_pl = pipeline.copy()

# Pretend that CleanData is now the squared values
new_pl[CleanData] = data_squared

new_pl.compute(Result)
[10]:
165.0
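
For comparison, the typed approach mentioned earlier (a new intermediate type, a provider that consumes CleanData, and a variant of process that accepts the new type) could look roughly like the following. This is a sketch under assumptions: `SquaredData` and `process_squared` are hypothetical names, and the functions are called directly here instead of being registered in a pipeline.

```python
from typing import NewType

CleanData = NewType('CleanData', list)
SquaredData = NewType('SquaredData', list)  # hypothetical intermediate type
Result = NewType('Result', float)


def square(clean_data: CleanData) -> SquaredData:
    """Extra correction step between CleanData and Result."""
    return SquaredData([x**2 for x in clean_data])


def process_squared(squared: SquaredData) -> Result:
    """Variant of process that takes the new type as its input."""
    return Result(sum(squared))


# Same intermediate values as computed from the pipeline above.
clean_data = CleanData([1.0, 3.0, 5.0, 7.0, 9.0])
print(process_squared(square(clean_data)))  # 165.0
```

With distinct types for each stage, the extra step becomes a regular provider, and CleanData does not have to be overwritten on a copy of the pipeline.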