DMSC Integration Testing

Last updated: December 16, 2025 22:41:25

Test: nexusjsontemplate|estia|nexus_json_template|

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

None

nexus_json_template = PosixPath('estia-dynamic.json')

def test_nexus_json_template(nexus_json_template: pathlib.Path) -> None:
downloaded = json.loads(nexus_json_template.read_text())
assert "children" in downloaded # Check if the file was downloaded correctly
# collect_streaming_modules will validate the structure
# that is required by `esslivedata`.
# It is currently removed from `esslivedata` package.
# We should update the method that is used by `esslivedata` once it is added back
# or updated in `esslivedata`.
> collect_streaming_modules(downloaded)

tests/nexusjsontemplate/nexus_json_template_test.py:201:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/nexusjsontemplate/nexus_json_template_test.py:174: in collect_streaming_modules
_validate_module_keys(key_value_pairs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key_value_pairs = ((StreamModuleKey(module_type='ev44', topic='estia_detector', source='multiblade'), StreamModuleValue(path=('entry', '...[{'name': 'NX_class', 'values': 'NXlog'}, {'name': 'default', 'values': 'time'}]}, dtype=None, value_units=None)), ...)

def _validate_module_keys(
key_value_pairs: tuple[tuple[StreamModuleKey, StreamModuleValue], ...],
) -> None:
"""Validate the module keys."""
from collections import Counter

key_counts = Counter([key for key, _ in key_value_pairs])
duplicated_keys = [key for key, count in key_counts.items() if count > 1]
if duplicated_keys:
> raise InvalidNexusStructureError(
"Duplicate module place holder(s) found in the nexus structure: ",
duplicated_keys,
)
E tests.nexusjsontemplate.nexus_json_template_test.InvalidNexusStructureError: ('Duplicate module place holder(s) found in the nexus structure: ', [StreamModuleKey(module_type='f144', topic='estia_choppers', source='tbd')])

tests/nexusjsontemplate/nexus_json_template_test.py:106: InvalidNexusStructureError

nexus_json_template = PosixPath('estia-dynamic.json')

def test_nexus_json_template(nexus_json_template: pathlib.Path) -> None:
downloaded = json.loads(nexus_json_template.read_text())
assert "children" in downloaded # Check if the file was downloaded correctly
# collect_streaming_modules will validate the structure
# that is required by `esslivedata`.
# It is currently removed from `esslivedata` package.
# We should update the method that is used by `esslivedata` once it is added back
# or updated in `esslivedata`.
> collect_streaming_modules(downloaded)

tests/nexusjsontemplate/nexus_json_template_test.py:201:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/nexusjsontemplate/nexus_json_template_test.py:174: in collect_streaming_modules
_validate_module_keys(key_value_pairs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key_value_pairs = ((StreamModuleKey(module_type='ev44', topic='estia_detector', source='multiblade'), StreamModuleValue(path=('entry', '...[{'name': 'NX_class', 'values': 'NXlog'}, {'name': 'default', 'values': 'time'}]}, dtype=None, value_units=None)), ...)

def _validate_module_keys(
key_value_pairs: tuple[tuple[StreamModuleKey, StreamModuleValue], ...],
) -> None:
"""Validate the module keys."""
from collections import Counter

key_counts = Counter([key for key, _ in key_value_pairs])
duplicated_keys = [key for key, count in key_counts.items() if count > 1]
if duplicated_keys:
> raise InvalidNexusStructureError(
"Duplicate module place holder(s) found in the nexus structure: ",
duplicated_keys,
)
E tests.nexusjsontemplate.nexus_json_template_test.InvalidNexusStructureError: ('Duplicate module place holder(s) found in the nexus structure: ', [StreamModuleKey(module_type='f144', topic='estia_choppers', source='tbd')])

tests/nexusjsontemplate/nexus_json_template_test.py:106: InvalidNexusStructureError

nexus_json_template = PosixPath('estia-dynamic.json')

def test_nexus_json_template(nexus_json_template: pathlib.Path) -> None:
downloaded = json.loads(nexus_json_template.read_text())
assert "children" in downloaded # Check if the file was downloaded correctly
# collect_streaming_modules will validate the structure
# that is required by `esslivedata`.
# It is currently removed from `esslivedata` package.
# We should update the method that is used by `esslivedata` once it is added back
# or updated in `esslivedata`.
> collect_streaming_modules(downloaded)

tests/nexusjsontemplate/nexus_json_template_test.py:201:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/nexusjsontemplate/nexus_json_template_test.py:174: in collect_streaming_modules
_validate_module_keys(key_value_pairs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key_value_pairs = ((StreamModuleKey(module_type='ev44', topic='estia_detector', source='multiblade'), StreamModuleValue(path=('entry', '...[{'name': 'NX_class', 'values': 'NXlog'}, {'name': 'default', 'values': 'time'}]}, dtype=None, value_units=None)), ...)

def _validate_module_keys(
key_value_pairs: tuple[tuple[StreamModuleKey, StreamModuleValue], ...],
) -> None:
"""Validate the module keys."""
from collections import Counter

key_counts = Counter([key for key, _ in key_value_pairs])
duplicated_keys = [key for key, count in key_counts.items() if count > 1]
if duplicated_keys:
> raise InvalidNexusStructureError(
"Duplicate module place holder(s) found in the nexus structure: ",
duplicated_keys,
)
E tests.nexusjsontemplate.nexus_json_template_test.InvalidNexusStructureError: ('Duplicate module place holder(s) found in the nexus structure: ', [StreamModuleKey(module_type='f144', topic='estia_choppers', source='tbd')])

tests/nexusjsontemplate/nexus_json_template_test.py:106: InvalidNexusStructureError