Workflow - Chunk by Chunk#

In this example, we will process McStas events chunk by chunk, panel by panel.

Build Base Workflow#

[1]:
from ess.nmx.mcstas import McStasWorkflow
from ess.nmx.data import small_mcstas_3_sample
from ess.nmx.types import *

wf = McStasWorkflow()
# Replace with the path to your own file
wf[FilePath] = small_mcstas_3_sample()
wf[MaximumCounts] = 10_000
wf[TimeBinSteps] = 50
wf.visualize(NMXReducedDataGroup, graph_attr={"rankdir": "TD"}, compact=True)
[1]:
../_images/user-guide_workflow_chunk_2_0.svg

Compute Raw Data Metadata#

The time-of-flight coordinate and the McStasWeight2CountScaleFactor should be the same for every chunk.

Therefore we need to compute TimeBinSteps and McStasWeight2CountScaleFactor before we compute NMXReducedDataGroup.

This can be done with ess.reduce.streaming.StreamProcessor.

In this example, MinimumTimeOfArrival, MaximumTimeOfArrival and MaximumProbability are updated every time a chunk is added to the streaming processor.

The (Min/Max)Accumulator remembers the previous minimum/maximum value and combines it with the minimum/maximum of each new chunk.
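
Conceptually, such an accumulator only has to keep a running extremum and fold in each chunk as it arrives. Here is a minimal sketch of that behavior (illustrative only, not the actual ess.reduce.streaming implementation):

```python
class SketchMaxAccumulator:
    """Remembers the maximum seen so far and folds in each new chunk."""

    def __init__(self):
        self._value = None

    def push(self, chunk_max):
        # Combine the stored maximum with the maximum of the new chunk.
        self._value = chunk_max if self._value is None else max(self._value, chunk_max)

    @property
    def value(self):
        return self._value


acc = SketchMaxAccumulator()
for chunk in ([3.0, 1.0], [7.0, 2.0], [5.0, 4.0]):
    acc.push(max(chunk))
print(acc.value)  # 7.0
```

A MinAccumulator works the same way with `min` instead of `max`; only the reduced (per-chunk) value needs to be held in memory, never the full event list.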

raw_event_data_chunk_generator yields chunks of raw event probabilities from the McStas HDF5 file.

The example below processes the data chunk by chunk, with chunk size CHUNK_SIZE.
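
The chunking itself is straightforward slicing. A hypothetical sketch of chunk-wise iteration, and of how the chunk count relates to CHUNK_SIZE (the real generator reads from the McStas HDF5 file instead of a list):

```python
import math


def iter_chunks(events, chunk_size):
    """Yield consecutive slices of at most ``chunk_size`` rows."""
    for start in range(0, len(events), chunk_size):
        yield events[start : start + chunk_size]


events = list(range(25))
chunks = list(iter_chunks(events, chunk_size=10))
print([len(c) for c in chunks])     # [10, 10, 5]
print(math.ceil(len(events) / 10))  # 3, cf. calculate_number_of_chunks
```

The last chunk may be smaller than CHUNK_SIZE (or even empty), which is why the loops below skip empty chunks before passing them to the stream processor.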

[2]:
from functools import partial
from ess.reduce.streaming import StreamProcessor, MaxAccumulator, MinAccumulator

# Stream processor building helper
scalefactor_stream_processor = partial(
    StreamProcessor,
    dynamic_keys=(RawEventProbability,),
    target_keys=(NMXRawDataMetadata,),
    accumulators={
        MaximumProbability: MaxAccumulator,
        MaximumTimeOfArrival: MaxAccumulator,
        MinimumTimeOfArrival: MinAccumulator,
    },
)
metadata_wf = wf.copy()
[3]:
metadata_wf.visualize(NMXRawDataMetadata, graph_attr={"rankdir": "TD"}, compact=True)
[3]:
../_images/user-guide_workflow_chunk_5_0.svg
[4]:
from ess.nmx.types import DetectorName
from ess.nmx.mcstas.load import (
    raw_event_data_chunk_generator,
    mcstas_weight_to_probability_scalefactor,
)
from ess.nmx.streaming import calculate_number_of_chunks
from ipywidgets import IntProgress
import scipp as sc

CHUNK_SIZE = 10  # Number of event rows to process at once
# Increase this number to speed up the processing
NUM_DETECTORS = 3

# Loop over the detectors
file_path = metadata_wf.compute(FilePath)
raw_data_metadatas = {}

for detector_i in range(NUM_DETECTORS):
    temp_wf = metadata_wf.copy()
    temp_wf[DetectorIndex] = detector_i
    detector_name = temp_wf.compute(DetectorName)
    max_chunk_id = calculate_number_of_chunks(
        temp_wf.compute(FilePath), detector_name=detector_name, chunk_size=CHUNK_SIZE
    )
    cur_detector_progress_bar = IntProgress(
        min=0, max=max_chunk_id, description=f"Detector {detector_i}"
    )
    display(cur_detector_progress_bar)

    # Build the stream processor
    processor = scalefactor_stream_processor(temp_wf)
    for da in raw_event_data_chunk_generator(
        file_path=file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    ):
        if any(size == 0 for size in da.sizes.values()):
            continue  # Skip empty chunks
        results = processor.add_chunk({RawEventProbability: da})
        cur_detector_progress_bar.value += 1
    display(results[NMXRawDataMetadata])
    raw_data_metadatas[detector_i] = results[NMXRawDataMetadata]

# Take the overall min/max values across all detectors
min_toa = min(dg['min_toa'] for dg in raw_data_metadatas.values())
max_toa = max(dg['max_toa'] for dg in raw_data_metadatas.values())
max_probability = max(dg['max_probability'] for dg in raw_data_metadatas.values())

toa_bin_edges = sc.linspace(dim='t', start=min_toa, stop=max_toa, num=51)
scale_factor = mcstas_weight_to_probability_scalefactor(
    max_counts=wf.compute(MaximumCounts), max_probability=max_probability
)
/tmp/ipykernel_3262/133556714.py:31: UserWarning: The chunk size may be too small < 10_000_000.
Consider increasing the chunk size for better performance.
Hint: NMX typically expect ~10^8 bins as reduced data.
  for da in raw_event_data_chunk_generator(
The displayed NMXRawDataMetadata per detector (abridged scipp output):

Detector 0: min_toa = 0.0973 s, max_toa = 0.1429 s, max_probability = 14.67 counts
Detector 1: min_toa = 0.0993 s, max_toa = 0.1433 s, max_probability = 14.14 counts
Detector 2: min_toa = 0.0993 s, max_toa = 0.1453 s, max_probability = 14.93 counts

Compute Final Output#

Now that we have scale_factor, the McStasWeight2CountScaleFactor, we can compute the final output chunk by chunk.

We will also compute static parameters in advance so that the stream processor does not recompute them every time another chunk is added.

[5]:
from ess.nmx.mcstas.xml import McStasInstrument

final_wf = wf.copy()
# Set the scale factor and time bin edges
final_wf[McStasWeight2CountScaleFactor] = scale_factor
final_wf[TimeBinSteps] = toa_bin_edges

# Set the crystal rotation manually for now ...
final_wf[CrystalRotation] = sc.vector([0, 0, 0.0], unit='deg')
# Set static info
final_wf[McStasInstrument] = wf.compute(McStasInstrument)
final_wf.visualize(NMXReducedDataGroup, compact=True)
[5]:
../_images/user-guide_workflow_chunk_8_0.svg
[6]:
from functools import partial
from ess.reduce.streaming import StreamProcessor, EternalAccumulator

# Stream processor building helper
final_stream_processor = partial(
    StreamProcessor,
    dynamic_keys=(RawEventProbability,),
    target_keys=(NMXReducedDataGroup,),
    accumulators={NMXReducedCounts: EternalAccumulator},
)
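
The EternalAccumulator keeps accumulating partial results over all chunks, so the final counts are the sum of every chunk's contribution. A minimal sketch of that behavior (illustrative only, not the actual ess.reduce.streaming implementation):

```python
class SketchSumAccumulator:
    """Accumulates partial results from every chunk into a running total."""

    def __init__(self):
        self._total = None

    def push(self, chunk_result):
        # Add each chunk's contribution to the running total.
        self._total = chunk_result if self._total is None else self._total + chunk_result

    @property
    def value(self):
        return self._total


acc = SketchSumAccumulator()
for partial_counts in (4.0, 7.0, 2.0):  # e.g. per-chunk histogram totals
    acc.push(partial_counts)
print(acc.value)  # 13.0
```

This is why the time bin edges must be fixed before streaming starts: the per-chunk histograms can only be summed if every chunk is binned on the same edges.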
[7]:
from ess.nmx.types import DetectorName
from ess.nmx.mcstas.load import raw_event_data_chunk_generator
from ess.nmx.streaming import calculate_number_of_chunks
from ipywidgets import IntProgress

CHUNK_SIZE = 10  # Number of event rows to process at once
# Increase this number to speed up the processing
NUM_DETECTORS = 3
final_wf[TimeBinSteps] = sc.linspace(
    't', 0.1, 0.15, 51, unit='s'
)
# Time bin edges could be derived from the data, but the stream processor only
# sees one chunk at a time, so they must be set manually before the first chunk
# is processed. This is a bit cumbersome since the time range must be known in advance.

# Loop over the detectors
file_path = final_wf.compute(FilePath)
for detector_i in range(NUM_DETECTORS):
    temp_wf = final_wf.copy()
    temp_wf[DetectorIndex] = detector_i
    # First compute static information
    detector_name = temp_wf.compute(DetectorName)
    temp_wf[PixelIds] = temp_wf.compute(PixelIds)
    max_chunk_id = calculate_number_of_chunks(
        file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    )
    cur_detector_progress_bar = IntProgress(
        min=0, max=max_chunk_id, description=f"Detector {detector_i}"
    )
    display(cur_detector_progress_bar)

    # Build the stream processor
    processor = final_stream_processor(temp_wf)
    for da in raw_event_data_chunk_generator(
        file_path=file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    ):
        if any(size == 0 for size in da.sizes.values()):
            continue  # Skip empty chunks
        results = processor.add_chunk({RawEventProbability: da})
        cur_detector_progress_bar.value += 1

    result = results[NMXReducedDataGroup]
    display(result)

/tmp/ipykernel_3262/116780297.py:34: UserWarning: The chunk size may be too small < 10_000_000.
Consider increasing the chunk size for better performance.
Hint: NMX typically expect ~10^8 bins as reduced data.
  for da in raw_event_data_chunk_generator(
Each detector displays an NMXReducedDataGroup (abridged scipp output):

Detector 0 (nD_Mantid_0): counts DataArray (id: 1638400, t: 50) [counts]; proton_charge = 10.32 counts; fast_axis = [0.999986, 0, -0.00529148]; origin_position = [-0.248454, -0.25, 0.292] m
Detector 1 (nD_Mantid_1): counts DataArray (id: 1638400, t: 50) [counts]; proton_charge = 8.94 counts; fast_axis = [-0.00531614, 0, -0.99998587]; origin_position = [-0.288666, -0.25, 0.252] m
Detector 2 (nD_Mantid_2): counts DataArray (id: 1638400, t: 50) [counts]; proton_charge = 9.87 counts; fast_axis = [0.00531614, 0, 0.99998587]; origin_position = [0.288667, -0.25, -0.251] m

Common to all detectors: crystal_rotation = [0, 0, 0] deg; sample_position = [0, 0, 0] m; source_position = [-0.53123, 0, -157.405] m; sample_name = "sampleMantid"; slow_axis = [0, 1, 0]; detector_shape = (1280, 1280); x/y_pixel_size = 0.0004 m; position Variable (id: 1638400) [m].