# Source code for scippneutron.tof.to_events
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
from functools import reduce
import numpy as np
import scipp as sc
def to_events(
    da: sc.DataArray, event_dim: str, events_per_bin: int = 500
) -> sc.DataArray:
    """
    Convert a histogrammed data array to an event list.

    Each dimension with a bin-edge coordinate is converted to an event coordinate.
    The contract is that if we re-histogram the event list with the same bin edges,
    we should get the original counts back.
    Masks on non-bin-edge dimensions are preserved.
    If there are masks on bin-edge dimensions, the masked values are zeroed out in
    the original data before the conversion to events.

    Parameters
    ----------
    da:
        DataArray to convert to events.
    event_dim:
        Name of the new event dimension.
    events_per_bin:
        Number of events to generate per bin.

    Returns
    -------
    :
        DataArray where all bin-edge dimensions have been flattened (together with
        the generated events) into a single new dimension ``event_dim``.

    Raises
    ------
    ValueError
        If ``da`` already contains binned (event) data.
    """
    if da.bins is not None:
        raise ValueError("Cannot convert a binned DataArray to events.")
    rng = np.random.default_rng()
    event_coords = {}
    edge_dims = []
    midp_dims = []
    # Separate bin-edge and midpoints coords. Dimensions without any coordinate
    # are treated like midpoint dims and carried through unchanged (calling
    # `is_edges` on a missing key would raise, hence the membership guard).
    for dim in da.dims:
        if dim in da.coords and da.coords.is_edges(dim):
            edge_dims.append(dim)
        else:
            midp_dims.append(dim)
    edge_sizes = {dim: da.sizes[dim] for dim in edge_dims}
    for dim in edge_dims:
        coord = da.coords[dim]
        # Left and right bin edges, broadcast to the full shape of all
        # bin-edge dimensions so events can be drawn for every bin at once.
        low = sc.broadcast(coord[dim, :-1], sizes=edge_sizes).values
        high = sc.broadcast(coord[dim, 1:], sizes=edge_sizes).values
        # The numpy.random.uniform function below does not support NaNs, so we need to
        # replace them with zeros, and then replace them back after the random numbers
        # have been generated.
        nans = np.isnan(low) | np.isnan(high)
        low = np.where(nans, 0.0, low)
        high = np.where(nans, 0.0, high)
        # In each bin, we generate a number of events with a uniform distribution.
        events = rng.uniform(
            low, high, size=(events_per_bin, *list(edge_sizes.values()))
        )
        # Restore NaN where either bin edge was NaN.
        events[..., nans] = np.nan
        event_coords[dim] = sc.array(
            dims=[event_dim, *edge_dims], values=events, unit=coord.unit
        )
    # Find and apply masks that are on a bin-edge dimension
    event_masks = {}
    other_masks = {}
    edge_dims_set = set(edge_dims)
    for key, mask in da.masks.items():
        if set(mask.dims) & edge_dims_set:
            event_masks[key] = mask
        else:
            other_masks[key] = mask
    data = da.data
    if event_masks:
        # Zero out masked values by multiplying with the inverted combined mask
        # (converted to a dimensionless integer array of 0s and 1s).
        inv_mask = (~reduce(lambda a, b: a | b, event_masks.values())).to(dtype=int)
        inv_mask.unit = ''
        data = data * inv_mask
    # Create the data counts, which are the original counts divided by the number of
    # events per bin, so that re-histogramming with the same edges restores the
    # original counts.
    sizes = {event_dim: events_per_bin} | da.sizes
    val = sc.broadcast(sc.values(data) / float(events_per_bin), sizes=sizes)
    kwargs = {'dims': sizes.keys(), 'values': val.values, 'unit': data.unit}
    if data.variances is not None:
        # Note that here, we are not setting the variances of the final data counts,
        # but the variances of the underlying distribution scaled the same way as
        # the values.
        kwargs['variances'] = sc.broadcast(
            sc.variances(data) / float(events_per_bin), sizes=sizes
        ).values
    new_data = sc.array(**kwargs)
    new = sc.DataArray(data=new_data, coords=event_coords)
    # Move all bin-edge dims next to the event dim, then flatten them together
    # into the single event dimension.
    new = new.transpose((*midp_dims, *edge_dims, event_dim)).flatten(
        dims=[*edge_dims, event_dim], to=event_dim
    )
    # Re-attach midpoint coordinates (skipping coordless dims) and the masks
    # that did not involve any bin-edge dimension.
    return new.assign_coords(
        {dim: da.coords[dim].copy() for dim in midp_dims if dim in da.coords}
    ).assign_masks({key: mask.copy() for key, mask in other_masks.items()})