"""Clustering of detector events into streaks for ess.beer."""
import scipp as sc
from scippneutron.conversion.tof import dspacing_from_tof
from scipy.signal import find_peaks, medfilt
from .types import DetectorData, RunType, StreakClusteredData
def cluster_events_by_streak(da: DetectorData[RunType]) -> StreakClusteredData[RunType]:
    """Cluster detector events into 'streak' bins along a coarse d-spacing axis.

    A coarse d-spacing coordinate is computed for every event, a 1000-bin
    histogram of that coordinate is searched for peaks and valleys, and the
    valley positions are used as bin edges to group the events. Bins whose
    d-spacing interval does not contain exactly one peak are masked.

    Parameters
    ----------
    da:
        Detector event data with ``event_time_offset``, ``L0`` and
        ``two_theta`` coordinates, or a ``DataGroup`` of such data
        (processed entry by entry).

    Returns
    -------
    :
        Event data binned along a new ``streak`` dimension, carrying a
        ``no_peak`` mask on bins not containing exactly one histogram peak.
    """
    # A DataGroup is handled by clustering each of its entries independently.
    if isinstance(da, sc.DataGroup):
        return sc.DataGroup({k: cluster_events_by_streak(v) for k, v in da.items()})
    # Shallow copy so the coordinate added below does not mutate the input.
    da = da.copy(deep=False)
    # TODO: approximate t0 should depend on chopper information
    approximate_t0 = sc.scalar(6e-3, unit='s')
    # Per-event coarse d-spacing from time of flight (event_time_offset
    # shifted by the approximate t0), flight-path length and scattering angle.
    da.coords['coarse_d'] = dspacing_from_tof(
        tof=da.coords['event_time_offset'] - approximate_t0,
        Ltotal=da.coords['L0'],
        two_theta=da.coords['two_theta'],
    ).to(unit='angstrom')
    # Histogram the coarse d-spacing; its peaks and valleys define the streaks.
    h = da.hist(coarse_d=1000)
    # Peaks must rise above a median-filtered baseline of the histogram.
    i_peaks, _ = find_peaks(
        h.data.values, height=medfilt(h.values, kernel_size=99), distance=3
    )
    # Valleys are peaks of the inverted histogram that dip below half of the
    # maximum count.
    i_valleys, _ = find_peaks(
        h.data.values.max() - h.data.values, distance=3, height=h.data.values.max() / 2
    )
    # Valley positions (candidate bin edges) and peak positions in d-spacing.
    valleys = h.coords['coarse_d'][i_valleys]
    peaks = sc.array(
        dims=['coarse_d'],
        values=h.coords['coarse_d'].values[i_peaks],
        unit=h.coords['coarse_d'].unit,
    )
    # For each interval between consecutive valleys: does it contain a peak?
    has_peak = peaks.bin(coarse_d=valleys).bins.size().data.to(dtype='bool')
    # Keep only valley edges adjacent to at least one peak-containing interval
    # (interior edges: either neighbouring interval; outermost edges: the
    # single adjacent interval).
    filtered_valleys = valleys[
        sc.concat(
            [
                has_peak[0],
                has_peak[:-1] | has_peak[1:],
                has_peak[-1],
            ],
            dim=has_peak.dim,
        )
    ]
    # Re-count peaks per interval of the filtered edges (integer counts now,
    # not booleans, so "exactly one" can be tested below).
    has_peak = peaks.bin(coarse_d=filtered_valleys).bins.size().data
    # Bin the events by the filtered valley edges; mask bins whose interval
    # does not contain exactly one peak (zero or multiple peaks).
    b = da.bin(coarse_d=filtered_valleys).assign_masks(
        no_peak=has_peak != sc.scalar(1, unit=None)
    )
    # The coarse_d coordinate was only needed for clustering; drop it from the
    # binned dimension and from the events, and expose the result as 'streak'.
    b = b.drop_coords(('coarse_d',))
    b = b.bins.drop_coords(('coarse_d',))
    b = b.rename_dims(coarse_d='streak')
    return b
# Provider tuple exposing this module's functions to the workflow framework
# (presumably consumed by a sciline pipeline — confirm against the package
# workflow setup).
providers = (cluster_events_by_streak,)