def fwhm_to_std(fwhm: sc.Variable) -> sc.Variable:
    """
    Convert a full-width half maximum into a standard deviation.

    The conversion divides the input by the module-level constant
    ``_STD_TO_FWHM``.

    Parameters
    ----------
    fwhm:
        Full-width half maximum.

    Returns
    -------
    :
        Standard deviation.
    """
    # Dividing by the std -> FWHM factor inverts that conversion.
    std = fwhm / _STD_TO_FWHM
    return std
def linlogspace(
    dim: str,
    edges: list | np.ndarray,
    scale: list | str,
    num: list | int,
    unit: str | None = None,
) -> sc.Variable:
    """
    Generate a 1d array of bin edges with a mixture of linear and/or
    logarithmic spacings.

    Examples:

    - Create linearly spaced edges (equivalent to `sc.linspace`):
        linlogspace(dim='x', edges=[0.008, 0.08], scale='linear', num=50, unit='m')
    - Create logarithmically spaced edges (equivalent to `sc.geomspace`):
        linlogspace(dim='x', edges=[0.008, 0.08], scale='log', num=50, unit='m')
    - Create edges with a linear and a logarithmic part:
        linlogspace(dim='x', edges=[1, 3, 8], scale=['linear', 'log'], num=[16, 20])

    Parameters
    ----------
    dim:
        The dimension of the output Variable.
    edges:
        The edges for the different parts of the mesh.
    scale:
        A string or list of strings specifying the scaling for the different
        parts of the mesh. Possible values for the scaling are `"linear"` and
        `"log"`. If a list is supplied, the length of the list must be one
        less than the length of the `edges` parameter.
    num:
        An integer or a list of integers specifying the number of points to
        use in each part of the mesh. If a list is supplied, the length of the
        list must be one less than the length of the `edges` parameter.
    unit:
        The unit of the output Variable.

    Returns
    -------
    :
        Lin-log spaced Q-bin edges.

    Raises
    ------
    ValueError
        If the number of entries in ``scale`` or ``num`` is not one less than
        the number of entries in ``edges``.
    """
    # Scalars describe a single piece; wrap them so they can be indexed below.
    if not isinstance(scale, (list, tuple)):
        scale = [scale]
    if not isinstance(num, (list, tuple)):
        num = [num]

    n_pieces = len(edges) - 1
    if len(scale) != n_pieces:
        raise ValueError(
            "Sizes do not match. The length of edges should be one "
            "greater than scale."
        )
    # Validate ``num`` as well: otherwise a short list fails with a bare
    # IndexError and a long list is silently truncated.
    if len(num) != n_pieces:
        raise ValueError(
            "Sizes do not match. The length of edges should be one "
            "greater than num."
        )

    funcs = {"linear": sc.linspace, "log": sc.geomspace}
    grids = []
    pieces = zip(edges[:-1], edges[1:], scale, num, strict=True)
    for i, (lower, upper, kind, count) in enumerate(pieces):
        # Every piece after the first starts on the previous piece's last
        # edge: generate one extra point and drop the duplicated leading edge.
        start = int(i > 0)
        mesh = funcs[kind](
            dim=dim, start=lower, stop=upper, num=count + start, unit=unit
        )
        grids.append(mesh[dim, start:])
    return sc.concat(grids, dim)
def_sort_by(a,by):return[xforx,_insorted(zip(a,by,strict=True),key=lambdax:x[1])]def_find_interval_overlaps(intervals):'''Returns the intervals where at least two or more of the provided intervals are overlapping.'''edges=list(chain.from_iterable(intervals))is_start_edge=list(chain.from_iterable((True,False)for_inintervals))edges_sorted=sorted(edges)is_start_edge_sorted=_sort_by(is_start_edge,edges)number_overlapping=0overlap_intervals=[]forx,is_startinzip(edges_sorted,is_start_edge_sorted,strict=True):ifnumber_overlapping==1andis_start:start=xifnumber_overlapping==2andnotis_start:overlap_intervals.append((start,x))ifis_start:number_overlapping+=1else:number_overlapping-=1returnoverlap_intervalsdef_searchsorted(a,v):fori,einenumerate(a):ife>v:returnireturnlen(a)def_create_qgrid_where_overlapping(qgrids):'''Given a number of Q-grids, construct a new grid covering the regions where (any two of the) provided grids overlap.'''pieces=[]forstart,endin_find_interval_overlaps([(q.min(),q.max())forqinqgrids]):interval_sliced_from_qgrids=[q[max(_searchsorted(q,start)-1,0):_searchsorted(q,end)+1]forqinqgrids]densest_grid_in_interval=max(interval_sliced_from_qgrids,key=len)pieces.append(densest_grid_in_interval)returnsc.concat(pieces,dim='Q')def_same_dtype(arrays):return[arr.to(dtype='float64')forarrinarrays]def_interpolate_on_qgrid(curves,grid):returnsc.concat(_same_dtype([sc.lookup(c,grid.dim)[sc.midpoints(grid)]forcincurves]),dim='curves',)
def scale_reflectivity_curves_to_overlap(
    curves: Sequence[sc.DataArray],
    critical_edge_interval: tuple[sc.Variable, sc.Variable] | None = None,
) -> tuple[list[sc.DataArray], tuple[float, ...]]:
    '''Make the curves overlap by scaling all except the first by a factor.
    The scaling factors are determined by a maximum likelihood estimate
    (assuming the errors are normal distributed).

    If :code:`critical_edge_interval` is provided then all curves are scaled.

    All curves must have the same unit for data and the Q-coordinate.

    Parameters
    ----------
    curves:
        the reflectivity curves that should be scaled together
    critical_edge_interval:
        a tuple denoting an interval that is known to belong
        to the critical edge, i.e. where the reflectivity is
        known to be 1.

    Returns
    -------
    :
        A list of scaled reflectivity curves and a tuple of the scaling
        factors (plain floats, one per curve).

    Raises
    ------
    ValueError
        If the curves do not all share the same data unit or the same
        Q-coordinate unit.
    '''
    if critical_edge_interval is not None:
        # Build a reference curve equal to 1 on the critical edge interval,
        # put it first so it stays unscaled, and scale every input curve
        # against it.
        q = next(iter(curves)).coords['Q']
        N = (
            ((q >= critical_edge_interval[0]) & (q < critical_edge_interval[1]))
            .sum()
            .value
        )
        edge = sc.DataArray(
            data=sc.ones(dims=('Q',), shape=(N,), with_variances=True),
            coords={'Q': sc.linspace('Q', *critical_edge_interval, N + 1)},
        )
        curves, factors = scale_reflectivity_curves_to_overlap([edge, *curves])
        # Drop the reference curve and its (unit) factor again.
        return curves[1:], factors[1:]

    if len({c.data.unit for c in curves}) != 1:
        raise ValueError('The reflectivity curves must have the same unit')
    if len({c.coords['Q'].unit for c in curves}) != 1:
        raise ValueError('The Q-coordinates must have the same unit for each curve')

    if len(curves) == 1:
        # A single curve has nothing to be matched against: there is no
        # overlap region to build a grid from, so keep it unscaled instead
        # of failing inside the grid construction / optimiser.
        scaling_factors = (1.0,)
    else:
        qgrid = _create_qgrid_where_overlapping([c.coords['Q'] for c in curves])

        # One row per curve, sampled on the common overlap grid.
        r = _interpolate_on_qgrid(map(sc.values, curves), qgrid).values
        v = _interpolate_on_qgrid(map(sc.variances, curves), qgrid).values

        def cost(scaling_factors):
            # The first curve is the reference and always has factor 1.
            scaling_factors = np.concatenate([[1.0], scaling_factors])[:, None]

            r_scaled = scaling_factors * r
            v_scaled = scaling_factors**2 * v
            # Zero variance would give an infinite weight; mark those points
            # as NaN so that the nan-aware sums below skip them.
            v_scaled[v_scaled == 0] = np.nan
            inv_v_scaled = 1 / v_scaled
            # Inverse-variance weighted mean over the curves at each Q.
            r_avg = np.nansum(r_scaled * inv_v_scaled, axis=0) / np.nansum(
                inv_v_scaled, axis=0
            )
            return np.nansum((r_scaled - r_avg) ** 2 * inv_v_scaled)

        sol = opt.minimize(cost, [1.0] * (len(curves) - 1))
        scaling_factors = (1.0, *map(float, sol.x))

    return [
        scaling_factor * curve
        for scaling_factor, curve in zip(scaling_factors, curves, strict=True)
    ], scaling_factors
def combine_curves(
    curves: Sequence[sc.DataArray],
    q_bin_edges: sc.Variable | None = None,
) -> sc.DataArray:
    '''Combines the given curves by interpolating them
    on a 1d grid defined by :code:`q_bin_edges` and averaging
    over the provided reflectivity curves.

    The averaging is done using a weighted mean where the weights
    are inversely proportional to the variances. Points with zero
    variance are excluded from the average.

    Unless the curves are already scaled correctly they might
    need to be scaled using :func:`scale_reflectivity_curves_to_overlap`
    before calling this function.

    All curves must have the same unit for data and the Q-coordinate.

    Parameters
    ----------
    curves:
        the reflectivity curves that should be combined
    q_bin_edges:
        the Q bin edges of the resulting combined reflectivity curve.
        Although the parameter defaults to ``None`` it is required.

    Returns
    -------
    :
        A data array representing the combined reflectivity curve

    Raises
    ------
    ValueError
        If ``q_bin_edges`` is not provided, or if the curves do not all
        share the same data unit or the same Q-coordinate unit.
    '''
    if q_bin_edges is None:
        # Without bin edges there is no grid to interpolate on; fail with a
        # clear message rather than an AttributeError further down.
        raise ValueError('q_bin_edges must be provided to combine the curves')
    if len({c.data.unit for c in curves}) != 1:
        raise ValueError('The reflectivity curves must have the same unit')
    if len({c.coords['Q'].unit for c in curves}) != 1:
        raise ValueError('The Q-coordinates must have the same unit for each curve')

    # One row per curve, sampled at the midpoints of the requested bins.
    r = _interpolate_on_qgrid(map(sc.values, curves), q_bin_edges).values
    v = _interpolate_on_qgrid(map(sc.variances, curves), q_bin_edges).values

    # Zero variance would give an infinite weight; mark those points as NaN
    # so that the nan-aware sums below skip them.
    v[v == 0] = np.nan
    inv_v = 1.0 / v
    weight_sum = np.nansum(inv_v, axis=0)
    r_avg = np.nansum(r * inv_v, axis=0) / weight_sum
    v_avg = 1 / weight_sum
    return sc.DataArray(
        data=sc.array(
            dims=('Q',),
            values=r_avg,
            variances=v_avg,
            unit=next(iter(curves)).data.unit,
        ),
        coords={'Q': q_bin_edges},
    )
[docs]deforso_datasets_from_measurements(workflow:sciline.Pipeline,runs:Sequence[Mapping[type,Any]],*,scale_to_overlap:bool=True,)->list[OrsoDataset]:'''Produces a list of ORSO datasets containing one reflectivity curve for each of the provided runs. Each entry of :code:`runs` is a mapping of parameters and values needed to produce the dataset. Optionally, the reflectivity curves can be scaled to overlap in the regions where they have the same Q-value. Parameters ----------- workflow: The sciline workflow used to compute `ReflectivityOverQ` for each of the runs. runs: The sciline parameters to be used for each run scale_to_overlap: If True the curves will be scaled to overlap. Note that the curve of the first run is unscaled and the rest are scaled to match it. Returns --------- list of the computed ORSO datasets, containing one reflectivity curve each '''reflectivity_curves=[]forparametersinruns:wf=workflow.copy()forname,valueinparameters.items():wf[name]=valuereflectivity_curves.append(wf.compute(ReflectivityOverQ))scale_factors=(scale_reflectivity_curves_to_overlap([r.hist()forrinreflectivity_curves])[1]ifscale_to_overlapelse(1,)*len(runs))datasets=[]forparameters,curve,scale_factorinzip(runs,reflectivity_curves,scale_factors,strict=True):wf=workflow.copy()forname,valueinparameters.items():wf[name]=valuewf[ReflectivityOverQ]=scale_factor*curvedataset=wf.compute(orso.OrsoIofQDataset)datasets.append(dataset)returndatasets