Coverage for install/scipp/compat/pandas_compat.py: 70%
63 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-17 01:51 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-17 01:51 +0000
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
4from __future__ import annotations
6from collections.abc import Callable, Iterable
7from typing import TYPE_CHECKING, Literal
9from ..core import DataArray, Dataset, Unit, UnitError, array
10from ..units import default_unit
12if TYPE_CHECKING:
13 import pandas as pd
16def _index_is_trivial(index: pd.Index, n_rows: int) -> bool:
17 from pandas import RangeIndex
19 return (
20 isinstance(index, RangeIndex)
21 and index.start == 0
22 and index.stop == n_rows
23 and index.step == 1
24 )
def from_pandas_series(
    se: pd.Series,
    *,
    include_trivial_index: bool = False,
    header_parser: HeaderParserArg = None,
) -> DataArray:
    """Convert a ``pandas.Series`` into a one-dimensional ``DataArray``.

    The single dimension is named after the series index, or ``"row"`` if the
    index has no name. The series name is passed through the header parser to
    obtain the name and unit of the resulting data array.

    Parameters
    ----------
    se:
        The series to convert.
    include_trivial_index:
        If ``False`` (the default), an index that is just
        ``RangeIndex(start=0, stop=len(se), step=1)`` is not stored as a
        coordinate. Any other index is always stored.
    header_parser:
        Parser applied to the series name (an empty string if the series is
        unnamed) to produce the name and unit.

    Returns
    -------
    :
        The converted data array.
    """
    index = se.axes[0]
    dim = str(index.name) if index.name is not None else "row"
    header = str(se.name) if se.name is not None else ""
    name, unit = _parse_header(header, header_parser)

    coords = {}
    if include_trivial_index or not _index_is_trivial(index, len(se)):
        coords[dim] = array(dims=[dim], values=index)

    # se.to_numpy() and np.array(se.values) produce an array of dtype=object
    # when the series contains strings, so request a str array explicitly.
    values = se.to_numpy(dtype=str) if se.dtype == "string" else se.to_numpy()

    return DataArray(
        data=array(values=values, dims=[dim], unit=unit),
        coords=coords,
        name=name,
    )
def from_pandas_dataframe(
    df: pd.DataFrame,
    *,
    data_columns: str | Iterable[str] | None = None,
    include_trivial_index: bool = False,
    header_parser: HeaderParserArg = None,
) -> Dataset:
    """Convert a ``pandas.DataFrame`` into a ``Dataset``.

    Every column is converted with :func:`from_pandas_series` and stored
    under the name produced by the header parser.

    Parameters
    ----------
    df:
        The data frame to convert.
    data_columns:
        Names (as returned by the header parser) of the columns to use as
        data; a single string selects one column. All remaining columns become
        coordinates. If ``None``, every column becomes a data item.
    include_trivial_index:
        Forwarded to :func:`from_pandas_series` for each column.
    header_parser:
        Forwarded to :func:`from_pandas_series` for each column.

    Returns
    -------
    :
        The converted dataset.

    Raises
    ------
    KeyError
        If a name in ``data_columns`` does not match any converted column.
    """
    import pandas as pd

    converted = {}
    for label in df.axes[1]:
        column = from_pandas_series(
            pd.Series(df[label]),
            include_trivial_index=include_trivial_index,
            header_parser=header_parser,
        )
        converted[column.name] = column

    if data_columns is None:
        return Dataset(converted, coords={})

    selected = (data_columns,) if isinstance(data_columns, str) else data_columns
    # Popping moves the selected columns into the data; whatever is left
    # over becomes the coordinates.
    data = {key: converted.pop(key) for key in selected}
    coords = {key: column.data for key, column in converted.items()}
    return Dataset(data, coords=coords)
def from_pandas(
    pd_obj: pd.DataFrame | pd.Series,
    *,
    data_columns: str | Iterable[str] | None = None,
    include_trivial_index: bool = False,
    header_parser: HeaderParserArg = None,
) -> DataArray | Dataset:
    """Convert a pandas object into the corresponding scipp object.

    A ``pandas.DataFrame`` is converted to a ``Dataset`` and a
    ``pandas.Series`` is converted to a ``DataArray``.

    Parameters
    ----------
    pd_obj:
        The Dataframe or Series to convert.
    data_columns:
        Select which columns to assign as data.
        The rest are returned as coordinates.
        If ``None``, all columns are assigned as data.
        Use an empty list to assign all columns as coordinates.
        Only used when ``pd_obj`` is a data frame.
    include_trivial_index:
        ``from_pandas`` can include the index of the data frame / series as a
        coordinate.
        But when the index is ``RangeIndex(start=0, stop=n, step=1)``, where ``n``
        is the length of the data frame / series, the index is excluded by default.
        Set this argument to ``True`` to include the index anyway in this case.
    header_parser:
        Parses each column header to extract a name and unit for each data array.
        By default, it returns the column name and uses the default unit.
        Builtin parsers can be specified by name:

        - ``"bracket"``: See :func:`scipp.compat.pandas_compat.parse_bracket_header`.
          Parses strings where the unit is given between square brackets,
          i.e., strings like ``name [unit]``.

        Before implementing a custom parser, check out
        :func:`scipp.compat.pandas_compat.parse_bracket_header`
        to get an overview of how to handle edge cases.

    Returns
    -------
    :
        The converted scipp object.

    Raises
    ------
    ValueError
        If ``pd_obj`` is neither a ``pandas.DataFrame`` nor a ``pandas.Series``.
    """
    import pandas as pd

    # Options understood by both converters.
    shared_options = {
        "include_trivial_index": include_trivial_index,
        "header_parser": header_parser,
    }

    if isinstance(pd_obj, pd.DataFrame):
        return from_pandas_dataframe(
            pd_obj, data_columns=data_columns, **shared_options
        )
    if isinstance(pd_obj, pd.Series):
        return from_pandas_series(pd_obj, **shared_options)
    raise ValueError(f"from_pandas: cannot convert type '{type(pd_obj)}'")
# Signature of a header parser: takes a column header string and returns the
# parsed name together with a unit, or ``None`` in place of the unit.
HeaderParser = Callable[[str], tuple[str, Unit | None]]
# Accepted values for ``header_parser`` arguments in this module: the name of a
# builtin parser, a custom parser callable, or ``None``.
HeaderParserArg = Literal["bracket"] | HeaderParser | None
def parse_bracket_header(head: str) -> tuple[str, Unit | None]:
    """Split a header of the form ``name [unit]`` into its name and unit.

    ``name`` may be any string that does not contain the character ``[``,
    and ``unit`` must be a valid unit string that ``sc.Unit(unit)`` can parse.
    Whitespace between the name and the opening bracket is removed.

    Both parts, including the brackets, are optional:

    - Brackets present but empty (or only whitespace): the unit is
      ``sc.units.default_unit``.
    - No brackets at all: the unit is ``None``. This ensures that columns
      without unit information are not accidentally assigned
      ``dimensionless``, which can silence downstream errors.
    - No name: the returned name is an empty string.

    If the input does not conform to the expected pattern, e.g., because it
    contains multiple opening brackets (``[``), or if the text between the
    brackets is not a valid unit, the full input is returned as the name and
    the unit is ``None``.

    Parameters
    ----------
    head:
        The string to parse.

    Returns
    -------
    :
        The parsed name and unit.
    """
    import re

    match = re.match(r"^([^[]*)(?:\[([^[]*)])?$", head)
    if match is None:
        return head, None

    raw_name, raw_unit = match.group(1), match.group(2)
    if raw_unit is None:
        # The optional bracket group did not take part in the match.
        return raw_name, None

    name = raw_name.rstrip()
    if not raw_unit.strip():
        # Brackets are present but contain nothing except whitespace.
        return name, default_unit

    try:
        unit = Unit(raw_unit)
    except UnitError:
        return head, None
    return name, unit
# Builtin header parsers, keyed by the name that can be passed as a
# ``header_parser`` argument.
_HEADER_PARSERS = {
    "bracket": parse_bracket_header,
}
210def _parse_header(header: str, parser: HeaderParserArg) -> tuple[str, Unit | None]:
211 if parser is None:
212 return header, default_unit
213 if callable(parser):
214 return parser(header)
215 if (parser := _HEADER_PARSERS.get(parser)) is not None:
216 return parser(header)
217 else:
218 raise ValueError(
219 f"Unknown header parser '{parser}', "
220 f"supported builtin parsers: {list(_HEADER_PARSERS.keys())}."
221 )