Coverage for install/scipp/compat/pandas_compat.py: 70%

63 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-12-01 01:59 +0000

1# SPDX-License-Identifier: BSD-3-Clause 

2# Copyright (c) 2023 Scipp contributors (https://github.com/scipp) 

3 

4from __future__ import annotations 

5 

6from collections.abc import Callable, Iterable 

7from typing import TYPE_CHECKING, Literal 

8 

9from ..core import DataArray, Dataset, Unit, UnitError, array 

10from ..units import default_unit 

11 

12if TYPE_CHECKING: 

13 import pandas as pd 

14 

15 

16def _index_is_trivial(index: pd.Index, n_rows: int) -> bool: 

17 from pandas import RangeIndex 

18 

19 return ( 

20 isinstance(index, RangeIndex) 

21 and index.start == 0 

22 and index.stop == n_rows 

23 and index.step == 1 

24 ) 

25 

26 

def from_pandas_series(
    se: pd.Series,
    *,
    include_trivial_index: bool = False,
    header_parser: HeaderParserArg = None,
) -> DataArray:
    """Convert a pandas.Series into a one-dimensional DataArray.

    The single dimension is named after the series index, or ``"row"`` if the
    index has no name. The series name (or an empty string if it has none) is
    passed through the header parser to obtain the name and unit of the result.

    Parameters
    ----------
    se:
        The series to convert.
    include_trivial_index:
        If ``True``, store the index as a coordinate even when it is the
        default ``RangeIndex(start=0, stop=len(se), step=1)``.
        Non-trivial indices are always stored.
    header_parser:
        Parser used to split the series name into a name and a unit.

    Returns
    -------
    :
        The converted data array.
    """
    index = se.axes[0]
    dim = str(index.name) if index.name is not None else "row"
    header = str(se.name) if se.name is not None else ""
    name, unit = _parse_header(header, header_parser)

    coords = {}
    if include_trivial_index or not _index_is_trivial(index, len(se)):
        coords[dim] = array(dims=[dim], values=index)

    # For string series, a plain se.to_numpy() (like np.array(se.values))
    # yields dtype=object, so request a str array explicitly.
    values = se.to_numpy(dtype=str) if se.dtype == "string" else se.to_numpy()

    return DataArray(
        data=array(values=values, dims=[dim], unit=unit),
        coords=coords,
        name=name,
    )

54 

55 

def from_pandas_dataframe(
    df: pd.DataFrame,
    *,
    data_columns: str | Iterable[str] | None = None,
    include_trivial_index: bool = False,
    header_parser: HeaderParserArg = None,
) -> Dataset:
    """Convert a pandas.DataFrame into a Dataset.

    Every column is converted with :func:`from_pandas_series` and stored under
    the name of the resulting data array, i.e. the name produced by the header
    parser. If two columns yield the same name, the later one replaces the
    earlier one.

    Parameters
    ----------
    df:
        The data frame to convert.
    data_columns:
        Names of the converted columns to use as data items; a single string
        selects one column. All remaining columns become coordinates.
        If ``None``, every column is a data item and there are no extra
        coordinates.
    include_trivial_index:
        Forwarded to :func:`from_pandas_series` for every column.
    header_parser:
        Forwarded to :func:`from_pandas_series` for every column.

    Returns
    -------
    :
        The converted dataset.

    Raises
    ------
    KeyError
        If ``data_columns`` names a column that is not among the converted
        columns.
    """
    import pandas as pd

    converted = {}
    for column_name in df.axes[1]:
        da = from_pandas_series(
            pd.Series(df[column_name]),
            include_trivial_index=include_trivial_index,
            header_parser=header_parser,
        )
        converted[da.name] = da

    if data_columns is None:
        return Dataset(converted, coords={})

    if isinstance(data_columns, str):
        data_columns = (data_columns,)
    # Move the selected columns out; whatever is left becomes a coordinate.
    data = {name: converted.pop(name) for name in data_columns}
    # Only the data of the leftover columns is kept as coordinate.
    coords = {name: da.data for name, da in converted.items()}
    return Dataset(data, coords=coords)

85 

86 

def from_pandas(
    pd_obj: pd.DataFrame | pd.Series,
    *,
    data_columns: str | Iterable[str] | None = None,
    include_trivial_index: bool = False,
    header_parser: HeaderParserArg = None,
) -> DataArray | Dataset:
    """Convert a pandas object into the corresponding scipp object.

    A ``pandas.DataFrame`` becomes a scipp ``Dataset`` and a ``pandas.Series``
    becomes a scipp ``DataArray``.

    Parameters
    ----------
    pd_obj:
        The Dataframe or Series to convert.
    data_columns:
        Select which columns to assign as data.
        The rest are returned as coordinates.
        If ``None``, all columns are assigned as data.
        Use an empty list to assign all columns as coordinates.
        Only used when converting a data frame.
    include_trivial_index:
        ``from_pandas`` can include the index of the data frame / series as a
        coordinate.
        But when the index is ``RangeIndex(start=0, stop=n, step=1)``, where ``n``
        is the length of the data frame / series, the index is excluded by default.
        Set this argument to ``True`` to include the index anyway in this case.
    header_parser:
        Parses each column header to extract a name and unit for each data array.
        By default, it returns the column name and uses the default unit.
        Builtin parsers can be specified by name:

        - ``"bracket"``: See :func:`scipp.compat.pandas_compat.parse_bracket_header`.
          Parses strings where the unit is given between square brackets,
          i.e., strings like ``name [unit]``.

        Before implementing a custom parser, check out
        :func:`scipp.compat.pandas_compat.parse_bracket_header`
        to get an overview of how to handle edge cases.

    Returns
    -------
    :
        The converted scipp object.

    Raises
    ------
    ValueError
        If ``pd_obj`` is neither a ``pandas.DataFrame`` nor a ``pandas.Series``.
    """
    import pandas as pd

    if isinstance(pd_obj, pd.DataFrame):
        return from_pandas_dataframe(
            pd_obj,
            data_columns=data_columns,
            include_trivial_index=include_trivial_index,
            header_parser=header_parser,
        )

    if isinstance(pd_obj, pd.Series):
        # ``data_columns`` has no meaning for a single series and is not forwarded.
        return from_pandas_series(
            pd_obj,
            include_trivial_index=include_trivial_index,
            header_parser=header_parser,
        )

    raise ValueError(f"from_pandas: cannot convert type '{type(pd_obj)}'")

147 

148 

# A header parser receives a column header and returns ``(name, unit)``;
# the unit may be ``None``.
HeaderParser = Callable[
    [str],
    tuple[str, Unit | None],
]
# Accepted values for ``header_parser`` arguments: the name of a builtin
# parser, a custom parser callable, or ``None``.
HeaderParserArg = (
    Literal["bracket"]
    | HeaderParser
    | None
)

151 

152 

def parse_bracket_header(head: str) -> tuple[str, Unit | None]:
    """Split a header of the form ``name [unit]`` into its name and unit.

    ``name`` is any text without the character ``[`` and ``unit`` is text that
    ``sc.Unit(unit)`` can parse. Both parts, including the brackets, are optional:

    - With brackets and a unit, trailing whitespace of the name is removed and
      the parsed unit is returned.
    - With brackets that are empty or hold only whitespace, trailing whitespace
      of the name is removed and ``sc.units.default_unit`` is returned.
    - Without brackets, the text before the (absent) brackets is returned
      unmodified and the unit is ``None``.
      This ensures that columns without unit information are not accidentally
      assigned ``dimensionless`` which can silence downstream errors.
    - If the name is missing, an empty string is returned as the name.

    If the input does not follow this pattern, e.g., because it contains
    several opening brackets (``[``), or if the text between the brackets is
    not a valid unit, the full input is returned as the name and the unit
    is ``None``.

    Parameters
    ----------
    head:
        The string to parse.

    Returns
    -------
    :
        The parsed name and unit.
    """
    import re

    match = re.match(r"^([^[]*)(?:\[([^[]*)])?$", head)
    if match is None:
        # Header does not follow the ``name [unit]`` pattern at all.
        return head, None

    raw_name, raw_unit = match.groups()
    if raw_unit is None:
        # The optional bracket section did not take part in the match.
        return raw_name, None

    name = raw_name.rstrip()
    if not raw_unit.strip():
        # Brackets are present but contain nothing meaningful.
        return name, default_unit

    try:
        return name, Unit(raw_unit)
    except UnitError:
        # Not a valid unit: hand the header back untouched.
        return head, None

203 

204 

# Builtin header parsers, looked up by the name given as ``header_parser``.
_HEADER_PARSERS: dict[str, HeaderParser] = {"bracket": parse_bracket_header}

208 

209 

210def _parse_header(header: str, parser: HeaderParserArg) -> tuple[str, Unit | None]: 

211 if parser is None: 

212 return header, default_unit 

213 if callable(parser): 

214 return parser(header) 

215 if (parser := _HEADER_PARSERS.get(parser)) is not None: 

216 return parser(header) 

217 else: 

218 raise ValueError( 

219 f"Unknown header parser '{parser}', " 

220 f"supported builtin parsers: {list(_HEADER_PARSERS.keys())}." 

221 )