Source code for ess.reduce.nexus.json_nexus

"""Adapter for loading NeXus files encoded as JSON."""

from __future__ import annotations

from collections.abc import Mapping
from typing import Any

import numpy as np
import scippnexus as snx

_nexus_class = "NX_class"
_nexus_units = "units"
_nexus_name = "name"
_nexus_path = "path"
_nexus_values = "values"
_nexus_dataset = "dataset"
_nexus_config = "config"
_nexus_group = "group"
_nexus_children = "children"
_nexus_link = "link"
_nexus_stream = "stream"

_filewriter_to_supported_numpy_dtype = {
    "float32": np.float32,
    "float64": np.float64,
    "float": np.float32,
    "double": np.float64,
    "int8": np.int32,
    "int16": np.int32,
    "int32": np.int32,
    "int64": np.int64,
    "uint8": np.int32,
    "uint16": np.int32,
    "uint32": np.int32,
    "uint64": np.int64,
    "string": np.str_,
}

numpy_to_filewriter_type = {
    np.float32: "float32",
    np.float64: "float64",
    np.int8: "int8",
    np.int16: "int16",
    np.int32: "int32",
    np.int64: "int64",
    np.uint8: "uint8",
    np.uint16: "uint16",
    np.uint32: "uint32",
    np.uint64: "uint64",
    np.str_: "string",
    np.object_: "string",
}


[docs] def json_nexus_group( json_dict: dict[str, Any], *, definitions: dict[str, type] | None = None ) -> snx.Group: """Parse a JSON dictionary into a NeXus group. Parameters ---------- json_dict: ``dict`` containing a NeXus structure as JSON. definitions: ScippNexus application definitions. When not given, the default definitions are used. Returns ------- : A NeXus group that can be used for loading data as if it were loaded from a file with :class:`scippnexus.File`. """ return snx.Group( JSONGroup(json_dict), definitions=definitions if definitions is not None else snx.base_definitions(), )
[docs] class MissingAttribute(Exception): pass
[docs] def make_json_attr(name: str, value) -> dict: if isinstance(value, str | bytes): attr_info = {"string_size": len(value), "type": "string"} elif isinstance(value, float): attr_info = {"size": 1, "type": "float64"} elif isinstance(value, int): attr_info = {"size": 1, "type": "int64"} elif isinstance(value, list): attr_info = {"size": len(value), "type": "string"} else: attr_info = { "size": value.shape, "type": numpy_to_filewriter_type[value.dtype.type], } name_and_value = {"name": name, "values": value} return {**attr_info, **name_and_value}
[docs] def make_json_dataset(name: str, data) -> dict: if isinstance(data, str | bytes): dataset_info = {"string_size": len(data), "type": "string"} elif isinstance(data, float): dataset_info = {"size": 1, "type": "float64"} elif isinstance(data, int): dataset_info = {"size": 1, "type": "int32"} else: dataset_info = { "size": data.shape, "type": numpy_to_filewriter_type[data.dtype.type], } return { 'module': _nexus_dataset, _nexus_config: { **dataset_info, _nexus_name: name, _nexus_values: data, }, "attributes": [], }
def _get_attribute_value( element: dict, attribute_name: str ) -> str | float | int | list: """ attributes can be a dictionary of key-value pairs, or an array of dictionaries with key, value, type, etc """ try: attributes = element["attributes"] try: return attributes[attribute_name] except TypeError: for attribute in attributes: if attribute[_nexus_name] == attribute_name: return attribute[_nexus_values] except KeyError: pass raise MissingAttribute def _visitnodes(root: dict): for child in root.get(_nexus_children, ()): yield child yield from _visitnodes(child) def _name(node: dict): if _nexus_name in node: return node[_nexus_name] if _nexus_config in node: return node[_nexus_config][_nexus_name] return '' def _is_group(node: dict): return _nexus_children in node def _is_dataset(node: dict): return node.get('module') == _nexus_dataset def _is_link(node: dict): return node.get('module') == _nexus_link def _is_stream(node: dict): return 'module' in node and not (_is_dataset(node) or _is_link(node))
[docs] def contains_stream(group: JSONGroup) -> bool: """Return True if the group contains a stream object""" return ( isinstance(group, JSONGroup) and _nexus_children in group._node and any(map(_is_stream, group._node[_nexus_children])) )
[docs] class JSONTypeStringID: def get_cset(self): import h5py return h5py.h5t.CSET_UTF8
[docs] class JSONAttrID:
[docs] def __init__(self): pass
def get_type(self): return JSONTypeStringID()
[docs] class JSONAttributeManager(Mapping[str, Any]):
[docs] def __init__(self, node: dict): self._node = node
def __contains__(self, name): try: self[name] except MissingAttribute: return False return True def __getitem__(self, name): return _get_attribute_value(self._node, name) def __setitem__(self, name, value): if name in self: raise NotImplementedError("Replacing existing item not implemented yet.") attr = make_json_attr(name, value) self._node['attributes'].append(attr) return self[name] def __iter__(self): if (attrs := self._node.get('attributes')) is not None: if isinstance(attrs, dict): yield from attrs else: for item in attrs: yield item[_nexus_name] def __len__(self): return sum(1 for _ in self)
[docs] def get(self, name: str, default=None): return self[name] if name in self else default
def get_id(self, name) -> JSONAttrID: # TODO This is a hack that only works since this is used only for a single # purpose by scippnexus.NXobject return JSONAttrID()
[docs] class JSONNode:
[docs] def __init__(self, node: dict, *, parent=None): self._file = parent.file if parent is not None else self self._parent = self if parent is None else parent self._node = node name = _name(self._node) if parent is None or parent.name == '/': self._name = f'/{name}' else: self._name = f'{parent.name}/{name}'
@property def attrs(self) -> JSONAttributeManager: return JSONAttributeManager(self._node) @property def name(self) -> str: return self._name @property def file(self): return self._file @property def parent(self): return self._parent
[docs] class JSONDataset(JSONNode): @property def dtype(self) -> str: try: dtype = self._node[_nexus_config]["type"] except KeyError: if "dtype" not in self._node[_nexus_config] and isinstance( self._node[_nexus_config][_nexus_values], str ): dtype = 'string' else: dtype = self._node[_nexus_config]["dtype"] if dtype == 'string': return np.dtype(str) return np.dtype(dtype) @property def ndim(self) -> int: return len(self.shape) @property def shape(self): return np.asarray(self._node[_nexus_config][_nexus_values]).shape def __getitem__(self, index): return np.asarray(self._node[_nexus_config][_nexus_values])[index] def read_direct(self, buf, source_sel): buf[...] = self[source_sel] def asstr(self, **ignored): return self
[docs] class JSONGroup(JSONNode): def __contains__(self, name: str) -> bool: try: self[name] return True except KeyError: return False def keys(self) -> list[str]: if contains_stream(self): return [] children = self._node[_nexus_children] return [_name(child) for child in children if not contains_stream(child)] def items(self) -> list[tuple[str, JSONNode]]: return [(key, self[key]) for key in self.keys()] def _as_group_or_dataset(self, item, parent): if _is_group(item): return JSONGroup(item, parent=parent) return JSONDataset(item, parent=parent) def __getitem__(self, name: str) -> JSONDataset | JSONGroup: if name.startswith('/') and name.count('/') == 1: parent = self.file elif '/' in name: parent = self['/'.join(name.split('/')[:-1])] else: parent = self for child in parent._node[_nexus_children]: if _name(child) != name.split('/')[-1]: continue if _is_link(child): return self[child[_nexus_config]["target"]] if _is_group(child) or _is_dataset(child): return self._as_group_or_dataset(child, parent) raise KeyError(f"Unable to open object (object '{name}' doesn't exist)") def __iter__(self): yield from self.keys() def visititems(self, callable): def skip(node): return _is_link(node) or contains_stream(self) children = [ _name(child) for child in self._node[_nexus_children] if not skip(child) ] for key in children: item = self[key] callable(key, item) if isinstance(item, JSONGroup): item.visititems(callable) def create_dataset(self, name: str, data) -> JSONDataset: if name in self: raise NotImplementedError("Replacing existing item not implemented yet.") dataset = make_json_dataset(name, data) self._node[_nexus_children].append(dataset) return self[name] def create_group(self, name: str) -> JSONGroup: if name in self: raise NotImplementedError("Replacing existing item not implemented yet.") group = {"type": "group", "name": name, "children": [], "attributes": []} self._node[_nexus_children].append(group) return self[name]