Workflow - Chunk by Chunk#
In this example, we will process McStas events chunk by chunk, panel by panel.
Build Base Workflow#
[1]:
from ess.nmx.mcstas import McStasWorkflow
from ess.nmx.data import small_mcstas_3_sample
from ess.nmx.types import *
wf = McStasWorkflow()
# Replace with the path to your own file
wf[FilePath] = small_mcstas_3_sample()
wf[MaximumCounts] = 10_000
wf[TimeBinSteps] = 50
wf.visualize(NMXReducedDataGroup, graph_attr={"rankdir": "TD"}, compact=True)
[1]:
Compute Raw Data Metadata#
The time-of-flight coordinate and the McStasWeight2CountScaleFactor must be identical from chunk to chunk. Therefore we need to compute TimeBinSteps and McStasWeight2CountScaleFactor before we compute NMXReducedDataGroup. This can be done with ess.reduce.streaming.StreamProcessor.
In this example, MinimumTimeOfArrival, MaximumTimeOfArrival and MaximumProbability are renewed every time a chunk is added to the stream processor: a MinAccumulator/MaxAccumulator remembers the previous minimum/maximum value and combines it with the value from the new chunk. raw_event_data_chunk_generator yields chunks of raw event probabilities from the McStas HDF5 file. The example below processes the data chunk by chunk with chunk size CHUNK_SIZE.
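To make the accumulator behaviour concrete, here is a minimal conceptual sketch of a running-maximum accumulator (an illustration only, not the ess.reduce.streaming implementation):
class RunningMax:
    """Remembers the maximum seen so far and folds in each new chunk."""

    def __init__(self):
        self._value = None

    def push(self, chunk_max):
        # Combine the stored maximum with the new chunk's maximum.
        self._value = chunk_max if self._value is None else max(self._value, chunk_max)

    @property
    def value(self):
        return self._value


acc = RunningMax()
for chunk in ([1.0, 3.0], [2.0, 5.0], [4.0]):
    acc.push(max(chunk))
assert acc.value == 5.0  # maximum over all chunks seen so far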
[2]:
from functools import partial
from ess.reduce.streaming import StreamProcessor, MaxAccumulator, MinAccumulator
# Stream processor building helper
scalefactor_stream_processor = partial(
    StreamProcessor,
    dynamic_keys=(RawEventProbability,),
    target_keys=(NMXRawDataMetadata,),
    accumulators={
        MaximumProbability: MaxAccumulator,
        MaximumTimeOfArrival: MaxAccumulator,
        MinimumTimeOfArrival: MinAccumulator,
    },
)
metadata_wf = wf.copy()
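Using functools.partial here is a design choice: the static configuration (dynamic keys, target keys, accumulators) is bound once, and the per-detector loop later only has to supply its own workflow copy. This mirrors the loop further below, it is not a separate API:
# One stream processor per detector: bind the per-detector workflow copy.
# Each add_chunk call then updates the accumulators and recomputes the targets.
processor = scalefactor_stream_processor(metadata_wf.copy())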
[3]:
metadata_wf.visualize(NMXRawDataMetadata, graph_attr={"rankdir": "TD"}, compact=True)
[3]:
[4]:
from ess.nmx.types import DetectorName
from ess.nmx.mcstas.load import (
    raw_event_data_chunk_generator,
    mcstas_weight_to_probability_scalefactor,
)
from ess.nmx.streaming import calculate_number_of_chunks
from ipywidgets import IntProgress
import scipp as sc

CHUNK_SIZE = 10  # Number of event rows to process at once
# Increase this number to speed up the processing
NUM_DETECTORS = 3

# Loop over the detectors
file_path = metadata_wf.compute(FilePath)
raw_data_metadatas = {}
for detector_i in range(NUM_DETECTORS):
    temp_wf = metadata_wf.copy()
    temp_wf[DetectorIndex] = detector_i
    detector_name = temp_wf.compute(DetectorName)
    max_chunk_id = calculate_number_of_chunks(
        file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    )
    cur_detector_progress_bar = IntProgress(
        min=0, max=max_chunk_id, description=f"Detector {detector_i}"
    )
    display(cur_detector_progress_bar)
    # Build the stream processor
    processor = scalefactor_stream_processor(temp_wf)
    for da in raw_event_data_chunk_generator(
        file_path=file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    ):
        # Skip chunks that contain no events
        if any(size == 0 for size in da.sizes.values()):
            continue
        results = processor.add_chunk({RawEventProbability: da})
        cur_detector_progress_bar.value += 1
    display(results[NMXRawDataMetadata])
    raw_data_metadatas[detector_i] = results[NMXRawDataMetadata]

# Take the overall min/max values across all detectors
min_toa = min(dg['min_toa'] for dg in raw_data_metadatas.values())
max_toa = max(dg['max_toa'] for dg in raw_data_metadatas.values())
max_probability = max(dg['max_probability'] for dg in raw_data_metadatas.values())

toa_bin_edges = sc.linspace(dim='t', start=min_toa, stop=max_toa, num=51)
scale_factor = mcstas_weight_to_probability_scalefactor(
    max_counts=wf.compute(MaximumCounts), max_probability=max_probability
)
/tmp/ipykernel_3262/133556714.py:31: UserWarning: The chunk size may be too small < 10_000_000.
Consider increasing the chunk size for better performance.
Hint: NMX typically expect ~10^8 bins as reduced data.
  for da in raw_event_data_chunk_generator(

NMXRawDataMetadata per detector (scipp DataGroup):
- Detector 0: min_toa = 0.0973 s, max_toa = 0.1429 s, max_probability = 14.67 counts
- Detector 1: min_toa = 0.0993 s, max_toa = 0.1433 s, max_probability = 14.14 counts
- Detector 2: min_toa = 0.0993 s, max_toa = 0.1453 s, max_probability = 14.93 counts
Compute Final Output#
Now, with the scale_factor (McStasWeight2CountScaleFactor), we can compute the final output chunk by chunk. We also compute the static parameters in advance so that the stream processor does not recompute them every time another chunk is added.
[5]:
from ess.nmx.mcstas.xml import McStasInstrument
final_wf = wf.copy()
# Set the scale factor and time bin edges
final_wf[McStasWeight2CountScaleFactor] = scale_factor
final_wf[TimeBinSteps] = toa_bin_edges
# Set the crystal rotation manually for now ...
final_wf[CrystalRotation] = sc.vector([0, 0, 0.0], unit='deg')
# Set static info
final_wf[McStasInstrument] = wf.compute(McStasInstrument)
final_wf.visualize(NMXReducedDataGroup, compact=True)
[5]:
[6]:
from functools import partial
from ess.reduce.streaming import StreamProcessor, EternalAccumulator
# Stream processor building helper
final_stream_processor = partial(
    StreamProcessor,
    dynamic_keys=(RawEventProbability,),
    target_keys=(NMXReducedDataGroup,),
    accumulators={NMXReducedCounts: EternalAccumulator},
)
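Unlike the min/max accumulators above, EternalAccumulator keeps a running sum over every chunk ever pushed, which is what accumulating histogrammed counts requires. A conceptual sketch (an illustration only, not the ess.reduce.streaming implementation):
class RunningSum:
    """Keeps the running total of every chunk pushed so far."""

    def __init__(self):
        self._value = None

    def push(self, chunk):
        # Add the new chunk onto the accumulated total.
        self._value = chunk if self._value is None else self._value + chunk

    @property
    def value(self):
        return self._value


acc = RunningSum()
for chunk in (1.0, 2.5, 0.5):
    acc.push(chunk)
assert acc.value == 4.0  # sum over all chunks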
[7]:
from ess.nmx.types import DetectorName
from ess.nmx.mcstas.load import raw_event_data_chunk_generator
from ess.nmx.streaming import calculate_number_of_chunks
from ipywidgets import IntProgress
CHUNK_SIZE = 10  # Number of event rows to process at once
# Increase this number to speed up the processing
NUM_DETECTORS = 3

# The time bin edges could be derived from the data, but the stream processor
# only ever sees one chunk at a time, so the edges must be set manually before
# the first chunk is processed. This is a bit cumbersome since the full
# time-of-arrival range has to be known in advance.
final_wf[TimeBinSteps] = sc.linspace('t', 0.1, 0.15, 51, unit='s')

# Loop over the detectors
file_path = final_wf.compute(FilePath)
for detector_i in range(NUM_DETECTORS):
    temp_wf = final_wf.copy()
    temp_wf[DetectorIndex] = detector_i
    # First compute static information
    detector_name = temp_wf.compute(DetectorName)
    temp_wf[PixelIds] = temp_wf.compute(PixelIds)
    max_chunk_id = calculate_number_of_chunks(
        file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    )
    cur_detector_progress_bar = IntProgress(
        min=0, max=max_chunk_id, description=f"Detector {detector_i}"
    )
    display(cur_detector_progress_bar)
    # Build the stream processor
    processor = final_stream_processor(temp_wf)
    for da in raw_event_data_chunk_generator(
        file_path=file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    ):
        # Skip chunks that contain no events
        if any(size == 0 for size in da.sizes.values()):
            continue
        results = processor.add_chunk({RawEventProbability: da})
        cur_detector_progress_bar.value += 1
    result = results[NMXReducedDataGroup]
    display(result)
/tmp/ipykernel_3262/116780297.py:34: UserWarning: The chunk size may be too small < 10_000_000.
Consider increasing the chunk size for better performance.
Hint: NMX typically expect ~10^8 bins as reduced data.
  for da in raw_event_data_chunk_generator(

NMXReducedDataGroup per detector (scipp DataGroup): each group holds the accumulated counts (dims id: 1638400, t: 50), proton_charge, crystal_rotation, sample_position, source_position, sample_name, the detector geometry (fast_axis, slow_axis, origin_position, per-pixel position, detector_shape (1280, 1280), x/y pixel size 0.0004 m) and detector_name (nD_Mantid_0, nD_Mantid_1, nD_Mantid_2).
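As a quick sanity check (illustrative; the 'counts' entry name is taken from the reduced group shown above), the accumulated histogram of the last processed detector can be summed over all pixels and time bins:
# Total accumulated counts over all pixels and time bins (illustrative).
print(result['counts'].sum())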