Source code for ess.nmx.mtz_io

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
import pathlib
from typing import NewType

import gemmi
import numpy as np
import pandas as pd
import scipp as sc

# Index types for param table.
MTZFileIndex = NewType("MTZFileIndex", int)
"""The index of the mtz file when iterating over multiple mtz files."""

# User defined or configurable types
MTZFilePath = NewType("MTZFilePath", pathlib.Path)
"""Path to the mtz file"""
SpaceGroupDesc = NewType("SpaceGroupDesc", str)
"""The space group description. e.g. 'P 21 21 21'"""
DEFAULT_SPACE_GROUP_DESC = SpaceGroupDesc("P 21 21 21")
"""The default space group description to use if not found in the mtz files."""

# Custom column names
WavelengthColumnName = NewType("WavelengthColumnName", str)
"""The name of the wavelength column in the mtz file."""
DEFAULT_WAVELENGTH_COLUMN_NAME = WavelengthColumnName("LAMBDA")

IntensityColumnName = NewType("IntensityColumnName", str)
"""The name of the intensity column in the mtz file."""
DEFAULT_INTENSITY_COLUMN_NAME = IntensityColumnName("I")

StdDevColumnName = NewType("StdDevColumnName", str)
"""The name of the standard uncertainty of intensity column in the mtz file."""
DEFAULT_STD_DEV_COLUMN_NAME = StdDevColumnName("SIGI")

# Computed types
MtzDataFrame = NewType("MtzDataFrame", pd.DataFrame)
"""The raw mtz dataframe."""
NMXMtzDataFrame = NewType("NMXMtzDataFrame", pd.DataFrame)
"""The processed mtz dataframe with derived columns."""
NMXMtzDataArray = NewType("NMXMtzDataArray", sc.DataArray)


[docs] def read_mtz_file(file_path: MTZFilePath) -> gemmi.Mtz: """read mtz file""" return gemmi.read_mtz_file(file_path.as_posix())
[docs] def mtz_to_pandas(mtz: gemmi.Mtz) -> pd.DataFrame: """Converts the mtz file to a pandas dataframe. It is equivalent to the following code: .. code-block:: python import numpy as np import pandas as pd data = np.array(mtz, copy=False) columns = mtz.column_labels() return pd.DataFrame(data, columns=columns) It is recommended in the gemmi documentation. """ return pd.DataFrame( # Recommended in the gemmi documentation. data=np.array(mtz, copy=False), columns=mtz.column_labels() )
[docs] def process_single_mtz_to_dataframe( mtz: gemmi.Mtz, wavelength_column_name: WavelengthColumnName = DEFAULT_WAVELENGTH_COLUMN_NAME, intensity_column_name: IntensityColumnName = DEFAULT_INTENSITY_COLUMN_NAME, intensity_sig_col_name: StdDevColumnName = DEFAULT_STD_DEV_COLUMN_NAME, ) -> MtzDataFrame: """Select and derive columns from the original ``MtzDataFrame``. Parameters ---------- mtz: The raw mtz dataset. wavelength_column_name: The name of the wavelength column in the mtz file. intensity_column_name: The name of the intensity column in the mtz file. intensity_sig_col_name: The name of the standard uncertainty of intensity column in the mtz file. Returns ------- : The new mtz dataframe with derived and renamed columns. The derived columns are: - ``hkl``: The miller indices as a list of integers. - ``d``: The d-spacing calculated from the miller indices. :math:``\\dfrac{2}{d^{2}} = \\dfrac{\\sin^2(\\theta)}{\\lambda^2}`` - ``resolution``: The resolution calculated from the d-spacing. For consistent names of columns/coordinates, the following columns are renamed: - ``wavelength_column_name`` -> ``'wavelength'`` - ``intensity_column_name`` -> ``'I'`` - ``intensity_sig_col_name`` -> ``'SIGI'`` Other columns are kept as they are. Notes ----- :class:`pandas.DataFrame` is used from loading to merging, but :class:`gemmi.Mtz` has :func:`gemmi.Mtz:calculate_d` that can derive the ``d`` using ``HKL``. This part of the method must be called on each mtz file separately. """ from .mtz_io import mtz_to_pandas orig_df = mtz_to_pandas(mtz) mtz_df = pd.DataFrame() # HKL should always be integer. mtz_df[["H", "K", "L"]] = orig_df[["H", "K", "L"]].astype(int) mtz_df["hkl"] = mtz_df[["H", "K", "L"]].values.tolist() def _calculate_d(row: pd.Series) -> float: return mtz.get_cell().calculate_d(row["hkl"]) mtz_df["d"] = mtz_df.apply(_calculate_d, axis=1) mtz_df["resolution"] = (1 / mtz_df["d"]) ** 2 / 4 mtz_df["wavelength"] = orig_df[wavelength_column_name] mtz_df[DEFAULT_INTENSITY_COLUMN_NAME] = orig_df[intensity_column_name] mtz_df[DEFAULT_STD_DEV_COLUMN_NAME] = orig_df[intensity_sig_col_name] # Keep other columns for column in [col for col in orig_df.columns if col not in mtz_df]: mtz_df[column] = orig_df[column] return MtzDataFrame(mtz_df)
[docs] def get_space_group_from_description(desc: SpaceGroupDesc) -> gemmi.SpaceGroup: """Retrieves spacegroup from parameter. Parameters ---------- desc: The space group description to use if not found in the mtz files. Returns ------- : The space group. """ return gemmi.SpaceGroup(desc)
[docs] def get_space_group_from_mtz(mtz: gemmi.Mtz) -> gemmi.SpaceGroup | None: """Retrieves spacegroup from file. Spacegroup is always expected in any MTZ files, but it may be missing. Parameters ---------- mtz: Raw mtz dataset. Returns ------- : The space group, or None if not found. """ return mtz.spacegroup
[docs] def get_unique_space_group(*spacegroups: gemmi.SpaceGroup | None) -> gemmi.SpaceGroup: """Retrieves the unique space group from multiple space groups. Parameters ---------- spacegroups: The space groups to check. Returns ------- : The unique space group. Raises ------ ValueError: If there are multiple space groups. """ spacegroups = [sgrp for sgrp in spacegroups if sgrp is not None] if len(spacegroups) == 0: raise ValueError("No space group found.") first = spacegroups[0] if all(sgrp == first for sgrp in spacegroups): return first raise ValueError(f"Multiple space groups found: {spacegroups}")
[docs] def get_reciprocal_asu(spacegroup: gemmi.SpaceGroup) -> gemmi.ReciprocalAsu: """Returns the reciprocal asymmetric unit from the space group.""" return gemmi.ReciprocalAsu(spacegroup)
[docs] def merge_mtz_dataframes(*mtz_dfs: MtzDataFrame) -> MtzDataFrame: """Merge multiple mtz dataframes into one.""" return MtzDataFrame(pd.concat(mtz_dfs, ignore_index=True))
[docs] def process_mtz_dataframe( *, mtz_df: MtzDataFrame, reciprocal_asu: gemmi.ReciprocalAsu, sg: gemmi.SpaceGroup, ) -> NMXMtzDataFrame: """Modify/Add columns of the shallow copy of a mtz dataframe. This method must be called after merging multiple mtz dataframe. """ df = mtz_df.copy(deep=False) def _reciprocal_asu(row: pd.Series) -> list[int]: """Converts miller indices(HKL) to ASU indices.""" return reciprocal_asu.to_asu(row["hkl"], sg.operations())[0] df["hkl_asu"] = df.apply(_reciprocal_asu, axis=1) # Unpack the indices for later. df[["H_ASU", "K_ASU", "L_ASU"]] = pd.DataFrame( df["hkl_asu"].to_list(), index=df.index ) return NMXMtzDataFrame(df)
[docs] def nmx_mtz_dataframe_to_scipp_dataarray( nmx_mtz_df: NMXMtzDataFrame, ) -> NMXMtzDataArray: """Converts the processed mtz dataframe to a scipp dataarray. The intensity, with column name :attr:`~DEFAULT_INTENSITY_COLUMN_NAME` becomes the data and the standard uncertainty of intensity, with column name :attr:`~DEFAULT_SIGMA_INTENSITY_COLUMN_NAME` becomes the variances of the data. Parameters ---------- nmx_mtz_df: The merged and processed mtz dataframe. Returns ------- : The scipp dataarray with the intensity and variances. The ``I`` column becomes the data and the squared ``SIGI`` column becomes the variances. Therefore they are not in the coordinates. Following coordinates are modified: - ``hkl``: The miller indices as a string. It is modified to have a string dtype since is no dtype that can represent this in scipp. - ``hkl_asu``: The asymmetric unit of miller indices as a string. This coordinate will be used to derive estimated scale factors. It is modified to have a string dtype as the same reason as why ``hkl`` coordinate is modified. Zero or negative intensities are removed from the dataarray. It can happen due to the post-processing of the data, e.g. background subtraction. """ from scipp.compat.pandas_compat import from_pandas_dataframe, parse_bracket_header to_scipp = nmx_mtz_df.copy(deep=False) # Convert to scipp Dataset nmx_mtz_ds = from_pandas_dataframe( to_scipp, data_columns=[ DEFAULT_INTENSITY_COLUMN_NAME, DEFAULT_STD_DEV_COLUMN_NAME, ], header_parser=parse_bracket_header, ) # Pop the indices columns. # TODO: We can put them back once we support tuple[int] dtype. # See https://github.com/scipp/scipp/issues/3046 for more details. # Temporarily, we will manually convert them to a string. # It is done on the scipp variable instead of the dataframe # since columns with string dtype are converted to PyObject dtype # instead of string by `from_pandas_dataframe`. for indices_name in ("hkl", "hkl_asu"): nmx_mtz_ds.coords[indices_name] = sc.array( dims=nmx_mtz_ds.coords[indices_name].dims, values=nmx_mtz_df[indices_name].astype(str).tolist(), # `astype`` is not enough to convert the dtype to string. # The result of `astype` will have `PyObject` as a dtype. ) # Add units nmx_mtz_ds.coords["wavelength"].unit = sc.units.angstrom for key in nmx_mtz_ds.keys(): nmx_mtz_ds[key].unit = sc.units.dimensionless # Add variances nmx_mtz_da = nmx_mtz_ds[DEFAULT_INTENSITY_COLUMN_NAME].copy(deep=False) nmx_mtz_da.variances = (nmx_mtz_ds[DEFAULT_STD_DEV_COLUMN_NAME].data ** 2).values # Return DataArray without negative intensities return NMXMtzDataArray(nmx_mtz_da[nmx_mtz_da.data > 0])
providers = ( read_mtz_file, process_single_mtz_to_dataframe, # get_space_group_from_description, get_space_group_from_mtz, get_reciprocal_asu, process_mtz_dataframe, nmx_mtz_dataframe_to_scipp_dataarray, ) """The providers related to the MTZ IO.""" default_parameters = { WavelengthColumnName: DEFAULT_WAVELENGTH_COLUMN_NAME, IntensityColumnName: DEFAULT_INTENSITY_COLUMN_NAME, StdDevColumnName: DEFAULT_STD_DEV_COLUMN_NAME, } """The parameters related to the MTZ IO."""