Workflow - Chunk by Chunk#
In this example, we will process McStas events chunk by chunk, panel by panel.
Build Base Workflow#
[1]:
from ess.nmx.mcstas import McStasWorkflow
from ess.nmx.data import small_mcstas_3_sample
from ess.nmx.types import *
wf = McStasWorkflow()
# Replace with the path to your own file
wf[FilePath] = small_mcstas_3_sample()
wf[MaximumCounts] = 10_000
wf[TimeBinSteps] = 50
wf.visualize(NMXReducedDataGroup, graph_attr={"rankdir": "TD"}, compact=True)
[1]:
Compute Raw Data Metadata#
The time-of-flight coordinate and the McStasWeight2CountScaleFactor must be the same for every chunk.
Therefore we need to compute TimeBinSteps and McStasWeight2CountScaleFactor before we compute NMXReducedData.
This can be done with ess.reduce.streaming.StreamProcessor.
In this example, MinimumTimeOfArrival, MaximumTimeOfArrival and MaximumProbability are renewed every time a chunk is added to the streaming processor:
MinAccumulator and MaxAccumulator remember the previous minimum/maximum value and update it with each new chunk, as sketched below.
raw_event_data_chunk_generator yields chunks of raw event probabilities from the McStas HDF5 file.
The example below processes the data chunk by chunk with chunk size CHUNK_SIZE.
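For intuition, here is a minimal sketch of how such an accumulator behaves. It is illustrative only: the push/value interface below is an assumption made for this sketch, not the actual ess.reduce.streaming API.

class SketchMaxAccumulator:
    """Illustrative stand-in for MaxAccumulator: keeps a running maximum."""

    def __init__(self) -> None:
        self._value = None  # No value until the first chunk arrives

    def push(self, value) -> None:
        # Keep whichever is larger: the stored maximum or the new chunk's value.
        self._value = value if self._value is None else max(self._value, value)

    @property
    def value(self):
        return self._value

A MinAccumulator is analogous with min instead of max; either way only the running extremum is kept in memory, never the full event stream.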
[2]:
from functools import partial

from ess.reduce.streaming import StreamProcessor, MaxAccumulator, MinAccumulator

# Stream processor building helper
scalefactor_stream_processor = partial(
    StreamProcessor,
    dynamic_keys=(RawEventProbability,),
    target_keys=(NMXRawDataMetadata,),
    accumulators={
        MaximumProbability: MaxAccumulator,
        MaximumTimeOfArrival: MaxAccumulator,
        MinimumTimeOfArrival: MinAccumulator,
    },
)

metadata_wf = wf.copy()
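functools.partial pre-binds the keyword arguments, so the loop further below can build one processor per detector with a single call. For illustration, the following are equivalent:

processor = scalefactor_stream_processor(temp_wf)
# ... is the same as:
# processor = StreamProcessor(
#     temp_wf,
#     dynamic_keys=(RawEventProbability,),
#     target_keys=(NMXRawDataMetadata,),
#     accumulators={...},  # the accumulator mapping bound above
# )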
[3]:
metadata_wf.visualize(NMXRawDataMetadata, graph_attr={"rankdir": "TD"}, compact=True)
[3]:
[4]:
import scipp as sc  # Used below for sc.linspace

from ess.nmx.types import DetectorName
from ess.nmx.mcstas.load import (
    raw_event_data_chunk_generator,
    mcstas_weight_to_probability_scalefactor,
)
from ess.nmx.streaming import calculate_number_of_chunks
from ipywidgets import IntProgress

CHUNK_SIZE = 10  # Number of event rows to process at once.
# Increase this number to speed up the processing.
NUM_DETECTORS = 3
# Loop over the detectors
file_path = metadata_wf.compute(FilePath)
raw_data_metadatas = {}
for detector_i in range(NUM_DETECTORS):
    temp_wf = metadata_wf.copy()
    temp_wf[DetectorIndex] = detector_i
    detector_name = temp_wf.compute(DetectorName)
    max_chunk_id = calculate_number_of_chunks(
        file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    )
    cur_detector_progress_bar = IntProgress(
        min=0, max=max_chunk_id, description=f"Detector {detector_i}"
    )
    display(cur_detector_progress_bar)
    # Build the stream processor
    processor = scalefactor_stream_processor(temp_wf)
    for da in raw_event_data_chunk_generator(
        file_path=file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    ):
        if any(size == 0 for size in da.sizes.values()):
            continue  # Skip empty chunks
        results = processor.add_chunk({RawEventProbability: da})
        cur_detector_progress_bar.value += 1
    display(results[NMXRawDataMetadata])
    raw_data_metadatas[detector_i] = results[NMXRawDataMetadata]
# Take the overall min/max values over all detectors
min_toa = min(dg['min_toa'] for dg in raw_data_metadatas.values())
max_toa = max(dg['max_toa'] for dg in raw_data_metadatas.values())
max_probability = max(dg['max_probability'] for dg in raw_data_metadatas.values())

toa_bin_edges = sc.linspace(dim='t', start=min_toa, stop=max_toa, num=51)

scale_factor = mcstas_weight_to_probability_scalefactor(
    max_counts=wf.compute(MaximumCounts), max_probability=max_probability
)
/tmp/ipykernel_3262/133556714.py:31: UserWarning: The chunk size may be too small < 10_000_000.
Consider increasing the chunk size for better performance.
Hint: NMX typically expect ~10^8 bins as reduced data.
for da in raw_event_data_chunk_generator(
NMXRawDataMetadata per detector:

- detector 0: min_toa 0.09726 s, max_toa 0.14286 s, max_probability 14.665 counts
- detector 1: min_toa 0.09935 s, max_toa 0.14328 s, max_probability 14.143 counts
- detector 2: min_toa 0.09934 s, max_toa 0.14528 s, max_probability 14.934 counts
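As a quick sanity check (illustrative only), the edges computed above define 50 time bins, matching TimeBinSteps = 50 set on the base workflow, and span the global time-of-arrival range over all three panels:

assert toa_bin_edges.sizes['t'] == 51  # 51 edges -> 50 bins
print(toa_bin_edges['t', 0], toa_bin_edges['t', -1])  # global min/max time of arrival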
Compute Final Output#
Now, with scale_factor as the McStasWeight2CountScaleFactor, we can compute the final output chunk by chunk.
We also compute static parameters in advance so that the stream processor does not recompute them every time a chunk is added.
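The pattern is plain sciline usage: compute a value once and assign it back to the workflow, which pins it as a fixed parameter. Sketch of the idea (the cell below does exactly this for McStasInstrument):

static_value = wf.compute(McStasInstrument)  # Compute once ...
final_wf[McStasInstrument] = static_value    # ... and pin as a parameter, not recomputed per chunk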
[5]:
from ess.nmx.mcstas.xml import McStasInstrument
final_wf = wf.copy()
# Set the scale factor and time bin edges
final_wf[McStasWeight2CountScaleFactor] = scale_factor
final_wf[TimeBinSteps] = toa_bin_edges
# Set the crystal rotation manually for now ...
final_wf[CrystalRotation] = sc.vector([0, 0, 0.0], unit='deg')
# Set static info
final_wf[McStasInstrument] = wf.compute(McStasInstrument)
final_wf.visualize(NMXReducedDataGroup, compact=True)
[5]:
[6]:
from functools import partial

from ess.reduce.streaming import StreamProcessor, EternalAccumulator

# Stream processor building helper
final_stream_processor = partial(
    StreamProcessor,
    dynamic_keys=(RawEventProbability,),
    target_keys=(NMXReducedDataGroup,),
    accumulators={NMXReducedCounts: EternalAccumulator},
)
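EternalAccumulator keeps a running sum of NMXReducedCounts over all chunks, which works because histogramming is additive over events. A minimal sketch in plain scipp (sc.data.table_xyz is scipp's built-in random example table, unrelated to NMX data):

import scipp as sc

events = sc.data.table_xyz(100)
edges = sc.linspace('x', 0.0, 1.0, num=11, unit='m')
# Histogramming all events at once ...
total = events.hist(x=edges)
# ... equals histogramming chunks and summing the per-chunk histograms.
chunked = events['row', :50].hist(x=edges) + events['row', 50:].hist(x=edges)
assert sc.allclose(total.data, chunked.data)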
[7]:
from ess.nmx.types import DetectorName
from ess.nmx.mcstas.load import raw_event_data_chunk_generator
from ess.nmx.streaming import calculate_number_of_chunks
from ipywidgets import IntProgress
CHUNK_SIZE = 10  # Number of event rows to process at once.
# Increase this number to speed up the processing.
NUM_DETECTORS = 3

# The time bin edges could be derived from the data, but the streaming
# processor only sees one chunk at a time, so the full time range must be
# known before the first chunk is processed. Here we set the edges manually,
# which is a bit cumbersome since the range has to be known in advance.
final_wf[TimeBinSteps] = sc.linspace('t', 0.1, 0.15, 51, unit='s')
# Loop over the detectors
file_path = final_wf.compute(FilePath)
for detector_i in range(NUM_DETECTORS):
    temp_wf = final_wf.copy()
    temp_wf[DetectorIndex] = detector_i
    # First compute static information
    detector_name = temp_wf.compute(DetectorName)
    temp_wf[PixelIds] = temp_wf.compute(PixelIds)
    max_chunk_id = calculate_number_of_chunks(
        file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    )
    cur_detector_progress_bar = IntProgress(
        min=0, max=max_chunk_id, description=f"Detector {detector_i}"
    )
    display(cur_detector_progress_bar)
    # Build the stream processor
    processor = final_stream_processor(temp_wf)
    for da in raw_event_data_chunk_generator(
        file_path=file_path, detector_name=detector_name, chunk_size=CHUNK_SIZE
    ):
        if any(size == 0 for size in da.sizes.values()):
            continue  # Skip empty chunks
        results = processor.add_chunk({RawEventProbability: da})
        cur_detector_progress_bar.value += 1
    result = results[NMXReducedDataGroup]
    display(result)
/tmp/ipykernel_3262/116780297.py:34: UserWarning: The chunk size may be too small < 10_000_000.
Consider increasing the chunk size for better performance.
Hint: NMX typically expect ~10^8 bins as reduced data.
for da in raw_event_data_chunk_generator(
NMXReducedDataGroup per detector, each with counts as a DataArray of dims (id: 1638400, t: 50), float64, unit counts:

- detector 0 (nD_Mantid_0): proton_charge 10.32 counts, fast_axis [0.999986, 0, -0.00529148], origin_position [-0.248454, -0.25, 0.292] m
- detector 1 (nD_Mantid_1): proton_charge 8.94 counts, fast_axis [-0.00531614, 0, -0.99998587], origin_position [-0.288666, -0.25, 0.252] m
- detector 2 (nD_Mantid_2): proton_charge 9.87 counts, fast_axis [0.00531614, 0, 0.99998587], origin_position [0.288667, -0.25, -0.251] m

Common to all panels: crystal_rotation [0, 0, 0] deg, sample_position [0, 0, 0] m, source_position [-0.53123, 0, -157.405] m, sample_name 'sampleMantid', slow_axis [0, 1, 0], detector_shape (1280, 1280), x/y_pixel_size 0.0004 m, and per-pixel position vectors (id: 1638400) in m.