# Source code for ess.sans.beam_center_finder

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)

import uuid

import numpy as np
import sciline
import scipp as sc
import scippnexus as snx
from scipp.core import concepts

from ess.reduce.uncertainty import UncertaintyBroadcastMode

from .conversions import ElasticCoordTransformGraph
from .logging import get_logger
from .types import (
    BeamCenter,
    CleanDirectBeam,
    CorrectedDetector,
    DetectorBankSizes,
    DimsToKeep,
    IntensityQ,
    NeXusComponent,
    Numerator,
    QBins,
    ReturnEvents,
    SampleRun,
    WavelengthBands,
    WavelengthMask,
)


def _xy_extrema(pos: sc.Variable) -> sc.Variable:
    """Return the x/y bounding values of *pos* as [x_min, x_max, y_min, y_max]."""
    x = pos.fields.x
    y = pos.fields.y
    return sc.concat([x.min(), x.max(), y.min(), y.max()], dim='extremes')


def _find_beam_center(
    data,
    beam_stop_radius,
    beam_stop_arm_width,
):
    '''
    Estimate the beam center by a fixed-point iteration on the center of mass.

    Each iteration the center of mass of the remaining intensity is computed
    and assigned to be the current beam center guess.
    Then three symmetrical masks are created to make sure that the remaining intensity
    distribution does not extend outside of the detector and that the beam stop
    does not make the remaining intensity asymmetrical.

    The three masks are:

      - one "outer" circular mask with radius less than the minimal distance
        from the current beam center guess to the border of the detector
      - one "inner" circular mask with radius larger than the beam stop
      - one "arm" rectangular mask with width wider than the beam stop arm

    The "outer" mask radius is found from the detector size.
    The "inner" mask radius is supplied by the caller.
    The "arm" mask slope is determined by the direction of minimum intensity
    around the current beam center guess, the "arm" mask width is an argument
    supplied by the caller.

    Parameters
    ----------
    data:
        Binned detector counts with a 'position' pixel coordinate.
    beam_stop_radius:
        Radius (length unit) of the "inner" mask covering the beam stop.
    beam_stop_arm_width:
        Half-band width (length unit) of the "arm" mask covering the beam stop arm.

    Returns
    -------
    :
        The estimated beam center as a position vector; the z component is
        forced to zero so only the in-plane offset is returned.
    '''
    # Work on a copy so the caller's masks are untouched; existing masks are
    # cleared because the symmetric masks below replace them.
    events = data.copy()
    events.masks.clear()
    intensity = events.bins.sum()

    # Fixed number of iterations; there is no explicit convergence test.
    for i in range(20):
        # Average position, weighted by intensity.
        center = (
            intensity.coords['position'] * sc.values(intensity)
        ).sum() / sc.values(intensity).sum()
        distance_to_center = intensity.coords['position'] - center.data

        # Outer radius of annulus around center.
        # Defined so that the annulus does not go outside of the bounds of the detector.
        outer = 0.9 * min(
            sc.abs(distance_to_center.fields.x.min()),
            sc.abs(distance_to_center.fields.x.max()),
            sc.abs(distance_to_center.fields.y.min()),
            sc.abs(distance_to_center.fields.y.max()),
        )
        # Mask (exclude) everything outside the circle of radius `outer`.
        intensity.masks['outer'] = (
            distance_to_center.fields.x**2 + distance_to_center.fields.y**2 > outer**2
        )
        # Inner radius defined by size of beam stop: mask everything inside it.
        intensity.masks['inner'] = (
            distance_to_center.fields.x**2 + distance_to_center.fields.y**2
            < beam_stop_radius**2
        )

        # Iterate without the arm mask a few times to settle near the center
        # before introducing the arm mask.
        if i > 10:
            # Angle between +x and distance_to_center.
            # For x <= 0 the angle is reflected via pi - atan2(y, -x) so theta
            # stays continuous across the +-pi/2 boundary.
            intensity.coords['theta'] = sc.where(
                distance_to_center.fields.x > sc.scalar(0.0, unit='m'),
                sc.atan2(y=distance_to_center.fields.y, x=distance_to_center.fields.x),
                sc.scalar(sc.constants.pi.value, unit='rad')
                - sc.atan2(
                    y=distance_to_center.fields.y, x=-distance_to_center.fields.x
                ),
            )
            # Histogram intensity over angle, ignoring a previously set 'arm'
            # mask so the arm direction can be re-estimated each iteration.
            intensity_over_angle = intensity.drop_masks(
                ['arm'] if 'arm' in intensity.masks else []
            ).hist(theta=100)
            # Assume angle of arm coincides with intensity minimum
            arm_angle = intensity_over_angle.coords['theta'][
                np.argmin(intensity_over_angle.values)
            ]

            # Mask a band of half-width `beam_stop_arm_width` around the line
            # through the center with slope tan(arm_angle). The band passes
            # through the center, so the mask is symmetric about it.
            slope = sc.tan(arm_angle)
            intensity.masks['arm'] = (
                distance_to_center.fields.y
                < slope * distance_to_center.fields.x + beam_stop_arm_width
            ) & (
                distance_to_center.fields.y
                > slope * distance_to_center.fields.x - beam_stop_arm_width
            )

    # Project onto the detector plane: only the x/y offset is meaningful here.
    center.data.fields.z = sc.scalar(0.0, unit=center.data.unit)
    return center.data


def beam_center_from_center_of_mass_alternative(
    workflow,
    beam_stop_radius=None,
    beam_stop_arm_width=None,
) -> BeamCenter:
    """
    Estimate the beam center via the center-of-mass of the data counts.

    We are assuming the intensity distribution is symmetric around the beam center.
    Even if the intensity distribution is symmetric around the beam center
    the intensity distribution in the detector might not be, because

    - the detector has a finite extent,
    - and there is a beam stop covering part of the detector.

    To deal with the limited size of the detector a mask can be applied that is
    small enough so that the remaining intensity is entirely inside the detector.
    To deal with the beam stop we can mask the region of the detector that the
    beam stop covers.

    But to preserve the symmetry of the intensity around the beam center the masks
    also need to be symmetrical around the beam center.
    The problem is, the beam center is unknown.

    However, if the beam center was known to us, and we applied symmetrical masks
    that covered the regions of the detector where the intensity distribution is
    asymmetrical, then the center of mass of the remaining intensity would equal
    the beam center.
    Conversely, if we apply symmetrical masks around a point that is not the beam
    center the center of mass of the remaining intensity will (likely) not equal
    the original point.

    This suggests the beam center can be found using a fixed point iteration
    where each iteration we

    1. Compute the center of mass of the remaining intensity and assign it to be
       our current estimate of the beam center.
    2. Create symmetrical masks around the current estimate of the beam center.
    3. Repeat from 1. until convergence.

    Parameters
    ----------
    workflow:
        The reduction workflow to compute CorrectedDetector[SampleRun, Numerator].
    beam_stop_radius:
        Radius of the mask covering the beam stop. Defaults to 0.05 m.
    beam_stop_arm_width:
        Width of the mask covering the beam stop arm. Defaults to 0.02 m.

    Returns
    -------
    :
        The beam center position as a vector.
    """
    if beam_stop_radius is None:
        beam_stop_radius = sc.scalar(0.05, unit='m')
    if beam_stop_arm_width is None:
        beam_stop_arm_width = sc.scalar(0.02, unit='m')
    # Use the existing beam center as starting point if the workflow has one,
    # otherwise start from the origin.
    try:
        beam_center = workflow.compute(BeamCenter)
    except sciline.UnsatisfiedRequirement:
        beam_center = sc.vector([0.0, 0.0, 0.0], unit='m')
    # Copy so we do not mutate the caller's workflow, consistent with
    # beam_center_from_center_of_mass.
    workflow = workflow.copy()
    workflow[BeamCenter] = beam_center
    data = workflow.compute(CorrectedDetector[SampleRun, Numerator])
    # The iteration yields a shift relative to the starting beam center.
    return _find_beam_center(data, beam_stop_radius, beam_stop_arm_width) + beam_center
def beam_center_from_center_of_mass(workflow: sciline.Pipeline) -> BeamCenter:
    """
    Estimate the beam center via the center-of-mass of the data counts.

    The center-of-mass is simply the weighted mean of the positions.
    Areas with low counts are excluded from the center of mass calculation,
    as they typically fall into asymmetric regions of the detector panel
    and would thus lead to a biased result.
    The beam is assumed to be roughly aligned with the Z axis. The returned
    beam center is the component normal to the beam direction, projected onto
    the X-Y plane.

    Parameters
    ----------
    workflow:
        The reduction workflow to compute CorrectedDetector[SampleRun, Numerator].

    Returns
    -------
    :
        The beam center position as a vector.
    """
    # Reuse any beam center already provided by the workflow, else the origin.
    try:
        beam_center = workflow.compute(BeamCenter)
    except sciline.UnsatisfiedRequirement:
        beam_center = sc.vector([0.0, 0.0, 0.0], unit='m')
    workflow = workflow.copy()
    workflow[BeamCenter] = beam_center
    data = workflow.compute(CorrectedDetector[SampleRun, Numerator])
    graph = workflow.compute(ElasticCoordTransformGraph[SampleRun])

    # Collapse everything down to a per-pixel intensity.
    dims_to_sum = set(data.dims) - set(data.coords['position'].dims)
    summed = data.sum(dims_to_sum) if dims_to_sum else data.bins.sum()
    if summed.ndim > 1:
        summed = summed.flatten(to=uuid.uuid4().hex)

    pos = summed.coords['position']
    v = sc.values(summed)
    mask = concepts.irreducible_mask(summed, dim=None)
    if mask is None:
        mask = sc.zeros(sizes=pos.sizes, dtype='bool')
    extrema = _xy_extrema(pos[~mask])

    # Mean including existing masks
    cutoff = 0.1 * v.mean().data
    low_counts = v.data < cutoff
    # Increase cutoff until we no longer include pixels at the X/Y min/max.
    # This would be simpler if the logical panel shape was reflected in the
    # dims of the input data, instead of having a flat list of pixels.
    while sc.any(_xy_extrema(pos[~(mask | low_counts)]) == extrema):
        cutoff *= 2.0
        low_counts = v.data < cutoff

    # See scipp/scipp#3271, the following lines are a workaround
    select = ~(low_counts | mask)
    v = v.data[select]
    pos = pos[select]
    com = sc.sum(pos * v) / v.sum()

    # We compute the shift between the incident beam direction and the center-of-mass
    incident_beam = summed.transform_coords('incident_beam', graph=graph).coords[
        'incident_beam'
    ]
    n_beam = incident_beam / sc.norm(incident_beam)
    com_shift = com - sc.dot(com, n_beam) * n_beam
    xy = [com_shift.fields.x.value, com_shift.fields.y.value]
    return beam_center + _offsets_to_vector(data=summed, xy=xy, graph=graph)
def _offsets_to_vector(data: sc.DataArray, xy: list[float], graph: dict) -> sc.Variable:
    """
    Convert x,y offsets inside the plane normal to the beam to a vector in
    absolute coordinates.
    """
    u = data.coords['position'].unit
    # Get two vectors that define the plane normal to the beam
    coords = data.transform_coords(
        ['cyl_x_unit_vector', 'cyl_y_unit_vector'], graph=graph
    ).coords
    center = xy[0] * coords['cyl_x_unit_vector'] + xy[1] * coords['cyl_y_unit_vector']
    center.unit = u
    return center


def _iofq_in_quadrants(
    xy: list[float],
    workflow: sciline.Pipeline,
    detector: sc.DataArray,
    norm: sc.DataArray,
) -> dict[str, sc.DataArray]:
    """
    Compute the intensity as a function of Q inside 4 quadrants in Phi.

    Parameters
    ----------
    xy:
        The x,y offsets in the plane normal to the beam.
    detector:
        The raw detector.
    norm:
        The denominator data for normalization.

    Returns
    -------
    :
        A dictionary containing the intensity as a function of Q in each
        quadrant. The quadrants are named 'south-west', 'south-east',
        'north-east', and 'north-west'.
    """
    pi = sc.constants.pi.value
    phi_bins = sc.linspace('phi', -pi, pi, 5, unit='rad')
    quadrants = ['south-west', 'south-east', 'north-east', 'north-west']

    graph = workflow.compute(ElasticCoordTransformGraph[SampleRun])
    workflow = workflow.copy()
    workflow[BeamCenter] = _offsets_to_vector(data=detector, xy=xy, graph=graph)
    calibrated = workflow.compute(CorrectedDetector[SampleRun, Numerator])
    with_phi = calibrated.transform_coords(
        'phi', graph=graph, keep_intermediate=False, keep_inputs=False
    )
    # If gravity-correction is enabled, phi depends on wavelength (and event).
    # We cannot handle this below, so we approximate phi by the mean value.
    if ('phi' not in with_phi.coords) and ('phi' in with_phi.bins.coords):
        # This is the case where we have a phi event coord but no coord at the top level
        phi = with_phi.bins.coords['phi'].bins.mean()
    else:
        phi = with_phi.coords['phi']
        if phi.bins is not None or 'wavelength' in phi.dims:
            phi = phi.mean('wavelength')

    out = {}
    for i, quad in enumerate(quadrants):
        # Select pixels based on phi
        sel = (phi >= phi_bins[i]) & (phi < phi_bins[i + 1])
        # The beam center is applied when computing EmptyDetector, set quadrant
        # *before* that step.
        workflow[NeXusComponent[snx.NXdetector, SampleRun]] = sc.DataGroup(
            data=detector[sel]
        )
        # MaskedData would be computed automatically, but we did it above already
        workflow[CorrectedDetector[SampleRun, Numerator]] = calibrated[sel]
        workflow[CleanDirectBeam] = norm if norm.dims == ('wavelength',) else norm[sel]
        out[quad] = workflow.compute(IntensityQ[SampleRun])
    return out


def _cost(xy: list[float], *args) -> float:
    """
    Cost function for determining how close the :math:`I(Q)` curves are in all
    four quadrants. The cost is defined as

    .. math::

       \\text{cost} = \\frac{\\sum_{Q}\\sum_{i=1}^{i=4} \\overline{I}(Q)\\left(I(Q)_{i} - \\overline{I}(Q)\\right)^2}{\\sum_{Q}\\overline{I}(Q)} ~,

    where :math:`i` represents the 4 quadrants and :math:`\\overline{I}(Q)` is the
    mean intensity of the 4 quadrants as a function of :math:`Q`. This is basically
    a weighted mean of the square of the differences between the :math:`I(Q)` curves
    in the 4 quadrants with respect to the mean, and where the weights are
    :math:`\\overline{I}(Q)`.
    We use a weighted mean, as opposed to relative (percentage) differences to give
    less importance to regions with low statistics which are potentially noisy and
    would contribute significantly to the computed cost.

    Parameters
    ----------
    xy:
        The x,y offsets in the plane normal to the beam.
    *args:
        Arguments passed to :func:`iofq_in_quadrants`.

    Returns
    -------
    :
        The sum of the residuals for :math:`I(Q)` in the 4 quadrants, with respect
        to the mean :math:`I(Q)` in all quadrants.

    Notes
    -----
    Mantid uses a different cost function. They compute the horizontal (Left - Right)
    and the vertical (Top - Bottom) costs, and require both to be below the tolerance.
    The costs are defined as

    .. math::

       \\text{cost} = \\sum_{Q} \\left(I(Q)_{\\text{L,T}} - I(Q)_{\\text{R,B}}\\right)^2 ~.

    Using absolute differences instead of a weighted mean is similar to our cost
    function in the way that it would give a lot of weight to even a small difference
    in a high-intensity region. However, it also means that an absolute difference of
    e.g. 2 in a high-intensity region would be weighted the same as a difference of 2
    in a low-intensity region. It is also not documented why two separate costs are
    computed, instead of a single one. The Mantid implementation is available `here
    <https://github.com/mantidproject/mantid/blob/main/Framework/PythonInterface/plugins/algorithms/WorkflowAlgorithms/SANS/SANSBeamCentreFinder.py`_.
    """  # noqa: E501
    iofq = _iofq_in_quadrants(xy, *args)
    all_q = sc.concat([sc.values(da) for da in iofq.values()], dim='quadrant')
    ref = all_q.mean('quadrant')
    c = (all_q - ref) ** 2
    out = (sc.sum(ref * c) / sc.sum(ref)).value
    logger = get_logger('sans')
    if not np.isfinite(out):
        out = np.inf
        logger.info(
            'Non-finite value computed in cost. This is likely due to a division by '
            'zero. If the final results for the beam center are not satisfactory, '
            'try restricting your Q range, or increasing the size of your Q bins to '
            'improve statistics in the denominator.'
        )
    logger.info('Beam center finder: x=%s, y=%s, cost=%s', xy[0], xy[1], out)
    return out
def beam_center_from_iofq(
    *,
    workflow: sciline.Pipeline,
    q_bins: int | sc.Variable,
    minimizer: str | None = None,
    tolerance: float | None = None,
) -> BeamCenter:
    """
    Find the beam center of a SANS scattering pattern using an I(Q) calculation.

    Description of the procedure:

    #. obtain an initial guess by computing the center-of-mass of the pixels,
       weighted by the counts on each pixel
    #. from that initial guess, divide the panel into 4 quadrants
    #. compute :math:`I(Q)` inside each quadrant and compute the residual
       difference between all 4 quadrants
    #. iteratively move the centre position and repeat 2. and 3. until all 4
       :math:`I(Q)` curves lie on top of each other

    Parameters
    ----------
    workflow:
        The reduction workflow to compute I(Q).
    q_bins:
        The binning in the Q dimension to be used.
    minimizer:
        The Scipy minimizer method to use (see the `Scipy docs
        <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_
        for details).
    tolerance:
        Tolerance for termination (see the `Scipy docs
        <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_
        for details).

    Returns
    -------
    :
        The beam center position as a vector.

    Notes
    -----
    The panel is divided with a ``+`` cut rather than an ``X`` cut so that all
    four quadrants hold comparable numbers of pixels on rectangular detectors;
    for an isotropic scattering pattern the shape of the cut should not affect
    the resulting :math:`I(Q)` curves. Each quadrant is normalized with a
    reduction as close as possible to the full :math:`I(Q)` reduction, which is
    required for reliable results when pixel masks (e.g. for the beam stop) are
    present. The direct-beam term :math:`D({\\lambda})` is omitted because it
    itself requires the beam center; it changes the curve shape identically for
    all :math:`{\\phi}` angles and therefore does not affect the result.
    """
    from scipy.optimize import minimize

    logger = get_logger('sans')
    logger.info('Requested minimizer: %s', minimizer)
    logger.info('Requested tolerance: %s', tolerance)
    minimizer = minimizer or 'Nelder-Mead'
    tolerance = tolerance or 0.1
    logger.info('Using minimizer: %s', minimizer)
    logger.info('Using tolerance: %s', tolerance)

    keys = (
        NeXusComponent[snx.NXdetector, SampleRun],
        CorrectedDetector[SampleRun, Numerator],
        CleanDirectBeam,
        ElasticCoordTransformGraph[SampleRun],
    )
    workflow = workflow.copy()
    # Avoid reshape of detector, which would break boolean-indexing by cost function
    workflow[DetectorBankSizes] = {}
    results = workflow.compute(keys)
    detector = results[NeXusComponent[snx.NXdetector, SampleRun]]['data']
    data = results[CorrectedDetector[SampleRun, Numerator]]
    norm = results[CleanDirectBeam]
    graph = results[ElasticCoordTransformGraph[SampleRun]]
    # Avoid reloading the detector
    workflow[NeXusComponent[snx.NXdetector, SampleRun]] = sc.DataGroup(data=detector)
    workflow[UncertaintyBroadcastMode] = UncertaintyBroadcastMode.upper_bound
    workflow[ReturnEvents] = False
    workflow[DimsToKeep] = ()
    workflow[WavelengthMask] = None
    workflow[WavelengthBands] = None
    workflow[QBins] = q_bins

    # Use center of mass to get initial guess for beam center
    com_shift = beam_center_from_center_of_mass(workflow)
    logger.info('Initial guess for beam center: %s', com_shift)

    # Constrain the search to the physical extent of the panel.
    coords = data.transform_coords(
        ['cylindrical_x', 'cylindrical_y'], graph=graph
    ).coords
    bounds = [
        (coords['cylindrical_x'].min().value, coords['cylindrical_x'].max().value),
        (coords['cylindrical_y'].min().value, coords['cylindrical_y'].max().value),
    ]

    # Refine using Scipy optimize
    res = minimize(
        _cost,
        x0=[com_shift.fields.x.value, com_shift.fields.y.value],
        args=(workflow, detector, norm),
        bounds=bounds,
        method=minimizer,
        tol=tolerance,
    )
    center = _offsets_to_vector(data=data, xy=res.x, graph=graph)
    logger.info('Final beam center value: %s', center)
    logger.info('Beam center finder minimizer info: %s', res)
    return center