Replacing providers

This example shows how to replace a provider in the pipeline using the Pipeline.insert method.

Setup

Let's look at a situation where we have some "raw" data files and the workflow consists of three steps:

* loading the raw data
* cleaning the raw data
* computing a sum of the cleaned data.

[1]:
from typing import NewType
import sciline

Filename = NewType('Filename', str)
RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)
Result = NewType('Result', float)

filesystem = {'raw.txt': list(map(str, range(10)))}


def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    data = filesystem[filename]
    return RawData(data)


def clean(raw_data: RawData) -> CleanData:
    """Clean the data, convert from str."""
    return CleanData(list(map(float, raw_data)))


def process(clean_data: CleanData) -> Result:
    """Compute the sum of the clean data."""
    return Result(sum(clean_data))


pipeline = sciline.Pipeline(
    [
        load,
        clean,
        process,
    ],
    params={
        Filename: 'raw.txt',
    },
)
pipeline
[1]:
Name Value Source
CleanData
clean __main__.clean
Filename raw.txt
RawData
load __main__.load
Result
process __main__.process

Replacing a provider using Pipeline.insert

Let's say the clean provider doesn't do all the preprocessing that we want it to do. We also want to drop either the odd or the even numbers before processing, keeping only the set named by a new Target parameter:

[2]:
from typing import NewType

Target = NewType('Target', str)


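def clean_and_remove_some(raw_data: RawData, target: Target) -> CleanData:
    """Convert from str and keep only the odd or only the even numbers."""
    if target == 'odd':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 1])
    if target == 'even':
        return CleanData([n for n in map(float, raw_data) if n % 2 == 0])
    raise ValueError(f"target must be 'odd' or 'even', got {target!r}")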

To replace the old CleanData provider, we pass the new function to Pipeline.insert. Because the new provider also depends on Target, we set that parameter as well:

[3]:
pipeline.insert(clean_and_remove_some)
pipeline[Target] = 'odd'
[4]:
pipeline
[4]:
Name Value Source
CleanData
clean_and_remove_some __main__.clean_and_remove_some
Filename raw.txt
RawData
load __main__.load
Result
process __main__.process
Target odd
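
The replacement shown in the table above can be pictured with a toy model. The sketch below is not sciline's implementation; `ToyPipeline` and its `provider_for` method are hypothetical names made up for illustration. It only assumes that providers are looked up by their annotated return type, so inserting a second function that returns CleanData takes the place of the first:

```python
from typing import NewType, get_type_hints

RawData = NewType('RawData', list)
CleanData = NewType('CleanData', list)


class ToyPipeline:
    """Hypothetical stand-in for a pipeline, for illustration only."""

    def __init__(self, providers=()):
        self._providers = {}
        for provider in providers:
            self.insert(provider)

    def insert(self, provider):
        # Key each provider by its annotated return type, so a later
        # provider with the same return type overwrites the earlier one.
        return_type = get_type_hints(provider)['return']
        self._providers[return_type] = provider

    def provider_for(self, key):
        return self._providers[key]


def clean(raw_data: RawData) -> CleanData:
    return CleanData([float(x) for x in raw_data])


def clean_odd(raw_data: RawData) -> CleanData:
    return CleanData([n for n in map(float, raw_data) if n % 2 == 1])


toy = ToyPipeline([clean])
toy.insert(clean_odd)  # same return type as clean, so clean is replaced
```

After the insert, `toy.provider_for(CleanData)` is `clean_odd`, mirroring how the CleanData row of the table now points at the new function.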

If we now request the Result, the task graph lists Target among its input keys, which shows that the new provider will be used in the computation:

[5]:
pipeline.get(Result)
[5]:
Output keys:
Result
Scheduler:
NaiveScheduler()
Input keys:
  • Filename
  • Target
[6]:
pipeline.compute(Result)
[6]:
25.0
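
As a sanity check, the value above can be reproduced by hand in plain Python without any pipeline. This is only a sketch of the arithmetic; the variable names are chosen for illustration:

```python
# The "raw" file content from the setup: the strings '0' to '9'.
raw = [str(n) for n in range(10)]

# Keep only the odd numbers, as the provider does for target == 'odd'.
odd = [n for n in map(float, raw) if n % 2 == 1]
expected_odd = sum(odd)  # 1 + 3 + 5 + 7 + 9 = 25.0

# With target == 'even' the kept numbers would be 0, 2, 4, 6, 8.
even = [n for n in map(float, raw) if n % 2 == 0]
expected_even = sum(even)  # 0 + 2 + 4 + 6 + 8 = 20.0
```

The odd case matches the computed Result of 25.0; setting Target to 'even' would be expected to give 20.0 instead.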