# Source code for ess.nmx.streaming
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
from typing import Any
import scipp as sc
import scippnexus as snx
from ess.reduce.streaming import Accumulator
from .mcstas.load import load_event_data_bank_name
from .types import DetectorBankPrefix, DetectorName, FilePath
class MinAccumulator(Accumulator):
    """Accumulator that keeps track of the minimum value seen so far."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # Running minimum; ``None`` until the first value is pushed.
        self._cur_min: sc.Variable | None = None

    @property
    def value(self) -> sc.Variable | None:
        """Current minimum, or ``None`` if nothing has been pushed yet."""
        return self._cur_min

    def _do_push(self, value: sc.Variable) -> None:
        """Fold the minimum of ``value`` into the running minimum."""
        new_min = value.min()
        if self._cur_min is None:
            self._cur_min = new_min
        else:
            self._cur_min = min(self._cur_min, new_min)
class MaxAccumulator(Accumulator):
    """Accumulator that keeps track of the maximum value seen so far."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # Running maximum; ``None`` until the first value is pushed.
        self._cur_max: sc.Variable | None = None

    @property
    def value(self) -> sc.Variable | None:
        """Current maximum, or ``None`` if nothing has been pushed yet."""
        return self._cur_max

    def _do_push(self, value: sc.Variable) -> None:
        """Fold the maximum of ``value`` into the running maximum."""
        new_max = value.max()
        if self._cur_max is None:
            self._cur_max = new_max
        else:
            self._cur_max = max(self._cur_max, new_max)
def calculate_number_of_chunks(
    file_path: FilePath,
    *,
    detector_name: DetectorName,
    bank_prefix: DetectorBankPrefix | None = None,
    chunk_size: int = 0,  # Number of rows to read at a time
) -> int:
    """Calculate number of chunks in the event data.

    Parameters
    ----------
    file_path:
        Path to the nexus file
    detector_name:
        Name of the detector to load
    bank_prefix:
        Prefix of the event data bank.
        If ``None``, it is looked up in the file from ``detector_name``.
    chunk_size:
        Number of rows to read at a time.
        If 0, chunk slice is determined automatically by the ``iter_chunks``.
        Note that it only works if the dataset is already chunked.

    Returns
    -------
    :
        Number of chunks the event dataset will be split into.
    """
    # Find the data bank name associated with the detector,
    # unless the caller already supplied it.
    if bank_prefix is None:
        bank_prefix = load_event_data_bank_name(
            detector_name=detector_name, file_path=file_path
        )
    bank_name = f'{bank_prefix}_dat_list_p_x_y_n_id_t'
    with snx.File(file_path, 'r') as f:
        root = f["entry1/data"]
        # Resolve the full dataset name that contains the bank name.
        (bank_name,) = (name for name in root.keys() if bank_name in name)
        dset: snx.Field = root[bank_name]["events"]
        if chunk_size == 0:
            # Rely on the dataset's native (HDF5) chunking.
            return len(list(dset.dataset.iter_chunks()))
        # Ceiling division: a partial trailing chunk counts as one chunk.
        return dset.shape[0] // chunk_size + int(dset.shape[0] % chunk_size != 0)