Zoom Polarization Analysis#
Introduction#
[ ]:
%matplotlib widget
import plopp
import sciline
import scipp as sc
from ess import polarization as pol
from ess import sans
from ess import isissans as isis
import ess.isissans.data # noqa: F401
from ess.sans.types import *
[ ]:
from pathlib import Path
data_folder = Path('zoom-data-2025-07')
# Runs with analyzer at 4 different times
cell_runs = [
str(data_folder / '3He_unpolarized_in' / f'ZOOM00038{run}.nxs')
for run in [423, 425, 427, 429, 431, 433, 435, 437]
]
empty_run = data_folder / 'Background_files' / 'ZOOM00038336.nxs'
depolarized_run = data_folder / 'Background_files' / 'ZOOM00038470.nxs'
cell_runs = [*cell_runs, depolarized_run]
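Since files are only loaded when the workflow is computed, a quick check that all inputs exist can save time later. This is an optional sketch based on the paths defined above:
[ ]:
# Optional sanity check (sketch): verify that all input files defined above exist.
for path in [*cell_runs, empty_run]:
    assert Path(path).exists(), f"Missing input file: {path}"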
Set up SANS workflow for computing the transmission fraction#
[ ]:
params = {
WavelengthBins: sc.geomspace(
'wavelength', start=1.75, stop=16.5, num=141, unit='Å'
),
isis.mantidio.CalibrationWorkspace: None,
# Spectrum numbers used for histogram files (Workspace2D)
isis.MonitorSpectrumNumber[Incident]: isis.MonitorSpectrumNumber[Incident](3),
isis.MonitorSpectrumNumber[Transmission]: isis.MonitorSpectrumNumber[Transmission](
4
),
# Monitor names used for event files (EventWorkspace)
NeXusMonitorName[Incident]: NeXusMonitorName[Incident]("monitor3"),
NeXusMonitorName[Transmission]: NeXusMonitorName[Transmission]("monitor4"),
UncertaintyBroadcastMode: UncertaintyBroadcastMode.upper_bound,
}
[ ]:
from ess.polarization.zoom import ZoomTransmissionFractionWorkflow
sans_workflow = ZoomTransmissionFractionWorkflow(cell_runs)
for key, value in params.items():
sans_workflow[key] = value
sans_workflow[isis.mantidio.Period] = 0
sans_workflow[Filename[EmptyBeamRun]] = str(empty_run)
Inspect data for one of the runs with the analyzer cell#
[ ]:
loaded = sciline.get_mapped_node_names(
sans_workflow, isis.mantidio.LoadedFileContents[TransmissionRun[SampleRun]]
)
# Load only first run
first_run = sans_workflow.compute(loaded[0])
sc.DataGroup(sc.collapse(first_run['data'], keep='tof')).plot()
[ ]:
first_run
We can load the combined time-dependent incident and transmission monitors. Note that the last run is the depolarized run:
[ ]:
mons = sans_workflow.compute(
(
NeXusComponent[Incident, TransmissionRun[SampleRun]],
NeXusComponent[Transmission, TransmissionRun[SampleRun]],
)
)
mons = sc.DataGroup(
incident=mons[NeXusComponent[Incident, TransmissionRun[SampleRun]]]['data'],
transmission=mons[NeXusComponent[Transmission, TransmissionRun[SampleRun]]]['data'],
)
display(sc.DataGroup(sc.collapse(mons['incident'], keep='tof')).plot())
display(sc.DataGroup(sc.collapse(mons['transmission'], keep='tof')).plot())
The task graph for computing the transmission fraction is:
[ ]:
sans_workflow.visualize(
TransmissionFraction[SampleRun], graph_attr={'rankdir': 'LR'}, compact=True
)
Compute transmission fractions#
There are multiple files which together define the time dependence of the analyzer-cell transmission. Note that, as before, the final run (the last time point) is the depolarized run:
[ ]:
raw_transmission = sans_workflow.compute(TransmissionFraction[SampleRun])
We can plot the computed transmission fractions:
[ ]:
transmission_depolarized = raw_transmission['time', -1].copy()
transmission = raw_transmission['time', :-1].copy()
trans = sc.DataGroup(
{f"{time:c}": transmission['time', time] for time in transmission.coords['time']}
)
trans['depolarized'] = transmission_depolarized
display(trans.plot())
[ ]:
# Sanity check: Where can cosh yield values that can be fitted?
transmission_empty_glass = 0.9 * sc.Unit('dimensionless')
wavelength = sc.midpoints(transmission.coords['wavelength'])
opacity0 = 0.8797823016804095 * sc.Unit('1/angstrom')
(
sc.acosh(
sc.values(transmission)
* sc.exp(opacity0 * wavelength)
/ transmission_empty_glass
)
/ (opacity0 * wavelength)
).plot()
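The cell above inverts the standard unpolarized-beam transmission model for a ³He cell, \(T(\lambda, t) = T_\mathrm{glass}\, e^{-O(\lambda)} \cosh\bigl(O(\lambda)\, P(t)\bigr)\) with opacity \(O(\lambda) = O_0 \lambda\), so the plotted quantity is the inferred ³He polarization \(P(t)\). Wherever the argument of acosh drops below one, the model cannot be inverted and the result is NaN, which is what this check probes before fitting.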
[ ]:
wav_min = 1.75 * sc.Unit('angstrom')
wav_max = 16.5 * sc.Unit('angstrom')
transmission_truncated = raw_transmission['wavelength', wav_min:wav_max]
transmission_depolarized = transmission_truncated['time', -1].copy()
transmission = transmission_truncated['time', :-1].copy()
We can now set up the polarization analysis workflow. The previously computed transmission fractions are used as workflow inputs:
[ ]:
he3_workflow = pol.he3.He3CellWorkflow(in_situ=False, incoming_polarized=True)
# TODO Is plus correct here, this is period 0? Do we also have minus data?
he3_workflow[pol.he3.He3AnalyzerTransmissionFractionParallel] = transmission
# TODO Fake empty transmission for now, would need to load different period
he3_workflow[pol.he3.He3AnalyzerTransmissionFractionAntiParallel] = transmission[
'time', 0:0
]
he3_workflow[
pol.he3.He3CellTransmissionFractionIncomingUnpolarized[
pol.Analyzer, pol.Depolarized
]
] = transmission_depolarized
# When in_situ=False, these params are used as starting guess for the fit
he3_workflow[pol.he3.He3CellLength[pol.Analyzer]] = 0.1 * sc.Unit('m')
he3_workflow[pol.he3.He3CellPressure[pol.Analyzer]] = 1.0 * sc.Unit('bar')
he3_workflow[pol.he3.He3CellTemperature[pol.Analyzer]] = 300.0 * sc.Unit('K')
he3_workflow[pol.he3.He3TransmissionEmptyGlass[pol.Analyzer]] = transmission_empty_glass
he3_workflow.visualize(
pol.TransmissionFunction[pol.Analyzer], graph_attr={'rankdir': 'LR'}
)
The workflow can compute the transmission function:
[ ]:
func = he3_workflow.compute(pol.TransmissionFunction[pol.Analyzer])
We can evaluate this transmission function at desired time and wavelength points:
[ ]:
wavelength = sc.linspace('wavelength', start=2, stop=16.0, num=141, unit='angstrom')
time = sc.linspace('time', start=0, stop=100000, num=101, unit='s')
display(func.opacity_function(wavelength=wavelength).plot())
display(func.polarization_function(time=time).plot())
display(func(wavelength=wavelength, time=time, plus_minus='plus').plot(norm='log'))
display(func(wavelength=wavelength, time=time, plus_minus='minus').plot(norm='log'))
[ ]:
trans = sc.DataArray(
func(wavelength=wavelength, time=time, plus_minus='plus'),
coords={
'wavelength': wavelength,
'time': time,
},
)
sc.DataGroup(
{f"{time:c}": trans['time', time] for time in trans.coords['time'][::20]}
).plot(norm='linear', linestyle='solid', marker=None)
[ ]:
trans = sc.DataArray(
func(wavelength=wavelength, time=time, plus_minus='plus'),
coords={
'wavelength': wavelength,
'time': time,
},
)
sc.DataGroup(
{f"{wav:c}": trans['wavelength', wav] for wav in trans.coords['wavelength'][::20]}
).plot(norm='log', linestyle='solid', marker=None)
[ ]:
func.opacity_function.opacity0
[ ]:
func.polarization_function.C
[ ]:
func.polarization_function.T1
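For reference, and assuming the model fitted above, these parameters describe an opacity that scales linearly with wavelength, \(O(\lambda) = O_0 \lambda\), and a cell polarization that decays exponentially in time, \(P(t) = C\, e^{-t/T_1}\), i.e., \(C\) is the initial polarization and \(T_1\) the relaxation time of the analyzer cell.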
Sample data#
We now proceed to process the sample data. We use the Zoom workflow from ESSsans, configured as follows:
[ ]:
def make_sample_workflow() -> sciline.Pipeline:
wf = isis.zoom.ZoomWorkflow()
wf.insert(isis.io.transmission_from_background_run)
wf.insert(isis.io.transmission_from_sample_run)
# TODO Are these masks correct? Provide correct XML files!
masks = isis.data.zoom_tutorial_mask_filenames()
wf = sans.with_pixel_mask_filenames(wf, masks)
for key, value in params.items():
wf[key] = value
wf[QBins] = sc.geomspace(dim='Q', start=0.004, stop=0.8, num=141, unit='1/angstrom')
wf[QxBins] = sc.linspace('Qx', start=-0.3, stop=0.3, num=101, unit='1/angstrom')
wf[QyBins] = sc.linspace('Qy', start=-0.3, stop=0.3, num=101, unit='1/angstrom')
wf[NonBackgroundWavelengthRange] = sc.array(
dims=['wavelength'], values=[0.7, 17.1], unit='angstrom'
)
wf[CorrectForGravity] = True
wf[UncertaintyBroadcastMode] = UncertaintyBroadcastMode.upper_bound
wf[ReturnEvents] = True
# TODO Configure offsets and beam center correctly!
# sample_workflow[isis.SampleOffset] = sc.vector([0.0, 0.0, 0.11], unit='m')
# sample_workflow[isis.DetectorBankOffset] = sc.vector([0.0, 0.0, 0.5], unit='m')
wf[BeamCenter] = sc.vector([0.0, 0.0, 0.0], unit='m')
# TODO Use direct beam!
# sample_workflow[DirectBeamFilename]
wf[DirectBeam] = None
return wf
The sample runs are event data, so we can simply merge all runs to obtain a single event-mode \(I(Q)\). We use sans.with_sample_runs to obtain a workflow that merges the runs and computes \(I(Q)\):
[ ]:
water_folder = data_folder / 'Sample_water_cell3'
sample_runs = [str(water_folder / f'ZOOM00038{run}.nxs') for run in range(439, 466, 2)]
[ ]:
sample_workflow = make_sample_workflow()
sample_workflow[Filename[EmptyBeamRun]] = str(empty_run)
multi_run_sample_workflow = sans.with_sample_runs(sample_workflow, runs=sample_runs)
The sample workflow looks like this:
[ ]:
multi_run_sample_workflow.visualize(
IofQ[SampleRun], graph_attr={'rankdir': 'LR'}, compact=True
)
ESSsans does not support processing multiple periods (and there are no plans for this, as it is ISIS-specific). We therefore run the workflow once per period. Note that this is inefficient, as it loads every file four times.
[ ]:
# WARNING: Running this cell takes a couple of minutes when all sample runs are used.
# TODO Is this correct? We get very large times. Are we using the correct transmission
# runs above?
time_base = transmission.coords['datetime'].min()
periods = []
for period in range(4):
multi_run_sample_workflow[isis.mantidio.Period] = period
iofq = multi_run_sample_workflow.compute(IofQxy[SampleRun])
iofq = iofq.bins.assign_coords(
time=iofq.bins.coords['pulse_time'].to(unit=time_base.unit) - time_base
).drop_coords('wavelength') # Wavelength bounds get in the way
periods.append(iofq)
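Before combining the periods, it can be useful to verify that they share the same Qx/Qy binning and contain comparable numbers of events. A minimal sketch:
[ ]:
# Quick check (sketch): all periods should share the same Qx/Qy bin edges and have
# comparable event counts before being combined in the correction workflow below.
for period, iofq in enumerate(periods):
    assert sc.identical(iofq.coords['Qx'], periods[0].coords['Qx'])
    assert sc.identical(iofq.coords['Qy'], periods[0].coords['Qy'])
    print(f"period {period}: {int(iofq.bins.size().sum().value)} events")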
[ ]:
plopp.slicer(
sc.values(sc.concat(periods, 'period').hist(pulse_time=10)), keep=('Qx', 'Qy')
)
Correction workflow#
In the previous section we set up the workflow for the analyzer. We also computed the transmission function there, but in production this will be done implicitly by running the entire workflow we set up here. We can combine it with the workflow for the polarizer to obtain the full correction workflow:
[ ]:
supermirror_workflow = pol.SupermirrorWorkflow()
supermirror_workflow.visualize(pol.TransmissionFunction[pol.Polarizer])
We will use a supermirror efficiency function given by a wavelength-dependent lookup table:
[ ]:
from ess.polarization.data import example_polarization_efficiency_table
supermirror_workflow[pol.SupermirrorEfficiencyFunction[pol.Polarizer]] = (
pol.EfficiencyLookupTable.from_file(
example_polarization_efficiency_table(),
wavelength_colname='# X ',
efficiency_colname=' Y ',
)
)
[ ]:
workflow = pol.PolarizationAnalysisWorkflow(
polarizer_workflow=supermirror_workflow,
analyzer_workflow=he3_workflow,
)
For a single channel, the complete workflow looks as follows:
[ ]:
workflow.visualize(
pol.PolarizationCorrectedData[pol.Up, pol.Up], graph_attr={'rankdir': 'LR'}
)
We can now set the reduced sample data for each measured channel:
[ ]:
workflow[pol.ReducedSampleDataBySpinChannel[pol.Up, pol.Up]] = periods[0]
workflow[pol.ReducedSampleDataBySpinChannel[pol.Down, pol.Up]] = periods[1]
workflow[pol.ReducedSampleDataBySpinChannel[pol.Up, pol.Down]] = periods[2]
workflow[pol.ReducedSampleDataBySpinChannel[pol.Down, pol.Down]] = periods[3]
The result for a single input channel (up-up) is:
[ ]:
channels = workflow.compute(pol.PolarizationCorrectedData[pol.Up, pol.Up])
tiled = plopp.tiled(2, 2)
tiled[0, 0] = channels.upup.nanhist().plot()
tiled[0, 1] = channels.updown.nanhist().plot()
tiled[1, 0] = channels.downup.nanhist().plot()
tiled[1, 1] = channels.downdown.nanhist().plot()
tiled
The combination of all four channels is:
[ ]:
channels = workflow.compute(pol.TotalPolarizationCorrectedData)
tiled = plopp.tiled(2, 2)
tiled[0, 0] = channels.upup.nanhist().plot()
tiled[0, 1] = channels.updown.nanhist().plot()
tiled[1, 0] = channels.downup.nanhist().plot()
tiled[1, 1] = channels.downdown.nanhist().plot()
tiled