Source code for ess.beer.clustering

import scipp as sc
from scipy.signal import find_peaks, medfilt

from .conversions import tof_from_t0_estimate_graph
from .types import (
    GeometryCoordTransformGraph,
    RawDetector,
    RunType,
    StreakClusteredData,
)


[docs] def cluster_events_by_streak( da: RawDetector[RunType], gg: GeometryCoordTransformGraph ) -> StreakClusteredData[RunType]: graph = tof_from_t0_estimate_graph(gg) da = da.transform_coords(['dspacing'], graph=graph) da.bins.coords['coarse_d'] = da.bins.coords.pop('dspacing').to(unit='angstrom') # We need to keep these coordinates after binning, # adding them to the binned data coords achieves this. for coord in ('two_theta', 'Ltotal'): da.bins.coords[coord] = sc.bins_like(da, da.coords[coord]) h = da.bins.concat().hist(coarse_d=1000) i_peaks, _ = find_peaks( h.data.values, height=medfilt(h.values, kernel_size=99), distance=3 ) i_valleys, _ = find_peaks( h.data.values.max() - h.data.values, distance=3, height=h.data.values.max() / 2 ) valleys = h.coords['coarse_d'][i_valleys] peaks = sc.array( dims=['coarse_d'], values=h.coords['coarse_d'].values[i_peaks], unit=h.coords['coarse_d'].unit, ) has_peak = peaks.bin(coarse_d=valleys).bins.size().data.to(dtype='bool') filtered_valleys = valleys[ sc.concat( [ has_peak[0], has_peak[:-1] | has_peak[1:], has_peak[-1], ], dim=has_peak.dim, ) ] has_peak = peaks.bin(coarse_d=filtered_valleys).bins.size().data b = ( da.bins.concat() .bin(coarse_d=filtered_valleys) .assign_masks(no_peak=has_peak != sc.scalar(1, unit=None)) ) b = b.drop_coords(('coarse_d',)) b = b.bins.drop_coords(('coarse_d',)) b = b.rename_dims(coarse_d='streak') return b
providers = (cluster_events_by_streak,)