Source code for data_loaders

from __future__ import annotations
import io
import numpy as np
import ast
import os
import pickle
import re
import h5py

from igor2 import binarywave, packed
import zipfile
from argparse import Namespace
from errno import ENOENT
from warnings import catch_warnings, simplefilter

from datetime import datetime
from typing import Union, Any, List, Dict
from pydantic import BaseModel, Field
from abc import ABC, abstractmethod


class Dataset(BaseModel):
    """
    Generic container, derived from :class:`~pydantic.BaseModel`, that
    defines the structure of a dataset in :mod:`piva`. Every **Dataloader**
    returns the content of a file in this format.

    Entries that a given instrument does not provide stay :class:`None`.
    Attributes marked with an asterisk (*) are mandatory for the
    **DataViewers**.

    :ivar data: (*) acquired data, always a 3D :class:`np.ndarray`
        (``len(data.shape) = 3``) oriented as `dim(0)` - scanned axis,
        `dim(1)` - analyzer axis, `dim(2)` - energy axis. For a single cut
        the first dimension has length one.
    :ivar xscale: (*) axis along the scanned direction; units depend on the
        scan type. Holds a single value for a single cut.
    :ivar yscale: (*) axis along the analyzer slit, most likely in [deg]
    :ivar zscale: (*) axis along the energy direction, most likely in [eV]
    :ivar ekin: energy axis in kinetic energy scale (if the default scale
        is in binding energy)
    :ivar kxscale: momentum axis (saved after conversion) along the scanned
        direction
    :ivar kyscale: momentum axis (saved after conversion) along the
        analyzer direction
    :ivar x: `x` position of the manipulator
    :ivar y: `y` position of the manipulator
    :ivar z: `z` position of the manipulator
    :ivar theta: `theta` angle of the manipulator; often referred as `polar`
    :ivar phi: `phi` angle of the manipulator; often referred as `azimuth`
    :ivar tilt: `tilt` angle of the manipulator
    :ivar temp: temperature during the experiment
    :ivar pressure: pressure during the experiment
    :ivar hv: photon energy used during the experiment
    :ivar wf: work function of the analyzer
    :ivar Ef: correction for the Fermi level
    :ivar polarization: photon polarization
    :ivar PE: pass energy of the analyzer
    :ivar exit_slit: exit (vertical) slit of the beamline; responsible for
        energy resolution
    :ivar FE: front end of the beamline
    :ivar scan_type: type of the measurement (e.g. `cut`, `tilt scan`,
        `hv scan`)
    :ivar scan_dim: for scans other than `cut`, the scanned dimensions as a
        list [`start`, `stop`, `step`]
    :ivar acq_mode: data acquisition mode
    :ivar lens_mode: lens mode of the analyzer
    :ivar ana_slit: slit opening of the analyzer
    :ivar defl_angle: applied deflection angle
    :ivar n_sweeps: number of sweeps
    :ivar DT: dwell time
    :ivar date: date string (presumably of the acquisition - not described
        in the original attribute table)
    :ivar data_provenance: dataset logbook; contains information about the
        original file and keeps track of functions called on the data
    """

    # mandatory entries
    data: np.ndarray
    xscale: np.ndarray
    yscale: np.ndarray
    zscale: np.ndarray
    # optional axes
    ekin: np.ndarray | None = None
    kxscale: np.ndarray | None = None
    kyscale: np.ndarray | None = None
    # manipulator
    x: float | None = None
    y: float | None = None
    z: float | None = None
    theta: float | None = None
    phi: float | None = None
    tilt: float | None = None
    # experimental conditions
    temp: float | None = None
    pressure: float | None = None
    hv: float | None = None
    wf: float | None = None
    Ef: float | None = None
    polarization: str | None = None
    PE: int | None = None
    exit_slit: float | None = None
    FE: float | None = None
    # scan and analyzer settings
    scan_type: str | None = None
    scan_dim: List[float] | None = None
    acq_mode: str | None = None
    lens_mode: str | None = None
    ana_slit: str | None = None
    defl_angle: float | None = None
    n_sweeps: int | None = None
    DT: int | None = None
    date: str | None = None
    # every instance gets its own, independent logbook
    data_provenance: Dict[str, List[Dict]] = Field(
        default_factory=lambda: dict(file=[], k_space_conv=[], edited_entries=[])
    )

    model_config = {
        "arbitrary_types_allowed": True,
        "extra": "allow",
    }

    def add_org_file_entry(self, fname: str, dl: str) -> None:
        """
        Record the original data file in the data provenance logbook.

        :param fname: file name
        :param dl: name of the specific **Dataloader** (inheriting from
            :class:`Dataloader`) that was used to open the file
        """

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.data_provenance["file"].append(
            {
                "index": 0,
                "date_time": timestamp,
                "path": fname,
                "type": "original",
                "index_taken": "-",
                "binned": "-",
                "data_loader": dl,
            }
        )
class Dataloader(ABC):
    """
    Parent class (interface) from which the beamline specific
    **DataLoaders** inherit their shared routines.

    Even when the same software is used, files differ from beamline to
    beamline in terms of format, amount of saved metadata and the way they
    are stored. To take the differences into account, **DataLoaders** for
    specific beamlines are implemented separately, while readers for the
    common SES (Scienta) formats are collected here.
    """

    name = "Base"

    def __init__(self) -> None:
        """
        Prepare an empty :class:`Dataset` to be filled by the loading methods.
        """

        # `model_construct` skips validation, so the mandatory entries can be
        # assigned later; validation happens in `validate_at_return`
        self.ds = Dataset.model_construct()
        # switched on when the file turns out to contain a raster scan, in
        # which case `scan` holds an array of `Dataset` objects
        self.raster = False
        self.scan = None

    @abstractmethod
    def load_data(self, filename: str, metadata: bool) -> Dataset:
        """
        Must be implemented in subclasses.
        """
        pass

    def load_ses_zip(
        self, filename: str, bl_md: list = None, metadata: bool = False
    ) -> None:
        r"""
        Load data from SES (Scienta) **\ *.zip** files.

        :param filename: absolute path to the file
        :param bl_md: beamline specific metadata passed as a list of tuples
            in format (*name* :py:obj:`str`, *label* :py:obj:`str`,
            *type* :py:obj:`type`), where *name* stands for how data entry
            is saved in the file, *label* - how the information should be
            called in the :class:`~pydantic.BaseModel` and *type* - type of
            the variable (:py:obj:`float`, :py:obj:`str`, `etc.`)
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset to display them in :class:`DataBrowser` window.
            Helps to browse through the files faster, without actually
            loading entire file.
        """

        ds = self.ds
        # entries of `Spectrum_<id>.ini` describing size and axes of the
        # binary block, with the names and types they are stored under
        keys1 = [
            ("width", "n_energy", int),
            ("depth", "n_x", int),
            ("height", "n_y", int),
            ("first_full", "first_energy", int),
            ("last_full", "last_energy", int),
            ("widthoffset", "start_energy", float),
            ("widthdelta", "step_energy", float),
            ("depthoffset", "start_x", float),
            ("depthdelta", "step_x", float),
            ("heightoffset", "start_y", float),
            ("heightdelta", "step_y", float),
        ]

        with zipfile.ZipFile(filename, "r") as z:
            # `viewer.ini` holds the name under which the spectrum was saved
            with z.open("viewer.ini") as viewer:
                for line in viewer.readlines():
                    ln = line.decode("UTF-8")
                    if ln.startswith("name"):
                        # make sure to split off unwanted whitespace
                        file_id = ln.split("=")[1].split()[0]

            # size and axes information
            with z.open("Spectrum_" + file_id + ".ini") as md_file1:
                M = Namespace()
                self.read_ses_metadata(M, md_file1.readlines(), bl_md=keys1, zip=True)

            # analyzer (and beamline) settings go directly to the dataset
            with z.open(file_id + ".ini") as md_file2:
                self.read_ses_metadata(ds, md_file2, bl_md=bl_md, zip=True)

            # binary data; in metadata mode only a placeholder of right size
            if metadata:
                data_flat = np.zeros((int(M.n_y) * int(M.n_x) * int(M.n_energy)))
            else:
                with z.open("Spectrum_" + file_id + ".bin") as f:
                    data_flat = np.frombuffer(f.read(), dtype="float32")

        # put the data back into its actual shape
        data = np.reshape(data_flat, (int(M.n_x), int(M.n_y), int(M.n_energy)))
        # cut off unswept region
        data = data[:, :, M.first_energy : M.last_energy + 1]
        # energy axis first
        data = np.moveaxis(data, 2, 0)

        # create axes
        xscale = start_step_n(M.start_x, M.step_x, M.n_x)
        yscale = start_step_n(M.start_y, M.step_y, M.n_y)
        energies = start_step_n(M.start_energy, M.step_energy, M.n_energy)
        energies = energies[M.first_energy : M.last_energy + 1]

        if yscale.size > 1:
            data = np.swapaxes(np.swapaxes(data, 0, 1), 1, 2)
        else:
            data = np.swapaxes(data, 0, 1)

        # set data and axes
        ds.data = data
        ds.xscale = xscale
        ds.yscale = yscale
        ds.zscale = energies
        ds.ekin = energies

        if ds.xscale.size == 1:
            ds.scan_type = "cut"
        else:
            ds.scan_dim = [
                ds.xscale[0],
                ds.xscale[-1],
                np.abs(ds.xscale[0] - ds.xscale[1]),
            ]

    def load_ses_ibw(
        self, filename: str, bl_md: list = None, metadata: bool = False
    ) -> None:
        r"""
        Load data from SES (Scienta) IGOR binary wave (**\ *.ibw**) files.

        :param filename: absolute path to the file
        :param bl_md: beamline specific metadata, forwarded to
            :meth:`read_ses_metadata`. See :meth:`load_ses_zip` for more
            info.
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. See :meth:`load_ses_zip` for more info.
        """

        wave = binarywave.load(filename)["wave"]
        self._read_ibw_(self.ds, wave, bl_md=bl_md, metadata=metadata)

    def load_ses_pxt(
        self, filename: str, bl_md: list = None, metadata: bool = False
    ) -> None:
        r"""
        Load data from SES (Scienta) IGOR packed experiment (**\ *.pxt**)
        files.

        :param filename: absolute path to the file
        :param bl_md: beamline specific metadata, forwarded to
            :meth:`read_ses_metadata`. See :meth:`load_ses_zip` for more
            info.
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. See :meth:`load_ses_zip` for more info.
        """

        wave = packed.load(filename)[0][0].wave["wave"]
        # forward `metadata` as well, in line with `load_ses_ibw`
        self._read_ibw_(self.ds, wave, bl_md=bl_md, metadata=metadata)

    def _read_ibw_(
        self, ds: Dataset, wave: Any, bl_md: list = None, metadata: bool = False
    ) -> None:
        """
        Read igor binary wave and fill the dataset with data, axes and
        metadata.

        :param ds: :class:`Dataset` object to fill up with values.
        :param wave: loaded ibw wave.
        :param bl_md: beamline specific metadata. See :meth:`load_ses_zip`
            for more info.
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset; only affects raster scans. See
            :meth:`load_ses_zip` for more info.
        """

        # the `header` contains some metadata
        header = wave["wave_header"]

        # a non-empty 4th dimension means the data come from a raster scan
        if header["nDim"][3] != 0:
            self.raster = True
            self.scan = self.load_raster_scan(wave, bl_md=bl_md, metadata=metadata)

        nDim = header["nDim"]
        steps = header["sfA"]
        starts = header["sfB"]

        if nDim[2] == 0:
            # single cut: add a scanned dimension of length one
            # NOTE: this updates the wave header in place
            nDim[2] += 1
            data = np.swapaxes(np.array([wave["wData"]]), 1, 2)
        else:
            data = np.swapaxes(np.array(wave["wData"]), 0, 2)

        # construct the scales from start, step and n
        xscale = start_step_n(starts[2], steps[2], nDim[2])
        yscale = start_step_n(starts[1], steps[1], nDim[1])
        zscale = start_step_n(starts[0], steps[0], nDim[0])

        # set data and axes
        ds.data = data
        ds.xscale = xscale
        ds.yscale = yscale
        ds.zscale = zscale

        # convert bytestring of ASCII characters `note` containing some
        # metadata, to a list of strings
        meta = wave["note"].decode("ASCII").split("\r")
        self.read_ses_metadata(ds, meta, bl_md=bl_md)

        if ds.xscale.size == 1:
            ds.scan_type = "cut"
        else:
            ds.scan_dim = [
                ds.xscale[0],
                ds.xscale[-1],
                np.abs(ds.xscale[0] - ds.xscale[1]),
            ]

    @staticmethod
    def read_ses_metadata(
        ns: Union[Dataset, Namespace], meta: list, bl_md: list = None, zip: bool = False
    ) -> None:
        """
        Load metadata from SES file/notes/comments for the analyzer settings
        and some beamline specific, if provided.

        Lines are expected in the form ``key=value``; lines without an
        equals sign or without a value are ignored.

        :param ns: object to fill up with values.
        :param meta: list of strings (or of :py:obj:`bytes`, see ``zip``),
            usually lines read from loaded data, where the metadata can be
            found.
        :param bl_md: beamline specific metadata. See :meth:`load_ses_zip`
            for more info.
        :param zip: lines in ``meta`` might require slightly different
            decoding. If :py:obj:`True`, lines are :py:obj:`bytes` and are
            decoded as UTF-8, as needed for **zip** files.
        :raises ValueError: if a value cannot be cast to the expected type
        """

        # standard SES metadata
        meta_keys = [
            ("Excitation Energy", "hv", float),
            ("Acquisition Mode", "acq_mode", str),
            ("Pass Energy", "PE", int),
            ("Lens Mode", "lens_mode", str),
            ("ThetaY", "defl_angle", float),
            ("Number of Sweeps", "n_sweeps", int),
            ("Step Time", "DT", int),
        ]
        # append beamline specific metadata, if there are any
        if bl_md is not None:
            meta_keys.extend(bl_md)

        # limits of a deflector scan and the names they are stored under
        scan_limits = {
            "Thetay_Low": "scan_start",
            "Thetay_High": "scan_stop",
            "Thetay_StepSize": "scan_step",
        }

        for line in meta:
            if zip:
                line = line.decode("utf-8")
            # split at 'equals' sign
            tokens = line.split("=")
            if len(tokens) < 2:
                continue
            label = tokens[0]
            # split off whitespace or garbage at the end
            words = tokens[1].split()
            if not words:
                continue

            for key, name, dtype in meta_keys:
                if label == key:
                    # cast to the right type
                    setattr(ns, name, dtype(words[0]))

            if label == "Mode":
                if words[:2] == ["ARPES", "Mapping"]:
                    setattr(ns, "scan_type", "DA scan")
            elif label in scan_limits:
                setattr(ns, scan_limits[label], float(words[0]))

    @staticmethod
    def load_raster_scan(
        wave: Any, bl_md: list = None, metadata: bool = False
    ) -> np.ndarray:
        """
        Load data from `xy` manipulator raster scan. Each energy-momentum
        map is saved as a separate :class:`Dataset` object.

        :param wave: loaded ibw wave.
        :param bl_md: beamline specific metadata. See :meth:`load_ses_zip`
            for more info.
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset (data are left as zeros). See :meth:`load_ses_zip`
            for more info.
        :return: array with loaded :class:`Dataset` objects
        """

        # the `header` contains some metadata
        header = wave["wave_header"]
        nDim = header["nDim"]
        steps = header["sfA"]
        starts = header["sfB"]

        scan = np.empty((nDim[2], nDim[3]), dtype=object)
        xscale = np.array([0])
        yscale = start_step_n(starts[1], steps[1], nDim[1])
        zscale = start_step_n(starts[0], steps[0], nDim[0])
        x_axis = start_step_n(starts[2], steps[2], nDim[2])
        y_axis = start_step_n(starts[3], steps[3], nDim[3])
        # the note is shared by all points of the raster
        meta = wave["note"].decode("ASCII").split("\r")

        for xi in range(nDim[2]):
            for yi in range(nDim[3]):
                # `Dataset()` would fail validation, as the mandatory entries
                # are only assigned below
                cut = Dataset.model_construct()
                Dataloader.read_ses_metadata(cut, meta, bl_md)
                # raster position overrides whatever was found in the note
                cut.x = x_axis[xi]
                cut.y = y_axis[yi]
                cut.data = np.zeros((1, nDim[1], nDim[0]))
                if not metadata:
                    cut.data[0, :, :] = wave["wData"][:, :, xi, yi].T
                cut.xscale = xscale
                cut.yscale = yscale
                cut.zscale = zscale
                cut.scan_type = "raster scan"
                scan[xi, yi] = cut

        return scan

    def validate_at_return(self, filename: str) -> Union[Dataset, np.ndarray]:
        """
        Validate that the Dataset was correctly populated with data and add
        original file information to the data provenance record.

        :param filename: absolute path to the file
        :return: loaded dataset with available metadata; for raster scans
            an array of validated datasets
        """

        if self.raster:
            for xi in range(self.scan.shape[0]):
                for yi in range(self.scan.shape[1]):
                    dsi = self.scan[xi, yi]
                    dsi.add_org_file_entry(filename, self.name)
                    # NOTE(review): this sets an attribute on the instance,
                    # not the pydantic config option - confirm the intent
                    dsi.validate_assignment = True
                    self.scan[xi, yi] = Dataset.model_validate(dsi.model_dump())
            return self.scan

        self.ds.add_org_file_entry(filename, self.name)
        self.ds.validate_assignment = True
        return Dataset.model_validate(self.ds.model_dump())
class DataloaderPickle(Dataloader):
    """
    Dataloader for opening files saved with :mod:`piva`. Files are in binary
    format saved using :mod:`pickle` module, and contain raw
    :class:`Dataset` object (or an array of them for raster scans).
    """

    name = "Pickle"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Load :mod:`pickle` file and bring it into correct format.

        :param filename: absolute path to the file; must end with ``.p``
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. Not used here, but required to match format of
            other **Dataloaders**. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: for files with other extension
        """

        if not filename.endswith(".p"):
            raise NotImplementedError

        # SECURITY NOTE: unpickling can execute arbitrary code. Only open
        # files coming from a trusted source.
        with open(filename, "rb") as f:
            filedata = pickle.load(f)

        if isinstance(filedata, np.ndarray):
            # array of datasets from a raster scan; `self.ds` is not used
            self.raster = True
            self.scan = filedata
            return self.validate_at_return(filename)

        # synchronize all attributes in case file was saved using older
        # version; errors are propagated to the caller
        for attr in Dataset.model_fields:
            if hasattr(filedata, attr):
                setattr(self.ds, attr, getattr(filedata, attr))

        return self.validate_at_return(filename)
class DataloaderSIS(Dataloader):
    """
    Dataloader for opening files from SIS beamline at SLS (Swiss Light
    Source, Switzerland).
    """

    name = "SIS"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: for not supported file extensions
        """

        if filename.endswith("h5"):
            self.load_h5(filename, metadata=metadata)
        elif filename.endswith("zip"):
            self.load_ses_zip(filename, metadata=metadata)
        elif filename.endswith("ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)

    def load_h5(self, filename: str, metadata: bool = False) -> None:
        """
        Load HDF file and all available metadata.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset (data are left as zeros). See :meth:`load_ses_zip`
            for more info.
        :raises KeyError: if an expected entry is missing in the file
        """

        # the context manager closes the file also when reading fails
        with h5py.File(filename, "r") as datfile:
            # extract the actual dataset and some metadata
            h5_data = datfile["Electron Analyzer/Image Data"]
            attributes = h5_data.attrs

            # convert to array and make 3 dimensional if necessary
            shape = h5_data.shape
            if metadata:
                data = np.zeros(shape)
            elif len(shape) == 3:
                # read slice by slice
                data = np.zeros(shape)
                for i in range(shape[2]):
                    data[:, :, i] = h5_data[:, :, i]
            else:
                data = np.array(h5_data)
            data = data.T

            if len(shape) == 2:
                x = 1
                y = shape[1]
                N_E = shape[0]
                # make data 3D
                data = data.reshape((1, y, N_E))
                # extract the limits
                xlims = [1, 1]
                ylims = attributes["Axis1.Scale"]
                elims = attributes["Axis0.Scale"]
                xscale = start_step_n(*xlims, x)
                yscale = start_step_n(*ylims, y)
                energies = start_step_n(*elims, N_E)
            elif len(shape) == 3:
                x = shape[1]
                y = shape[2]
                N_E = shape[0]
                # extract the limits
                xlims = attributes["Axis2.Scale"]
                ylims = attributes["Axis1.Scale"]
                elims = attributes["Axis0.Scale"]
                # after the transposition the scanned axis has length `y`
                xscale = start_step_n(*xlims, y)
                yscale = start_step_n(*ylims, x)
                energies = start_step_n(*elims, N_E)
            else:
                # case sequence of cuts
                x = shape[0]
                y = shape[1]
                N_E = y
                data = np.rollaxis(data, 2, 0)
                # extract the limits
                xlims = attributes["Axis1.Scale"]
                ylims = attributes["Axis0.Scale"]
                elims = ylims
                xscale = start_step_n(*xlims, y)
                yscale = start_step_n(*ylims, x)
                energies = start_step_n(*elims, N_E)

            # extract some data for ang2k conversion
            instruments = datfile["Other Instruments"]
            x_pos = instruments["X"][0]
            y_pos = instruments["Y"][0]
            z_pos = instruments["Z"][0]
            theta = instruments["Theta"][0]
            phi = instruments["Phi"][0]
            tilt = instruments["Tilt"][0]
            # account for differences from before the beamline upgrade
            try:
                temp = instruments["Temperature B (Sample 1)"][0]
                pressure = instruments["Pressure AC (ACMI)"][0]
            except KeyError:
                temp = instruments["Temperature B"][0]
                pressure = instruments["Pressure AC1"][0]
            hv = attributes["Excitation Energy (eV)"]
            wf = attributes["Work Function (eV)"]
            polarization = instruments["hv"].attrs["Mode"][10:]
            PE = attributes["Pass Energy (eV)"]
            exit_slit = instruments["Exit Slit"][0]
            FE = instruments["FE Horiz. Width"][0]
            ekin = energies + hv - wf
            lens_mode = attributes["Lens Mode"]
            acq_mode = attributes["Acquisition Mode"]
            n_sweeps = attributes["Sweeps on Last Image"]
            DT = attributes["Dwell Time (ms)"]

            if "Axis2.Scale" in attributes:
                scan_type = attributes["Axis2.Description"] + " scan"
                start = attributes["Axis2.Scale"][0]
                step = attributes["Axis2.Scale"][1]
                stop = (
                    attributes["Axis2.Scale"][0]
                    + attributes["Axis2.Scale"][1] * xscale.size
                )
                scan_dim = [start, stop, step]
            else:
                scan_type = "cut"
                scan_dim = []

            ds = self.ds
            ds.data = data
            ds.xscale = xscale
            ds.yscale = yscale
            ds.zscale = energies
            ds.ekin = ekin
            ds.x = x_pos
            ds.y = y_pos
            ds.z = z_pos
            ds.theta = theta
            ds.phi = phi
            ds.tilt = tilt
            ds.temp = temp
            ds.pressure = pressure
            ds.hv = hv
            ds.wf = wf
            ds.polarization = polarization
            ds.PE = PE
            ds.exit_slit = exit_slit
            ds.FE = FE
            ds.scan_type = scan_type
            ds.scan_dim = scan_dim
            ds.lens_mode = lens_mode
            ds.acq_mode = acq_mode
            ds.n_sweeps = n_sweeps
            ds.DT = DT
class DataloaderADRESS(Dataloader):
    """
    Dataloader for opening files from ADRESS beamline at SLS (Swiss Light
    Source, Switzerland).
    """

    name = "ADRESS"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: for not supported file extensions
        """

        if filename.endswith("h5"):
            self.load_h5(filename, metadata=metadata)
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)

    def load_h5(self, filename: str, metadata: bool = False) -> None:
        """
        Load HDF file and all available metadata.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset (data are left as zeros). See :meth:`load_ses_zip`
            for more info.
        """

        # the context manager closes the file also when reading fails
        with h5py.File(filename, "r") as h5file:
            # the actual data is in the field: 'Matrix'
            matrix = h5file["Matrix"]

            # the scales and the note are stored in the matrix' attributes
            scalings = matrix.attrs["IGORWaveScaling"]
            info = matrix.attrs["IGORWaveNote"]

            # put the data into a numpy array and convert to float
            if metadata:
                data = np.zeros(matrix.shape)
            else:
                data = np.array(matrix, dtype=float)

        shape = data.shape
        if len(shape) == 3:
            # case map or hv scan (or...?)
            data = np.rollaxis(data.T, 2, 1)
            # shape has changed
            shape = data.shape
            xstep, xstart = scalings[3]
            ystep, ystart = scalings[1]
            zstep, zstart = scalings[2]
            xscale = start_step_n(xstart, xstep, shape[0])
            yscale = start_step_n(ystart, ystep, shape[1])
            zscale = start_step_n(zstart, zstep, shape[2])
        else:
            # case cut: make data 3-dimensional by adding an empty dimension
            data = data.reshape(1, shape[0], shape[1])
            # shape has changed
            shape = data.shape
            ystep, ystart = scalings[1]
            zstep, zstart = scalings[2]
            xscale = np.array([1])
            yscale = start_step_n(ystart, ystep, shape[1])
            zscale = start_step_n(zstart, zstep, shape[2])

        self.ds.data = data
        self.ds.xscale = xscale
        self.ds.yscale = yscale
        self.ds.zscale = zscale

        # more metadata; depending on the h5py version the note arrives as
        # bytes or as already decoded string
        if isinstance(info, bytes):
            info = info.decode("ASCII")
        metadata_list = str(info).split("\n")
        keys1 = [
            ("hv", "hv", float),
            ("Pol", "polarization", str),
            ("Slit  ", "exit_slit", float),
            ("Mode", "lens_mode", str),
            ("Epass", "PE", int),
            ("X ", "x", float),
            ("Y ", "y", float),
            ("Z ", "z", float),
            ("Theta", "theta", float),
            ("Azimuth", "phi", float),
            ("Tilt", "tilt", float),
            ("ADef", "defl_angle", float),
            ("Temp", "temp", float),
            ("dt", "DT", int),
        ]
        self.read_metadata(keys1, metadata_list)
        # a single slice is a cut, whatever the note says
        if xscale.size == 1:
            setattr(self.ds, "scan_type", "cut")

    def read_metadata(self, keys: list, metadata_list: list) -> None:
        """
        Read metadata from HDF file in a similar fashion as in
        :meth:`Dataloader.read_ses_metadata` (see for more details).

        Entries are expected as ``key = value`` lines; scanned quantities
        are given as ``start:step:stop``. Lines without an equals sign or
        without a value are ignored.

        :param keys: keys to metadata passed as a list of tuples in format
            (*name* :py:obj:`str`, *label* :py:obj:`str`,
            *type* :py:obj:`type`), where *name* stands for how data entry
            is saved in the file, *label* - how the information should be
            called in the :class:`BaseModel` and *type* - type of the
            variable (:py:obj:`float`, :py:obj:`str`, `etc.`)
        :param metadata_list: list of strings, usually lines read from
            loaded data, where the metadata can be found.
        """

        ds = self.ds
        for line in metadata_list:
            # split at 'equals' sign
            tokens = line.split("=")
            if len(tokens) < 2:
                continue
            label, raw = tokens[0], tokens[1]
            # value without whitespace or garbage at the end
            words = raw.split()

            for key, name, dtype in keys:
                if key not in label:
                    continue
                if "Tilt" in label and ":" in raw:
                    setattr(ds, "scan_type", "Tilt scan")
                    start, step, stop = raw.split(":")
                    setattr(ds, "scan_dim", [start, stop, step])
                    setattr(ds, "tilt", start)
                elif "hv" in label and ":" in raw:
                    setattr(ds, "scan_type", "hv scan")
                    start, step, stop = raw.split(":")
                    setattr(ds, "scan_dim", [start, stop, step])
                    setattr(ds, "hv", start)
                elif "ADef" in label and ":" in raw:
                    setattr(ds, "scan_type", "DA scan")
                    start, step, stop = raw.split(":")
                    setattr(ds, "scan_dim", [start, stop, step])
                    setattr(ds, "defl_angle", None)
                elif not words:
                    # nothing to read after the equals sign
                    continue
                elif "Slit" in label and label[0] == "A":
                    # analyzer slit; drop the two trailing characters
                    setattr(ds, "ana_slit", words[0][:-2])
                elif dtype is float:
                    setattr(ds, name, float(words[0]))
                else:
                    setattr(ds, name, words[0])
class DataloaderBloch(Dataloader):
    """
    Dataloader for opening files from Bloch beamline at MAX-IV (Sweden).
    """

    name = "Bloch"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Pick the reader matching the file extension and load the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: for not supported file extensions
        """

        # manipulator entries, as named in the files of this beamline
        manipulator_keys = [
            ("A", "phi", float),
            ("P", "theta", float),
            ("T", "tilt", float),
            ("X", "x", float),
            ("Y", "y", float),
            ("Z", "z", float),
        ]
        readers = (
            ("zip", self.load_ses_zip),
            ("ibw", self.load_ses_ibw),
            ("pxt", self.load_ses_pxt),
        )

        for suffix, reader in readers:
            if filename.endswith(suffix):
                reader(filename, bl_md=manipulator_keys, metadata=metadata)
                break
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)
class DataloaderI05(Dataloader):
    """
    Dataloader for opening files from I05 beamline at Diamond Light Source
    (UK).
    """

    name = "I05"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: for not supported file extensions
        """

        if filename.endswith("nxs"):
            self.load_nxs(filename, metadata=metadata)
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)

    @staticmethod
    def _as_text(value: Any) -> str:
        """
        Return a string entry read from the file as :py:obj:`str`, no matter
        whether it arrived as bytes, string or an array holding one of them.
        """

        if isinstance(value, np.ndarray):
            value = value.ravel()[0]
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return str(value)

    def load_nxs(self, filename: str, metadata: bool) -> None:
        """
        Load nexus file and all available metadata.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of
            the dataset (data are left as zeros). See :meth:`load_ses_zip`
            for more info.
        """

        # read file with h5py reader; the context manager closes the file
        # also when reading fails
        with h5py.File(filename, "r") as infile:

            def first(path: str) -> Any:
                # entries are stored as arrays; the first element is used
                return infile[path][0]

            if metadata:
                data = np.zeros(infile["/entry1/analyser/data"].shape)
            else:
                data = np.array(infile["/entry1/analyser/data"])
            angles = np.array(infile["/entry1/analyser/angles"])
            energies = np.array(infile["/entry1/analyser/energies"])
            if len(energies.shape) == 2:
                zscale = energies[0]
            else:
                zscale = energies
            yscale = angles

            # check if we have a scan
            if data.shape[0] == 1:
                xscale = np.array([0])
            else:
                # otherwise, extract third dimension from scan command
                command = self._as_text(infile["entry1/scan_command"][()])
                scan_kind = command.split()[1]

                if scan_kind == "pathgroup":
                    # extract points from a ([polar, x, y], [polar, x, y],
                    # ...) tuple
                    points = command.split("(")[-1].split(")")[0]
                    tuples = points.split("[")[1:]
                    xscale = np.array([float(t.split(",")[0]) for t in tuples])
                elif scan_kind == "scan_group":
                    # extract points from a ((polar, x, y), (polar, x, y),
                    # ...) tuple
                    points = command.split("((")[-1].split("))")[0]
                    points = "((" + points + "))"
                    xscale = np.array(ast.literal_eval(points))[:, 0]
                    # now, if this was a scan with varying centre_energy,
                    # the zscale contains a list of energies... for now,
                    # just take the first one
                    zscale = zscale[0]
                else:
                    # "normal" case
                    start, stop, step = [float(s) for s in command.split()[2:5]]
                    xscale = np.arange(start, stop + 0.5 * step, step)

                # fewer slices saved than planned: fill the rest with zeros
                if not (xscale.size == data.shape[0]):
                    x_done = data.shape[0]
                    data_tmp = np.zeros((xscale.size, data.shape[1], data.shape[2]))
                    data_tmp[:x_done, :, :] = data
                    data = data_tmp
                    xscale = xscale[: data.shape[0]]

            # read metadata
            x = float(first("entry1/instrument/manipulator/sax"))
            y = float(first("entry1/instrument/manipulator/say"))
            z = float(first("entry1/instrument/manipulator/saz"))
            theta = float(first("entry1/instrument/manipulator/sapolar"))
            phi = float(first("entry1/instrument/manipulator/saazimuth"))
            tilt = float(first("entry1/instrument/manipulator/satilt"))

            PE = int(first("entry1/instrument/analyser/pass_energy"))
            n_sweeps = int(first("entry1/instrument/analyser/number_of_iterations"))
            lens_mode = self._as_text(first("entry1/instrument/analyser/lens_mode"))
            acq_mode = self._as_text(
                first("entry1/instrument/analyser/acquisition_mode")
            )
            DT = int(first("entry1/instrument/analyser/time_for_frames") * 1000)
            defl_ang = float(first("entry1/instrument/analyser/deflector_x"))

            hv = float(first("entry1/instrument/monochromator/energy"))
            exit_slit = float(
                first("entry1/instrument/monochromator/exit_slit_size") * 1000
            )
            FE = round(
                first("entry1/instrument/monochromator/s2_horizontal_slit_size"), 2
            )
            polarization = self._as_text(
                first(
                    "entry1/instrument/insertion_device/beam/final_polarisation_label"
                )
            )
            temp = float(first("entry1/sample/temperature"))
            pressure = float(first("entry1/sample/lc_pressure"))

            # get scan info
            if first("entry1/scan_dimensions") == 1:
                scan_type = "cut"
                scan_dim = None
            else:
                words = self._as_text(infile["entry1/scan_command"][()]).split()
                device = words[1]
                try:
                    start, stop, step = (float(w) for w in words[2:5])
                    scan_dim = [start, stop, step]
                except ValueError:
                    # grouped scans list their points instead of giving
                    # start, stop and step; use the extracted axis then
                    if xscale.size > 1:
                        step = float(np.abs(xscale[0] - xscale[1]))
                    else:
                        step = 0.0
                    scan_dim = [float(xscale[0]), float(xscale[-1]), step]

                for marker, label in (
                    ("deflector", "DA"),
                    ("polar", "theta"),
                    ("energy", "hv"),
                ):
                    if marker in device:
                        scan_type = label + " scan"
                        break
                else:
                    # unknown scanned device: keep its name
                    scan_type = device + " scan"

        ds = self.ds
        ds.data = data
        ds.xscale = xscale
        ds.yscale = yscale
        ds.zscale = zscale
        ds.ekin = None
        ds.kxscale = None
        ds.kyscale = None
        ds.x = x
        ds.y = y
        ds.z = z
        ds.theta = theta
        ds.phi = phi
        ds.tilt = tilt
        ds.temp = temp
        ds.pressure = pressure
        ds.hv = hv
        ds.wf = None
        ds.Ef = None
        ds.polarization = polarization
        ds.PE = PE
        ds.exit_slit = exit_slit
        ds.FE = FE
        ds.scan_type = scan_type
        ds.scan_dim = scan_dim
        ds.acq_mode = acq_mode
        ds.lens_mode = lens_mode
        ds.ana_slit = None
        ds.defl_angle = defl_ang
        ds.n_sweeps = n_sweeps
        ds.DT = DT
class DataloaderMERLIN(Dataloader):
    """
    Dataloader for opening files from Merlin beamline at ALS (Advanced
    Light Source, Berkeley, CA).
    """

    name = "Merlin"

    def __init__(self):
        super(DataloaderMERLIN, self).__init__()
        # Handle of the most recently opened HDF5 file (closed after loading)
        self.datfile = None

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: forwarded unchanged to the format-specific loader
        :raises NotImplementedError: if the file extension is not one of
            ``h5``, ``ibw`` or ``pxt``
        :return: loaded dataset with available metadata
        """

        if filename.endswith("h5"):
            self.load_h5(filename, metadata=metadata)
        elif filename.endswith("ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            # Bound method: ``self`` must not be passed a second time
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)

    def load_h5(self, filename: str, metadata: bool = False) -> None:
        """
        Load HDF type file and all available metadata.

        For ``3Ddata`` files the scanned axis is read from a companion
        text file named ``<filename without last 3 characters>_Motor_Pos.txt``
        whose first line is the scan type and whose remaining lines are the
        positions.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, skip reading the intensities of
            ``3Ddata`` files and keep a zero-filled array of the right
            shape. ``2Ddata`` files are always read completely.
        :raises FileNotFoundError: if the companion motor-position file of
            a ``3Ddata`` file does not exist
        """

        self.datfile = h5py.File(filename, "r")
        # try/finally guarantees the file is closed on early return and on
        # any error raised while parsing
        try:
            if "3Ddata" in self.datfile.keys():
                group = "3Ddata"
            elif "2Ddata" in self.datfile.keys():
                group = "2Ddata"
            else:
                # Unknown layout: nothing is written to the dataset
                return

            # Extract the actual dataset and some metadata
            h5_data = self.datfile[group + "/Spectrum"]
            detector = self.datfile[group + "/Detector"].attrs
            sample = self.datfile[group + "/Sample"].attrs
            source = self.datfile[group + "/Source"].attrs

            if group == "2Ddata":
                # Single cut: prepend a scanned axis of length one
                data = np.zeros((1, h5_data.shape[0], h5_data.shape[1]))
                data[0, :, :] = h5_data
                xscale = np.array([1])
                scan_type = "cut"
                scan_dim = []
            else:
                data = np.zeros(h5_data.shape)
                if not metadata:
                    # Copy slice by slice from the file
                    for i in range(data.shape[0]):
                        data[i, :, :] = h5_data[i, :, :]
                # Move the last stored axis to the front: (a, b, c) -> (c, a, b)
                data = np.swapaxes(np.swapaxes(data, 0, 2), 1, 2)

                with open(filename[:-3] + "_Motor_Pos.txt") as motor_file:
                    xaxis = [line.strip("\n") for line in motor_file]
                scan_type = str(xaxis[0])
                xscale = np.array(xaxis[1:], dtype=float)
                if xscale[0] > xscale[-1]:
                    # Store the scanned axis in ascending order
                    xscale = np.flip(xscale)
                    data = np.flip(data, axis=0)
                scan_dim = [xscale[0], xscale[-1], np.abs(xscale[0] - xscale[1])]

            # Each "AxisScaling" row is (step, start) for one axis
            scaling = h5_data.attrs["AxisScaling"]
            yscale = start_step_n(
                float(scaling[1, 1]), float(scaling[1, 0]), h5_data.shape[0]
            )
            zscale = start_step_n(
                float(scaling[0, 1]), float(scaling[0, 0]), h5_data.shape[1]
            )

            # Extract some metadata
            x_pos = float(sample["Sample X"])
            y_pos = float(sample["Sample Y"])
            z_pos = float(sample["Sample Z"])
            theta = float(sample["Polar"])
            phi = float(sample["Azimuth"])
            tilt = float(sample["Tilt"])
            temp = float(sample["Temperature A"])
            pressure = float(sample["Pressure"])
            hv = float(source["BL Energy"])
            # Work function is a fixed value here, not read from the file
            wf = 4.44
            polarization = ["LH", "LC", "LV", "RC"][int(source["EPU POL"])]
            PE = int(detector["Pass Energy"])
            exit_slit = round(float(source["Exit Slit"]), 2)
            FE = round(float(source["Entrance Slit"]), 2)
            lens_mode = detector["Lens Mode"]
            acq_mode = detector["Acq Mode"]
            n_sweeps = int(detector["Num of Sweeps"])
            DT = float(detector["Step Time"])

            self.ds.data = data
            self.ds.xscale = xscale
            self.ds.yscale = yscale
            self.ds.zscale = zscale
            self.ds.x = x_pos
            self.ds.y = y_pos
            self.ds.z = z_pos
            self.ds.theta = theta
            self.ds.phi = phi
            self.ds.tilt = tilt
            self.ds.temp = temp
            self.ds.pressure = pressure
            self.ds.hv = hv
            self.ds.wf = wf
            self.ds.polarization = polarization
            self.ds.PE = PE
            self.ds.exit_slit = exit_slit
            self.ds.FE = FE
            self.ds.scan_type = scan_type
            self.ds.scan_dim = scan_dim
            self.ds.lens_mode = lens_mode
            self.ds.acq_mode = acq_mode
            self.ds.n_sweeps = n_sweeps
            self.ds.DT = DT
        finally:
            self.datfile.close()
class DataloaderHERS(Dataloader):
    """
    Dataloader for opening files from HERS beamline at ALS (Advanced
    Light Source, Berkeley, CA).
    """

    name = "HERS"

    def __init__(self):
        super(DataloaderHERS, self).__init__()
        self.datfile = None

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: forwarded unchanged to the format-specific loader.
            See :meth:`load_ses_zip` for more info.
        :raises NotImplementedError: if the file extension is not one of
            ``zip``, ``ibw`` or ``pxt``
        :return: loaded dataset with available metadata
        """

        if filename.endswith("zip"):
            self.load_ses_zip(filename, metadata=metadata)
        elif filename.endswith("ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            # Bound method: ``self`` must not be passed a second time
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)
class DataloaderURANOS(Dataloader):
    """
    Dataloader for opening files from Uranos beamline at Solaris (Poland).
    """

    name = "URANOS"

    def __init__(self):
        super().__init__()
        self.datfile = None

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Pick the SES reader matching the file extension and load the file.

        :param filename: absolute path to the file
        :param metadata: forwarded unchanged to the selected reader.
            See :meth:`load_ses_zip` for more info.
        :raises NotImplementedError: if the file extension is not one of
            ``zip``, ``ibw`` or ``pxt``
        :return: loaded dataset with available metadata
        """

        # (name in the file, Dataset attribute, type) for beamline metadata
        beamline_keys = [
            ("X", "x", float),
            ("Y", "y", float),
            ("Z", "z", float),
            ("R1", "theta", float),
            ("R3", "tilt", float),
        ]

        # Extensions are tested in this order; the first match is used
        readers = (
            ("zip", self.load_ses_zip),
            ("ibw", self.load_ses_ibw),
            ("pxt", self.load_ses_pxt),
        )
        for extension, reader in readers:
            if filename.endswith(extension):
                reader(filename, bl_md=beamline_keys, metadata=metadata)
                break
        else:
            raise NotImplementedError

        return self.validate_at_return(filename)
class DataloaderCASSIOPEE(Dataloader):
    """
    Dataloader for opening files from CASSIOPEE beamline at SOLEIL (France).
    """

    name = "CASSIOPEE"

    # Possible scantypes
    HV = "hv scan"
    FSM = "Theta scan"

    def __init__(self):
        super(DataloaderCASSIOPEE, self).__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Load either a single file or a directory of cuts.

        :param filename: absolute path to a file, or to a directory holding
            the slices of a multidimensional scan
        :param metadata: forwarded to :meth:`load_from_file`; ignored when
            ``filename`` is a directory
        :return: loaded dataset with available metadata.
        """

        if os.path.isfile(filename):
            self.load_from_file(filename, metadata=metadata)
        else:
            # load_from_dir builds paths by plain string concatenation
            if not filename.endswith("/"):
                filename += "/"
            self.load_from_dir(filename)

        return self.validate_at_return(filename)

    def load_from_file(self, filename: str, metadata: bool = False) -> None:
        """
        Recognize correct format and load data from a single file.

        Files ending in ``.ibw`` or ``pxt`` go to the SES readers, anything
        else is treated as a CASSIOPEE text file.

        :param filename: absolute path to the file
        :param metadata: forwarded to the SES readers; not used for text
            files
        """

        if filename.endswith(".ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            self.load_from_txt(filename)

    def load_from_dir(self, dirname: str) -> None:
        """
        Read data from directory containing slices of data saved in
        separate files.

        Note: At CASSIOPEE beamline multidimensional scans are saved as a
        collection of ``*.txt`` files, that need to be combined into full
        map.

        :param dirname: absolute path to the directory, ending with a path
            separator (file names are appended to it directly)
        :raises FileNotFoundError: if the directory holds no ``ROI`` data
            file or no ``_1_i`` metadata file
        """

        # Data files carry "ROI" in their name; the file with "_1_i" holds
        # the manipulator/beamline metadata.
        metadata_path = None
        filenames = []
        for name in os.listdir(dirname):
            if "_1_i" in name:
                # Only remember the path; the file is opened when needed so
                # that no handle is leaked
                metadata_path = dirname + name
            if "ROI" in name:
                filenames.append(name)
        if not filenames:
            raise FileNotFoundError(f"No 'ROI' data files found in <{dirname}>.")
        if metadata_path is None:
            raise FileNotFoundError(
                f"No '_1_i' metadata file found in <{dirname}>."
            )

        # Get metadata from first file in list
        skip, energy, angles = self.get_metadata(dirname + filenames[0])
        keys = [
            ("hv (eV) ", "hv", float),
            ("x (mm) ", "x", float),
            ("y (mm) ", "y", float),
            ("z (mm) ", "z", float),
            ("theta (deg) ", "theta", float),
            ("phi (deg) ", "phi", float),
            ("tilt (deg) ", "tilt", float),
            ("InputB ", "temp", float),
            ("P(mbar) ", "pressure", float),
            ("Polarisation [0", "polarization", str),
        ]
        with open(metadata_path) as metadata_file:
            md = self.read_metadata(keys, metadata_file)

        # Read every cut, keyed by the index encoded in its file name;
        # os.listdir() gives no useful order, so reorder afterwards.
        cuts = {}
        for name in filenames:
            index = int(name.split("_")[-3])
            # First column of each file is dropped before stacking
            cuts[index] = np.loadtxt(dirname + name, skiprows=skip + 1)[:, 1:]
        data = np.array(
            [np.array(cuts[i]).T for i in range(min(cuts), max(cuts) + 1)]
        )

        # Get the scanned axis from the metadata files
        scan_type, outer_loop, hv, thetas = self.get_outer_loop(dirname, filenames)
        if scan_type == self.HV:
            # Sort so that start/stop/step do not depend on listing order
            hv = sorted(hv)
            xscale = outer_loop
            scan_dim = [hv[0], hv[-1], np.abs(hv[0] - hv[1])]
        elif scan_type == self.FSM:
            thetas = sorted(thetas)
            xscale = outer_loop
            scan_dim = [thetas[0], thetas[-1], np.abs(thetas[0] - thetas[1])]
        else:
            xscale = np.arange(data.shape[0])
            scan_dim = [0, 0, 0]

        self.ds.data = data
        self.ds.xscale = xscale
        self.ds.yscale = angles
        self.ds.zscale = energy
        self.ds.ekin = energy
        self.ds.hv = float(md.hv)
        self.ds.x = float(md.x)
        self.ds.y = float(md.y)
        self.ds.z = float(md.z)
        self.ds.theta = float(md.theta)
        self.ds.phi = float(md.phi)
        self.ds.tilt = float(md.tilt)
        self.ds.temp = float(md.temp)
        self.ds.pressure = float(md.pressure)
        self.ds.polarization = md.polarization
        self.ds.scan_type = scan_type
        self.ds.scan_dim = scan_dim

    def load_from_txt(self, filename: str) -> None:
        """
        Load data from a single ``*.txt`` file.

        :param filename: absolute path to the file
        """

        i, energy, angles = self.get_metadata(filename)
        data0 = np.loadtxt(filename, skiprows=i + 1).T
        # The first column in the datafile contains the angles
        data = np.array([data0[1:, :]])

        self.ds.data = data
        self.ds.xscale = np.array([0])
        self.ds.yscale = angles
        self.ds.zscale = energy

    def get_outer_loop(self, dirname: str, filenames: list) -> tuple:
        """
        Try to determine the scantype and the corresponding scanned axis
        from the additional metadata textfiles.

        For every data file the matching metadata file (``_ROI?_`` replaced
        by ``_i`` in the name) is read up to the line starting with
        ``UNDULATOR``: both the MONOCHROMATOR and the UNDULATOR section have
        a key `hv`, but only the former makes sense.

        :param dirname: absolute path to the directory
        :param filenames: list of data files' names
        :raises ValueError: if no `hv` or `theta` entry has been found by
            the time the first metadata file was read
        :return: tuple (``scantype``, ``xscale``, ``hvs``, ``thetas``).
            ``scantype`` is :attr:`HV`, :attr:`FSM` or :py:obj:`None`;
            ``xscale`` is the varying quantity ordered by file index, or
            :py:obj:`None` when nothing varies (or fewer than two files are
            given); ``hvs`` and ``thetas`` are lists in the order of
            ``filenames``.
        """

        theta_key, hv_key = "theta (deg)", "hv (eV)"
        indices, thetas, hvs = [], [], []
        # Most recently seen values; kept across files, so a file lacking an
        # entry reuses the value of the previous file
        latest = {}

        for name in filenames:
            # Build the metadata-filename by substituting the ROI part with i
            metafile = re.sub(r"_ROI.?_", "_i", name)
            with open(dirname + metafile, "r") as f:
                for line in f:
                    if line.startswith("UNDULATOR"):
                        break
                    for key in (theta_key, hv_key):
                        if line.startswith(key):
                            # The values are separated from the names by `:`
                            latest[key] = float(line.split(":")[-1])
                            break
            if theta_key not in latest or hv_key not in latest:
                raise ValueError(
                    f"No theta/hv entries found in <{dirname + metafile}>."
                )
            indices.append(int(name.split("_")[-3]))
            thetas.append(latest[theta_key])
            hvs.append(latest[hv_key])

        # Check which parameters vary to determine scantype; at least two
        # files are needed to see a variation at all
        if len(hvs) > 1 and np.abs(hvs[1] - hvs[0]) > 0.4:
            scantype, xscale = self.HV, hvs
        elif len(thetas) > 1 and thetas[1] != thetas[0]:
            scantype, xscale = self.FSM, thetas
        else:
            scantype, xscale = None, None

        # Put the axis in order of the file indices and return
        if xscale is not None:
            xscale = np.array(xscale)[np.argsort(indices)]

        return scantype, xscale, hvs, thetas

    @staticmethod
    def get_metadata(filename: str) -> tuple:
        """
        Extract some of the metadata stored in a CASSIOPEE output text file.
        Also try to detect the line number below which the data starts (for
        ``np.loadtxt( , skiprows=)`` .)

        :param filename: absolute path to the file
        :raises ValueError: if the file lacks the ``Dimension 1 scale=`` or
            ``Dimension 2 scale=`` line
        :return: (`i`, `energy`, `angles`), where `i` - index of the last
            header line (the line starting with ``inputA`` or ``[Data``, or
            the last line of the file if neither occurs), `energy` - energy
            axis, `angles` - analyzer axis.
        """

        energy = angles = None
        i = -1
        with open(filename, "r") as f:
            for i, line in enumerate(f):
                if line.startswith("Dimension 1 scale="):
                    energy = np.array(line.split("=")[-1].split(), dtype=float)
                elif line.startswith("Dimension 2 scale="):
                    angles = np.array(line.split("=")[-1].split(), dtype=float)
                elif line.startswith(("inputA", "[Data")):
                    # this seems to be the last line before the data
                    break
        if energy is None or angles is None:
            raise ValueError(f"No axis definitions found in <{filename}>.")
        return i, energy, angles

    @staticmethod
    def read_metadata(keys: list, metadata_file: io.IOBase) -> Namespace:
        """
        Read some metadata from one of the header files.

        Only the first occurrence of each key is used. Values are stored as
        the raw text after the last ``:`` with its first and last character
        removed; the *type* entry of ``keys`` is not applied here. The
        polarisation code ``0``-``4`` is translated to ``LV``, ``LH``,
        ``AV``, ``AH``, ``CR``; any other code leaves the attribute unset.

        :param keys: keys to metadata passed as a list of tuples in format
            (*name* :py:obj:`str`, *label* :py:obj:`str`,
            *type* :py:obj:`type`), where *name* stands for how data entry
            is saved in the file and *label* - how the information should
            be called in the :class:`Namespace`
        :param metadata_file: opened file containing metadata; it is closed
            before returning, also when reading fails
        :return: object with collected metadata
        """

        polarizations = {"0": "LV", "1": "LH", "2": "AV", "3": "AH", "4": "CR"}
        metadata = Namespace()
        try:
            for line in metadata_file:
                # Split at the colon separating entry name and value
                tokens = line.split(":")
                for key, name, _dtype in keys:
                    if tokens[0] != key or hasattr(metadata, name):
                        continue
                    # Split off whitespace or garbage at the ends
                    value = tokens[-1][1:-1]
                    if key == "Polarisation [0":
                        if value in polarizations:
                            setattr(metadata, name, polarizations[value])
                    else:
                        setattr(metadata, name, value)
        finally:
            metadata_file.close()
        return metadata
# +-------+ # # | Tools | # ================================================================= # +-------+ #
def start_step_n(start: float, step: float, n: int) -> np.ndarray:
    """
    Return an array of ``n`` values that starts at ``start`` and advances
    by ``step`` between neighbouring values, i.e.
    ``start, start + step, ..., start + (n - 1) * step``.

    Helpful for generating axes, as many systems provide exactly starting
    value, step and dimensionality of the data.

    :param start: beginning value of the axis
    :param step: step value along the axis
    :param n: number of points on the axis
    :return: generated axis
    """

    # n points span n - 1 intervals; using n * step as the end value would
    # stretch the spacing to n * step / (n - 1).
    end = start + (n - 1) * step
    return np.linspace(start, end, n)
def load_data(
    filename: str, metadata: bool = False, suppress_warnings: bool = False
) -> Dataset:
    """
    Load a file with the first **Dataloader** able to read it.

    The known **Dataloaders** are instantiated one after another and their
    :obj:`load_data` method is called; a loader that raises is skipped.

    :param filename: absolute path to the file
    :param metadata: forwarded unchanged to every **Dataloader**.
        See :meth:`load_ses_zip` for more info.
    :param suppress_warnings: if :py:obj:`True`, suppress possible warning
        to keep terminal clean
    :raises FileNotFoundError: if ``filename`` does not exist
    :return: loaded dataset with available metadata, or :py:obj:`None` if
        every **Dataloader** raised.

    NOTE: the result comes from the first **Dataloader** that didn't raise
    any errors. Might be, that other **Dataloader** can perform better,
    especially with regard to loaded metadata.
    """

    # Loader classes, tried in exactly this order
    candidates = [
        DataloaderPickle,
        DataloaderSIS,
        DataloaderBloch,
        DataloaderADRESS,
        DataloaderI05,
        DataloaderCASSIOPEE,
        DataloaderMERLIN,
    ]

    # Sanity check: does the given path even exist in the filesystem?
    if not os.path.exists(filename):
        raise FileNotFoundError(ENOENT, os.strerror(ENOENT), filename)

    # Warning filters changed below are restored when the block is left
    with catch_warnings():
        if suppress_warnings:
            simplefilter("ignore")

        for loader_cls in candidates:
            loader = loader_cls()
            try:
                loaded = loader.load_data(filename, metadata=metadata)
            except Exception:
                # This loader cannot handle the file; try the next one
                continue
            else:
                return loaded
def dump(data: Dataset, filename: str, force: bool = False) -> None:
    """
    Pickle ``data`` into ``filename`` (wrapper for :meth:`pickle.dump`),
    to save opened and modified :class:`Dataset`.

    :param data: dataset to save
    :param filename: absolute path to the file
    :param force: if :py:obj:`True`, overwrite existing file without
        asking. Default is :py:obj:`False`, in which case the user is
        prompted on standard input when the file already exists.
    """

    may_write = force or not os.path.isfile(filename)
    if not may_write:
        reply = input(f"File <{filename}> exists. Overwrite it? (y/N)")
        # Anything but a clear affirmative aborts without writing
        if reply.lower() not in ("y", "yes"):
            return

    with open(filename, "wb") as handle:
        pickle.dump(data, handle)

    print(f"Wrote to file <{filename}>.")
def update_namespace(data: Dataset, *attributes: list) -> None:
    """
    Add attributes to a :class:`Dataset`.

    The values are written straight into the instance ``__dict__``.

    :param data: dataset object
    :param attributes: any number of (*name*, *value*) pairs of the
        attributes to add, where *name* is a :py:obj:`str` and *value* any
        python object.
    """

    target = data.__dict__
    for pair in attributes:
        key, value = pair
        target[key] = value