Source code for scippneutron.tof.fakes

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
# @author Simon Heybrock
"""
A fake time-of-flight neutron beamline for documentation and testing.

This provides data in a structure as typically provided in a NeXus file, including:

- Detector event data including event_time_offset and event_time_zero
- Monitor event data including event_time_offset and event_time_zero
- Chopper timestamps
"""

from __future__ import annotations

from collections.abc import Callable

import numpy as np
import scipp as sc
from numpy import random

from . import chopper_cascade


[docs] class FakeSource:
[docs] def __init__( self, frequency: sc.Variable, run_length: sc.Variable, events_per_pulse: int = 10000, ): """ Return a fake source. Parameters ---------- frequency: Frequency of the source. run_length: Run length of the source. events_per_pulse: Number of events per pulse. """ self.frequency = frequency self.t0 = self._make_t0(frequency=frequency, run_length=run_length) self.events_per_pulse = events_per_pulse self.rng = random.default_rng(seed=0)
@property def pulse_period(self) -> sc.Variable: return sc.scalar(1.0) / self.frequency @property def number_of_pulses(self) -> int: return len(self.t0) def _make_t0(self, frequency: sc.Variable, run_length: sc.Variable) -> sc.Variable: start = sc.datetime('2019-12-25T06:00:00.0', unit='ns') npulse = (run_length * frequency).to(unit='').value pulses = sc.arange(dim='pulse', start=0, stop=npulse, dtype='int64') return start + (pulses * (1.0 / frequency)).to(dtype='int64', unit='ns')
[docs] class FakePulse: """ Simplified model of a pulse. Currently this is simply a time and wavelength interval. The plan is to also model a tail in the future, e.g., by overlaying multiple pulses. """
[docs] def __init__( self, time_min: sc.Variable, time_max: sc.Variable, wavelength_min: sc.Variable, wavelength_max: sc.Variable, ): self.time_min = time_min self.time_max = time_max self.wavelength_min = wavelength_min self.wavelength_max = wavelength_max self._center = sc.scalar(0.5) * (time_min + time_max) self._frames = chopper_cascade.FrameSequence.from_source_pulse( time_min=time_min, time_max=time_max, wavelength_min=wavelength_min, wavelength_max=wavelength_max, )
@property def center(self) -> sc.Variable: return self._center def chop( self, choppers: list[chopper_cascade.Chopper] ) -> chopper_cascade.FrameSequence: return self._frames.chop(choppers)
[docs] class FakeBeamline:
[docs] def __init__( self, source: FakeSource, pulse: FakePulse, choppers: dict[str, chopper_cascade.Chopper], monitors: dict[str, sc.Variable], detectors: dict[str, sc.Variable], time_of_flight_origin: str | None = None, ): """ Return a fake beamline. Parameters ---------- source: Fake source. pulse: Fake pulse. choppers: Choppers. monitors: Distances of monitors. detectors: Distances of detectors. time_of_flight_origin: Name of the chopper to use as time-of-flight origin. If None, use the source pulse. The center time of the slit opening of the chopper (or the source pulse) is used as time-of-flight origin. """ self._frames = pulse.chop(choppers.values()) self._pulse = pulse self._choppers = choppers self._source = source self._monitors = { key: self._frames[distance] for key, distance in monitors.items() } self.detectors = { key: self._frames[distance] for key, distance in detectors.items() } self._time_of_flight_origin = time_of_flight_origin
def get_monitor(self, name: str) -> sc.DataGroup: frame = self._monitors[name] return self._fake_monitor(frame) def _split_size(self, size, N): base, remainder = divmod(size, N) sizes = [base + 1 if i < remainder else base for i in range(N)] return sizes def _fake_monitor( self, frame: chopper_cascade.Frame ) -> tuple[sc.DataArray, sc.DataArray]: bounds = frame.bounds()['time'] subbounds = frame.subbounds()['time'] subframes = subbounds.sizes['subframe'] sizes = sc.array( dims=['pulse'], values=self._source.rng.integers( 0, self._source.events_per_pulse, size=self._source.number_of_pulses ), unit=None, ) event_index = sc.cumsum(sizes, dim='pulse', mode='exclusive') size = sizes.sum().value subsizes = self._split_size(size, subframes) subframe_times = [ sc.array( dims=['event'], values=self._source.rng.uniform( subbounds['subframe', i][0].value, subbounds['subframe', i][-1].value, size=subsizes[i], ), unit=bounds.unit, ) for i in range(subframes) ] # Offset from pulse that created the monitor event time_offset = sc.concat(subframe_times, 'event') # Ensure all pulses have events from all subframes self._source.rng.shuffle(time_offset.values) event_time_offset = time_offset % self._source.pulse_period time_zero_offset = time_offset - event_time_offset event_time_zero = self._source.t0 wrapped_events = sc.DataArray( sc.ones(sizes=event_time_offset.sizes, unit='counts'), coords={'event_time_offset': event_time_offset}, ) unwrapped_events = sc.DataArray( sc.ones(sizes=time_offset.sizes, unit='counts'), coords={ 'time_offset': time_offset, 'time_zero_offset': time_zero_offset.to(dtype='int64', unit='ns'), }, ) wrapped = sc.DataArray( data=sc.bins(begin=event_index, dim='event', data=wrapped_events), coords={'event_time_zero': event_time_zero}, ) unwrapped = sc.DataArray( data=sc.bins(begin=event_index, dim='event', data=unwrapped_events), coords={'event_time_zero': event_time_zero}, ) if self._time_of_flight_origin is None: offset_to_tof = self._pulse.center else: source_chopper = self._choppers[self._time_of_flight_origin] if len(source_chopper.time_open) != 1: raise NotImplementedError( "Using a chopper with multiple openings as " "source chopper is not implemented yet." ) offset_to_tof = 0.5 * ( source_chopper.time_open[0] + source_chopper.time_close[0] ) unwrapped = unwrapped.transform_coords( tof=lambda time_offset: time_offset - offset_to_tof.to(unit='s'), time_zero=lambda event_time_zero, time_zero_offset: event_time_zero + time_zero_offset.to(dtype='int64', unit='ns') + offset_to_tof.to(dtype='int64', unit='ns'), ) if self._time_of_flight_origin is None: unwrapped.coords['Ltotal'] = frame.distance else: unwrapped.coords['Ltotal'] = frame.distance - source_chopper.distance return wrapped, unwrapped
[docs] class FakeBeamlineEss:
[docs] def __init__( self, choppers: dict[str, chopper_cascade.Chopper], monitors: dict[str, sc.Variable], run_length: sc.Variable, events_per_pulse: int = 200000, source: Callable | None = None, ): import math import tof as tof_pkg from tof.facilities.ess_pulse import pulse self.frequency = pulse.frequency self.npulses = math.ceil((run_length * self.frequency).to(unit='').value) self.events_per_pulse = events_per_pulse # Create a source if source is None: self.source = tof_pkg.Source( facility='ess', neutrons=self.events_per_pulse, pulses=self.npulses ) else: self.source = source(pulses=self.npulses) # Convert the choppers to tof.Chopper def _open_close_angles(chopper, frequency): angular_speed = sc.constants.pi * (2.0 * sc.units.rad) * frequency return ( chopper.time_open * angular_speed, chopper.time_close * angular_speed, ) self.choppers = [] for name, ch in choppers.items(): frequency = self.frequency open_angles, close_angles = _open_close_angles(ch, frequency) # If the difference between open and close angles is larger than 2pi, # the boundaries have crossed, which means that the chopper is rotating # at a lower frequency. two_pi = np.pi * 2 if any(abs(np.diff(open_angles.values) > two_pi)) or any( abs(np.diff(close_angles.values) > two_pi) ): frequency = 0.5 * frequency open_angles, close_angles = _open_close_angles(ch, frequency) self.choppers.append( tof_pkg.Chopper( frequency=frequency, open=open_angles, close=close_angles, phase=sc.scalar(0.0, unit='rad'), distance=ch.distance, name=name, ) ) # Add detectors self.monitors = [ tof_pkg.Detector(distance=distance, name=key) for key, distance in monitors.items() ] # Propagate the neutrons self.model = tof_pkg.Model( source=self.source, choppers=self.choppers, detectors=self.monitors ) self.model_result = self.model.run()
def get_monitor(self, name: str) -> sc.DataGroup: # Create some fake pulse time zero start = sc.datetime("2024-01-01T12:00:00.000000") period = sc.reciprocal(self.frequency) detector = self.model_result.detectors[name] raw_data = detector.data.flatten(to='event') # Select only the neutrons that make it to the detector raw_data = raw_data[~raw_data.masks['blocked_by_others']].copy() raw_data.coords['Ltotal'] = detector.distance # Format the data in a way that resembles data loaded from NeXus event_data = raw_data.copy(deep=False) dt = period.to(unit='us') event_time_zero = (dt * (event_data.coords['toa'] // dt)).to(dtype=int) + start raw_data.coords['event_time_zero'] = event_time_zero event_data.coords['event_time_zero'] = event_time_zero event_data.coords['event_time_offset'] = ( event_data.coords.pop('toa').to(unit='s') % period ) del event_data.coords['tof'] del event_data.coords['speed'] del event_data.coords['time'] del event_data.coords['wavelength'] return ( event_data.group('event_time_zero').rename_dims(event_time_zero='pulse'), raw_data.group('event_time_zero').rename_dims(event_time_zero='pulse'), )
wfm1 = chopper_cascade.Chopper( distance=sc.scalar(6.6, unit='m'), time_open=sc.array( dims=('cutout',), values=[ -0.000396, 0.001286, 0.005786, 0.008039, 0.010133, 0.012080, 0.013889, 0.015571, ], unit='s', ), time_close=sc.array( dims=('cutout',), values=[ 0.000654, 0.002464, 0.006222, 0.008646, 0.010899, 0.012993, 0.014939, 0.016750, ], unit='s', ), ) wfm2 = chopper_cascade.Chopper( distance=sc.scalar(7.1, unit='m'), time_open=sc.array( dims=('cutout',), values=[ 0.000654, 0.002451, 0.006222, 0.008645, 0.010898, 0.012993, 0.014940, 0.016737, ], unit='s', ), time_close=sc.array( dims=('cutout',), values=[ 0.001567, 0.003641, 0.006658, 0.009252, 0.011664, 0.013759, 0.015853, 0.017927, ], unit='s', ), ) # psc1 and psc2 are defined as a single slit of wfm1 and wfm2, respectively. psc1 = chopper_cascade.Chopper( distance=sc.scalar(6.6, unit='m'), time_open=sc.array(dims=('cutout',), values=[0.010133], unit='s'), time_close=sc.array(dims=('cutout',), values=[0.010899], unit='s'), ) psc2 = chopper_cascade.Chopper( distance=sc.scalar(7.1, unit='m'), time_open=sc.array(dims=('cutout',), values=[0.010898], unit='s'), time_close=sc.array(dims=('cutout',), values=[0.011664], unit='s'), ) frame_overlap_1 = chopper_cascade.Chopper( distance=sc.scalar(8.8, unit='m'), time_open=sc.array( dims=('cutout',), values=[ -0.000139, 0.002460, 0.006796, 0.010020, 0.012733, 0.015263, 0.017718, 0.020317, ], unit='s', ), time_close=sc.array( dims=('cutout',), values=[ 0.000640, 0.003671, 0.007817, 0.011171, 0.013814, 0.016146, 0.018497, 0.021528, ], unit='s', ), ) frame_overlap_2 = chopper_cascade.Chopper( distance=sc.scalar(15.9, unit='m'), time_open=sc.array( dims=('cutout',), values=[ -0.000306, 0.010939, 0.016495, 0.021733, 0.026416, 0.030880, 0.035409, ], unit='s', ), time_close=sc.array( dims=('cutout',), values=[ 0.002582, 0.014570, 0.020072, 0.024730, 0.029082, 0.033316, 0.038297, ], unit='s', ), ) pulse_skipping = chopper_cascade.Chopper( distance=sc.scalar(15.91, unit='m'), time_open=sc.scalar(0.021733, unit='s') + sc.arange('cutout', 2, unit='s') * (2 / 14), time_close=sc.scalar(0.024730, unit='s') + sc.arange('cutout', 2, unit='s') * (2 / 14), ) wfm_choppers = sc.DataGroup( wfm1=wfm1, wfm2=wfm2, frame_overlap_1=frame_overlap_1, frame_overlap_2=frame_overlap_2, ) psc_choppers = sc.DataGroup( psc1=psc1, psc2=psc2, frame_overlap_1=frame_overlap_1, frame_overlap_2=frame_overlap_2, ) ess_time_min = sc.scalar(0.0, unit='ms') ess_time_max = sc.scalar(3.0, unit='ms') ess_wavelength_min = sc.scalar(0.0, unit='angstrom') ess_wavelength_max = sc.scalar(10.0, unit='angstrom') wfm_frames = chopper_cascade.FrameSequence.from_source_pulse( time_min=ess_time_min, time_max=ess_time_max, wavelength_min=ess_wavelength_min, wavelength_max=ess_wavelength_max, ) wfm_frames = wfm_frames.chop(wfm_choppers.values()) psc_frames = chopper_cascade.FrameSequence.from_source_pulse( time_min=ess_time_min, time_max=ess_time_max, wavelength_min=ess_wavelength_min, wavelength_max=ess_wavelength_max, ) psc_frames = psc_frames.chop(psc_choppers.values())