from __future__ import annotations
import io
import numpy as np
import ast
import os
import pickle
import re
import h5py
from igor2 import binarywave, packed
import zipfile
from argparse import Namespace
from errno import ENOENT
from warnings import catch_warnings, simplefilter
from datetime import datetime
from typing import Union, Any, List, Dict
from pydantic import BaseModel, Field
from abc import ABC, abstractmethod
[docs]
class Dataset(BaseModel):
    """
    Generic data container of :mod:`piva`, derived from
    :class:`~pydantic.BaseModel`.

    Every **Dataloader** returns the content of a data file in this format.
    Attributes are filled with whatever data and metadata a file from a given
    instrument provides, and stay :class:`None` otherwise. Attributes marked
    with an asterisk (``*``) are mandatory for the **DataViewers** to work.

    Data and axes:

    - **data** * (:class:`np.ndarray`) -- acquired data set, always a 3D
      array (``len(data.shape) = 3``) oriented as `dim(0)` - scanned axis,
      `dim(1)` - analyzer axis, `dim(2)` - energy axis. For a single cut
      (2D data) the first dimension has length one.
    - **xscale** * (:class:`np.ndarray`) -- axis along the scanned direction,
      units depend on the scan type. One-element array for a single cut.
    - **yscale** * (:class:`np.ndarray`) -- axis along the analyzer slit,
      most likely in [deg].
    - **zscale** * (:class:`np.ndarray`) -- axis along the energy direction,
      most likely in [eV].
    - ekin (:class:`np.ndarray` | :class:`None`) -- energy axis in kinetic
      energy scale (if the default scale is in binding energy).
    - kxscale (:class:`np.ndarray` | :class:`None`) -- momentum axis (saved
      after conversion) along the scanned direction.
    - kyscale (:class:`np.ndarray` | :class:`None`) -- momentum axis (saved
      after conversion) along the analyzer direction.

    Manipulator and sample environment (all :class:`float` | :class:`None`):

    - x, y, z -- `x`, `y` and `z` position of the manipulator.
    - theta -- `theta` angle of the manipulator; often referred as `polar`.
    - phi -- `phi` angle of the manipulator; often referred as `azimuth`.
    - tilt -- `tilt` angle of the manipulator.
    - temp -- temperature during the experiment.
    - pressure -- pressure during the experiment.

    Beamline and analyzer settings:

    - hv (:class:`float` | :class:`None`) -- photon energy used during the
      experiment.
    - wf (:class:`float` | :class:`None`) -- work function of the analyzer.
    - Ef (:class:`float` | :class:`None`) -- correction for the Fermi level.
    - polarization (:class:`str` | :class:`None`) -- photon polarization.
    - PE (:class:`int` | :class:`None`) -- pass energy of the analyzer.
    - exit_slit (:class:`float` | :class:`None`) -- exit (vertical) slit of
      the beamline; responsible for energy resolution.
    - FE (:class:`float` | :class:`None`) -- front end of the beamline.
    - acq_mode (:class:`str` | :class:`None`) -- data acquisition mode.
    - lens_mode (:class:`str` | :class:`None`) -- lens mode of the analyzer.
    - ana_slit (:class:`str` | :class:`None`) -- slit opening of the analyzer.
    - defl_angle (:class:`float` | :class:`None`) -- applied deflection angle.
    - n_sweeps (:class:`int` | :class:`None`) -- number of sweeps.
    - DT (:class:`int` | :class:`None`) -- dwell time.

    Scan description and bookkeeping:

    - scan_type (:class:`str` | :class:`None`) -- type of the measurement
      (e.g. `cut`, `tilt scan`, `hv scan`).
    - scan_dim (:class:`list` | :class:`None`) -- if scan other than `cut`,
      scanned dimensions as list: [`start`, `stop`, `step`].
    - date (:class:`str` | :class:`None`) -- date string stored with the
      dataset.
    - data_provenance (:class:`dict`) -- dataset logbook; contains
      information about the original file and keeps track of functions
      called on the data.
    """

    # numpy arrays are not pydantic-native types, and additional attributes
    # may be attached to a dataset, hence both settings below.
    model_config = {
        "arbitrary_types_allowed": True,
        "extra": "allow",
    }

    data: np.ndarray
    xscale: np.ndarray
    yscale: np.ndarray
    zscale: np.ndarray
    ekin: np.ndarray | None = None
    kxscale: np.ndarray | None = None
    kyscale: np.ndarray | None = None
    x: float | None = None
    y: float | None = None
    z: float | None = None
    theta: float | None = None
    phi: float | None = None
    tilt: float | None = None
    temp: float | None = None
    pressure: float | None = None
    hv: float | None = None
    wf: float | None = None
    Ef: float | None = None
    polarization: str | None = None
    PE: int | None = None
    exit_slit: float | None = None
    FE: float | None = None
    scan_type: str | None = None
    scan_dim: List[float] | None = None
    acq_mode: str | None = None
    lens_mode: str | None = None
    ana_slit: str | None = None
    defl_angle: float | None = None
    n_sweeps: int | None = None
    DT: int | None = None
    date: str | None = None
    # Every instance gets its own, initially empty, logbook.
    data_provenance: Dict[str, List[Dict]] = Field(
        default_factory=lambda: {
            "file": [],
            "k_space_conv": [],
            "edited_entries": [],
        }
    )

    def add_org_file_entry(self, fname: str, dl: str) -> None:
        """
        Append a record describing the original data file to the ``"file"``
        list of the data provenance logbook.

        :param fname: file name
        :param dl: specific **Dataloader** (inheriting from
                   :class:`Dataloader`) that was used to open the file
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.data_provenance["file"].append(
            {
                "index": 0,
                "date_time": timestamp,
                "path": fname,
                "type": "original",
                "index_taken": "-",
                "binned": "-",
                "data_loader": dl,
            }
        )
[docs]
class Dataloader(ABC):
    """
    Parent class (interface) from which the beamline specific **Dataloaders**
    inherit shared methods. Even when the same software is used, files differ
    from beamline to beamline in terms of format, amount of saved metadata
    and the way they are stored. To take these differences into account,
    **Dataloaders** for specific beamlines are implemented separately.
    """

    name = "Base"

    def __init__(self) -> None:
        # ``model_construct`` builds the model without validation, so the
        # mandatory fields can be filled in while a file is being read; the
        # result is validated in :meth:`validate_at_return`.
        self.ds = Dataset.model_construct()
        # ``raster``/``scan`` are only set when a raster scan is loaded.
        self.raster = False
        self.scan = None

    @abstractmethod
    def load_data(self, filename: str, metadata: bool) -> Dataset:
        """
        Must be implemented in subclasses.
        """
        pass

    @staticmethod
    def _set_scan_info(ds: Dataset) -> None:
        """
        Mark `ds` as a ``"cut"`` when its scanned axis has a single point,
        otherwise store ``[start, stop, step]`` of that axis in ``scan_dim``.
        """
        if ds.xscale.size == 1:
            ds.scan_type = "cut"
        else:
            ds.scan_dim = [
                ds.xscale[0],
                ds.xscale[-1],
                np.abs(ds.xscale[0] - ds.xscale[1]),
            ]

    def load_ses_zip(
        self, filename: str, bl_md: list = None, metadata: bool = False
    ) -> None:
        r"""
        Load data from SES (Scienta) **\ *.zip** files.

        :param filename: absolute path to the file
        :param bl_md: beamline specific metadata passed as a list of tuples in
                      format (*name* :py:obj:`str`, *label* :py:obj:`str`,
                      *type* :py:obj:`type`), where *name* stands for how
                      data entry is saved in the file, *label* - how the
                      information should be called in the
                      :class:`~pydantic.BaseModel` and *type* - type of the
                      variable (:py:obj:`float`, :py:obj:`str`, `etc.`)
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset to display them in :class:`DataBrowser`
                         window. Helps to browse through the files faster,
                         without actually loading entire file.
        :raises ValueError: if ``viewer.ini`` inside the archive contains no
                            line starting with ``name``
        """
        ds = self.ds
        # (name in the file, attribute name, expected type) of the entries
        # describing the layout of the binary data block.
        keys1 = [
            ("width", "n_energy", int),
            ("depth", "n_x", int),
            ("height", "n_y", int),
            ("first_full", "first_energy", int),
            ("last_full", "last_energy", int),
            ("widthoffset", "start_energy", float),
            ("widthdelta", "step_energy", float),
            ("depthoffset", "start_x", float),
            ("depthdelta", "step_x", float),
            ("heightoffset", "start_y", float),
            ("heightdelta", "step_y", float),
        ]

        with zipfile.ZipFile(filename, "r") as z:
            # The identifier used in the names of the other archive members
            # is stored in ``viewer.ini`` (the last ``name`` line wins).
            file_id = None
            with z.open("viewer.ini") as viewer:
                for line in viewer.readlines():
                    ln = line.decode("UTF-8")
                    if ln.startswith("name"):
                        # Make sure to split off unwanted whitespace
                        file_id = ln.split("=")[1].split()[0]
            if file_id is None:
                raise ValueError(
                    f"no 'name' entry found in viewer.ini of {filename}"
                )

            # Layout of the data block comes from the first metadata file...
            with z.open("Spectrum_" + file_id + ".ini") as md_file1:
                M = Namespace()
                self.read_ses_metadata(M, md_file1.readlines(), bl_md=keys1, zip=True)
            # ...and the beamline metadata from the second one.
            with z.open(file_id + ".ini") as md_file2:
                self.read_ses_metadata(ds, md_file2, bl_md=bl_md, zip=True)

            if metadata:
                # Only the size matters, skip reading the binary block.
                data_flat = np.zeros((int(M.n_y) * int(M.n_x) * int(M.n_energy)))
            else:
                with z.open("Spectrum_" + file_id + ".bin") as f:
                    data_flat = np.frombuffer(f.read(), dtype="float32")

        # Put the data back into its actual shape: (x, y, energy)
        data = np.reshape(data_flat, (int(M.n_x), int(M.n_y), int(M.n_energy)))
        # Cut off unswept region
        data = data[:, :, M.first_energy : M.last_energy + 1]
        # Bring the energy axis to the front: (energy, x, y)
        data = np.moveaxis(data, 2, 0)

        # Create axes
        xscale = start_step_n(M.start_x, M.step_x, M.n_x)
        yscale = start_step_n(M.start_y, M.step_y, M.n_y)
        energies = start_step_n(M.start_energy, M.step_energy, M.n_energy)
        energies = energies[M.first_energy : M.last_energy + 1]

        if yscale.size > 1:
            # (energy, x, y) -> (x, energy, y) -> (x, y, energy)
            data = np.swapaxes(np.swapaxes(data, 0, 1), 1, 2)
        else:
            # (energy, x, y) -> (x, energy, y)
            data = np.swapaxes(data, 0, 1)

        # set data and axes
        ds.data = data
        ds.xscale = xscale
        ds.yscale = yscale
        ds.zscale = energies
        ds.ekin = energies
        self._set_scan_info(ds)

    def load_ses_ibw(
        self, filename: str, bl_md: list = None, metadata: bool = False
    ) -> None:
        r"""
        Load data from SES (Scienta) IGOR binary wave (**\ *.ibw**) files.

        :param filename: absolute path to the file
        :param bl_md: beamline specific metadata, forwarded to the metadata
                      reader. See :meth:`load_ses_zip` for more info.
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. See :meth:`load_ses_zip` for more info.
        """
        ds = self.ds
        wave = binarywave.load(filename)["wave"]
        self._read_ibw_(ds, wave, bl_md=bl_md, metadata=metadata)

    def load_ses_pxt(
        self, filename: str, bl_md: list = None, metadata: bool = False
    ) -> None:
        r"""
        Load data from SES (Scienta) IGOR packed experiment (**\ *.pxt**)
        files.

        :param filename: absolute path to the file
        :param bl_md: beamline specific metadata, forwarded to the metadata
                      reader. See :meth:`load_ses_zip` for more info.
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. See :meth:`load_ses_zip` for more info.
        """
        ds = self.ds
        wave = packed.load(filename)[0][0].wave["wave"]
        # Forward `metadata` as well, exactly like :meth:`load_ses_ibw` does.
        self._read_ibw_(ds, wave, bl_md=bl_md, metadata=metadata)

    def _read_ibw_(
        self, ds: Dataset, wave: Any, bl_md: list = None, metadata: bool = False
    ) -> None:
        """
        Fill `ds` with data, axes and metadata of a loaded igor binary wave.

        :param ds: :class:`Dataset` object to fill up with values.
        :param wave: loaded ibw wave.
        :param bl_md: beamline specific metadata, forwarded to the metadata
                      reader. See :meth:`load_ses_zip` for more info.
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. Only used when a raster scan is loaded. See
                         :meth:`load_ses_zip` for more info.
        """
        # The `header` contains some metadata
        header = wave["wave_header"]

        # A non-zero 4th dimension means the file holds a raster scan.
        if header["nDim"][3] != 0:
            self.raster = True
            self.scan = self.load_raster_scan(wave, bl_md=bl_md, metadata=metadata)

        nDim = header["nDim"]
        steps = header["sfA"]
        starts = header["sfB"]

        if nDim[2] == 0:
            # 2D wave: give the scanned axis a single point.
            nDim[2] += 1
            data = np.swapaxes(np.array([wave["wData"]]), 1, 2)
        else:
            data = np.swapaxes(np.array([wave["wData"]])[0, :, :, :], 0, 2)

        # Construct the scales from start, step and n
        xscale = start_step_n(starts[2], steps[2], nDim[2])
        yscale = start_step_n(starts[1], steps[1], nDim[1])
        zscale = start_step_n(starts[0], steps[0], nDim[0])

        # set data and axes
        ds.data = data
        ds.xscale = xscale
        ds.yscale = yscale
        ds.zscale = zscale

        # Convert bytestring of ASCII characters `note` containing some
        # metadata, to a list of strings
        meta = wave["note"].decode("ASCII").split("\r")
        self.read_ses_metadata(ds, meta, bl_md=bl_md)
        self._set_scan_info(ds)

    @staticmethod
    def load_raster_scan(
        wave: Any, bl_md: list = None, metadata: bool = False
    ) -> np.ndarray:
        """
        Load data from `xy` manipulator raster scan. Each energy-momentum map
        is saved as a separate :class:`Dataset` object.

        :param wave: loaded ibw wave.
        :param bl_md: beamline specific metadata, forwarded to the metadata
                      reader. See :meth:`load_ses_zip` for more info.
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset (data arrays are left filled with zeros).
                         See :meth:`load_ses_zip` for more info.
        :return: 2D object array with loaded :class:`Dataset` objects, indexed
                 by the two raster axes
        """
        # The `header` contains some metadata
        header = wave["wave_header"]
        nDim = header["nDim"]
        steps = header["sfA"]
        starts = header["sfB"]

        scan = np.empty((nDim[2], nDim[3]), dtype=object)
        xscale = np.array([0])
        yscale = start_step_n(starts[1], steps[1], nDim[1])
        zscale = start_step_n(starts[0], steps[0], nDim[0])
        x_axis = start_step_n(starts[2], steps[2], nDim[2])
        y_axis = start_step_n(starts[3], steps[3], nDim[3])

        for xi in range(nDim[2]):
            for yi in range(nDim[3]):
                # ``Dataset()`` would fail validation because the mandatory
                # fields are not known yet; build the object unvalidated, as
                # done in ``__init__``. Validation follows in
                # :meth:`validate_at_return`.
                point = Dataset.model_construct()
                tmp_dl = DataloaderBloch()
                meta = wave["note"].decode("ASCII").split("\r")
                tmp_dl.read_ses_metadata(point, meta, bl_md)
                point.x = x_axis[xi]
                point.y = y_axis[yi]
                point.data = np.zeros((1, nDim[1], nDim[0]))
                if not metadata:
                    point.data[0, :, :] = wave["wData"][:, :, xi, yi].T
                point.xscale = xscale
                point.yscale = yscale
                point.zscale = zscale
                point.scan_type = "raster scan"
                scan[xi, yi] = point
        return scan

    def validate_at_return(self, filename: str) -> Dataset | np.ndarray:
        """
        Validate that the Dataset was correctly populated with data and add
        original file information to the data provenance record.

        :param filename: absolute path to the file
        :return: loaded dataset with available metadata, or the array of
                 datasets when a raster scan was loaded
        """
        # NOTE(review): ``validate_assignment = True`` below sets an attribute
        # of that name on the instance; pydantic's validate-on-assignment
        # switch lives in ``model_config``. Kept as is - confirm the intent.
        if self.raster:
            for xi in range(self.scan.shape[0]):
                for yi in range(self.scan.shape[1]):
                    dsi = self.scan[xi, yi]
                    dsi.add_org_file_entry(filename, self.name)
                    dsi.validate_assignment = True
                    # Round trip through a dict to run the full validation.
                    self.scan[xi, yi] = Dataset.model_validate(dsi.model_dump())
            return self.scan
        else:
            self.ds.add_org_file_entry(filename, self.name)
            self.ds.validate_assignment = True
            return Dataset.model_validate(self.ds.model_dump())
[docs]
class DataloaderPickle(Dataloader):
    """
    Dataloader for opening files saved with :mod:`piva`.
    Files are in binary format saved using :mod:`pickle` module, and
    contain raw :class:`Dataset` object (or an array of them for a raster
    scan).
    """

    name = "Pickle"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Load :mod:`pickle` file and bring it into correct format.

        :param filename: absolute path to the file; must end with ``.p``
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. Not used here, but required to match format
                         of other **Dataloaders**. See :meth:`load_ses_zip`
                         for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file name does not end with
                                     ``.p``
        """
        if not filename.endswith(".p"):
            raise NotImplementedError

        # NOTE(security): unpickling can execute arbitrary code; only open
        # files that come from a trusted source.
        with open(filename, "rb") as f:
            filedata = pickle.load(f)

        if isinstance(filedata, np.ndarray):
            # An array holds a raster scan, which is returned as it is.
            self.raster = True
            self.scan = filedata
        else:
            # Copy field by field, so files saved with an older version of
            # the Dataset still end up with all current attributes.
            for attr in Dataset.model_fields:
                if hasattr(filedata, attr):
                    setattr(self.ds, attr, getattr(filedata, attr))
        return self.validate_at_return(filename)
[docs]
class DataloaderSIS(Dataloader):
    """
    Dataloader for opening files from SIS beamline at SLS (Swiss Light Source,
    Switzerland).
    """

    name = "SIS"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file extension is not supported
        """
        if filename.endswith("h5"):
            self.load_h5(filename, metadata=metadata)
        elif filename.endswith("zip"):
            self.load_ses_zip(filename, metadata=metadata)
        elif filename.endswith("ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            raise NotImplementedError
        return self.validate_at_return(filename)

    def load_h5(self, filename: str, metadata: bool = False) -> None:
        """
        Load HDF file and all available metadata.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset; the data array is left filled with zeros.
                         See :meth:`load_ses_zip` for more info.
        """
        # The context manager closes the file even if reading fails.
        with h5py.File(filename, "r") as datfile:
            # Extract the actual dataset and its attributes
            h5_data = datfile["Electron Analyzer/Image Data"]
            attributes = h5_data.attrs
            shape = h5_data.shape

            if metadata:
                data = np.zeros(shape)
            elif len(shape) == 3:
                # Read the 3D block one slice at a time.
                data = np.zeros(shape)
                for i in range(shape[2]):
                    data[:, :, i] = h5_data[:, :, i]
            else:
                data = np.array(h5_data)
            data = data.T

            if len(shape) == 2:
                n_energy = shape[0]
                # Make data 3D
                data = data.reshape((1, shape[1], n_energy))
                xscale = start_step_n(1, 1, 1)
                yscale = start_step_n(*attributes["Axis1.Scale"], shape[1])
                energies = start_step_n(*attributes["Axis0.Scale"], n_energy)
            elif len(shape) == 3:
                xscale = start_step_n(*attributes["Axis2.Scale"], shape[2])
                yscale = start_step_n(*attributes["Axis1.Scale"], shape[1])
                energies = start_step_n(*attributes["Axis0.Scale"], shape[0])
            else:
                # Case sequence of cuts
                n_first, n_second = shape[0], shape[1]
                data = np.rollaxis(data, 2, 0)
                xscale = start_step_n(*attributes["Axis1.Scale"], n_second)
                yscale = start_step_n(*attributes["Axis0.Scale"], n_first)
                energies = start_step_n(*attributes["Axis0.Scale"], n_second)

            # Manipulator position and beamline state
            instruments = datfile["Other Instruments"]
            x_pos = instruments["X"][0]
            y_pos = instruments["Y"][0]
            z_pos = instruments["Z"][0]
            theta = instruments["Theta"][0]
            phi = instruments["Phi"][0]
            tilt = instruments["Tilt"][0]
            # account for differences from before the beamline upgrade
            try:
                temp = instruments["Temperature B (Sample 1)"][0]
                pressure = instruments["Pressure AC (ACMI)"][0]
            except KeyError:
                temp = instruments["Temperature B"][0]
                pressure = instruments["Pressure AC1"][0]
            hv = attributes["Excitation Energy (eV)"]
            wf = attributes["Work Function (eV)"]
            polarization = instruments["hv"].attrs["Mode"][10:]
            PE = attributes["Pass Energy (eV)"]
            exit_slit = instruments["Exit Slit"][0]
            FE = instruments["FE Horiz. Width"][0]
            ekin = energies + hv - wf
            lens_mode = attributes["Lens Mode"]
            acq_mode = attributes["Acquisition Mode"]
            n_sweeps = attributes["Sweeps on Last Image"]
            DT = attributes["Dwell Time (ms)"]

            # A third axis in the attributes means something was scanned.
            if "Axis2.Scale" in attributes:
                scan_type = attributes["Axis2.Description"] + " scan"
                start = attributes["Axis2.Scale"][0]
                step = attributes["Axis2.Scale"][1]
                # NOTE(review): this is one step beyond the last point if
                # the axis holds `xscale.size` points - confirm convention.
                stop = (
                    attributes["Axis2.Scale"][0]
                    + attributes["Axis2.Scale"][1] * xscale.size
                )
                scan_dim = [start, stop, step]
            else:
                scan_type = "cut"
                scan_dim = []

            self.ds.data = data
            self.ds.xscale = xscale
            self.ds.yscale = yscale
            self.ds.zscale = energies
            self.ds.ekin = ekin
            self.ds.x = x_pos
            self.ds.y = y_pos
            self.ds.z = z_pos
            self.ds.theta = theta
            self.ds.phi = phi
            self.ds.tilt = tilt
            self.ds.temp = temp
            self.ds.pressure = pressure
            self.ds.hv = hv
            self.ds.wf = wf
            self.ds.polarization = polarization
            self.ds.PE = PE
            self.ds.exit_slit = exit_slit
            self.ds.FE = FE
            self.ds.scan_type = scan_type
            self.ds.scan_dim = scan_dim
            self.ds.lens_mode = lens_mode
            self.ds.acq_mode = acq_mode
            self.ds.n_sweeps = n_sweeps
            self.ds.DT = DT
[docs]
class DataloaderADRESS(Dataloader):
    """
    Dataloader for opening files from ADRESS beamline at SLS (Swiss Light
    Source, Switzerland).
    """

    name = "ADRESS"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file is not an ``h5`` file
        """
        if filename.endswith("h5"):
            self.load_h5(filename, metadata=metadata)
        else:
            raise NotImplementedError
        return self.validate_at_return(filename)

    def load_h5(self, filename: str, metadata: bool = False) -> None:
        """
        Load HDF file and all available metadata.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset; the data array is left filled with zeros.
                         See :meth:`load_ses_zip` for more info.
        """
        # Everything needed is copied into memory inside the ``with`` block,
        # which closes the file even if reading fails.
        with h5py.File(filename, "r") as h5file:
            # The actual data is in the field: 'Matrix'
            matrix = h5file["Matrix"]
            # The scales and the metadata note are stored as attributes
            scalings = matrix.attrs["IGORWaveScaling"]
            info = matrix.attrs["IGORWaveNote"]
            if metadata:
                data = np.zeros(matrix.shape)
            else:
                # Put the data into a numpy array and convert to float
                data = np.array(matrix, dtype=float)

        shape = data.shape
        if len(shape) == 3:
            # Case map or hv scan (or...?)
            data = np.rollaxis(data.T, 2, 1)
            # Shape has changed
            shape = data.shape
            xstep, xstart = scalings[3]
            ystep, ystart = scalings[1]
            zstep, zstart = scalings[2]
            xscale = start_step_n(xstart, xstep, shape[0])
            yscale = start_step_n(ystart, ystep, shape[1])
            zscale = start_step_n(zstart, zstep, shape[2])
        else:
            # Case cut
            # Make data 3-dimensional by adding an empty dimension
            data = data.reshape(1, shape[0], shape[1])
            # Shape has changed
            shape = data.shape
            ystep, ystart = scalings[1]
            zstep, zstart = scalings[2]
            xscale = np.array([1])
            yscale = start_step_n(ystart, ystep, shape[1])
            zscale = start_step_n(zstart, zstep, shape[2])

        self.ds.data = data
        self.ds.xscale = xscale
        self.ds.yscale = yscale
        self.ds.zscale = zscale

        # `info` is a bytestring of ASCII characters; split it into lines
        metadata_list = info.decode("ASCII").split("\n")
        # (name in the note, Dataset attribute, expected type)
        keys1 = [
            ("hv", "hv", float),
            ("Pol", "polarization", str),
            ("Slit  ", "exit_slit", float),
            ("Mode", "lens_mode", str),
            ("Epass", "PE", int),
            ("X ", "x", float),
            ("Y ", "y", float),
            ("Z ", "z", float),
            ("Theta", "theta", float),
            ("Azimuth", "phi", float),
            ("Tilt", "tilt", float),
            ("ADef", "defl_angle", float),
            ("Temp", "temp", float),
            ("dt", "DT", int),
        ]
        self.read_metadata(keys1, metadata_list)
        if xscale.size == 1:
            self.ds.scan_type = "cut"
[docs]
class DataloaderBloch(Dataloader):
    """
    Dataloader for opening files from Bloch beamline at MAX-IV (Sweden).
    """

    name = "Bloch"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Pick the reader matching the file extension (``zip``, ``ibw`` or
        ``pxt``) and load the file with the Bloch specific metadata keys.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file extension is not supported
        """
        # (name in the file, Dataset attribute, type) of manipulator entries
        bl_md = [
            ("A", "phi", float),
            ("P", "theta", float),
            ("T", "tilt", float),
            ("X", "x", float),
            ("Y", "y", float),
            ("Z", "z", float),
        ]
        readers = {
            "zip": self.load_ses_zip,
            "ibw": self.load_ses_ibw,
            "pxt": self.load_ses_pxt,
        }
        for suffix, reader in readers.items():
            if filename.endswith(suffix):
                reader(filename, bl_md=bl_md, metadata=metadata)
                break
        else:
            raise NotImplementedError
        return self.validate_at_return(filename)
[docs]
class DataloaderI05(Dataloader):
    """
    Dataloader for opening files from I05 beamline at Diamond Light Source
    (UK).
    """

    name = "I05"

    def __init__(self):
        super().__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file is not a ``nxs`` file
        """
        if filename.endswith("nxs"):
            self.load_nxs(filename, metadata=metadata)
        else:
            raise NotImplementedError
        return self.validate_at_return(filename)

    @staticmethod
    def _read_scan_command(infile: Any) -> str:
        """
        Return the content of ``entry1/scan_command`` as text, decoding it
        when the file reader hands it out as bytes.
        """
        raw = infile["entry1/scan_command"][()]
        if isinstance(raw, np.ndarray):
            raw = raw.ravel()[0]
        if isinstance(raw, bytes):
            return raw.decode("utf-8")
        return str(raw)

    def load_nxs(self, filename: str, metadata: bool) -> None:
        """
        Load nexus file and all available metadata.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset; the data array is left filled with zeros.
                         See :meth:`load_ses_zip` for more info.
        """
        # The context manager closes the file even if reading fails.
        with h5py.File(filename, "r") as infile:
            if metadata:
                data = np.zeros(infile["/entry1/analyser/data"].shape)
            else:
                data = np.array(infile["/entry1/analyser/data"])
            angles = np.array(infile["/entry1/analyser/angles"])
            energies = np.array(infile["/entry1/analyser/energies"])
            zscale = energies[0] if len(energies.shape) == 2 else energies
            yscale = angles

            # Check if we have a scan
            if data.shape[0] == 1:
                xscale = np.array([0])
            else:
                # Otherwise, extract third dimension from scan command
                command = self._read_scan_command(infile)
                scan_kind = command.split()[1]
                if scan_kind == "pathgroup":
                    self.print_m("is pathgroup")
                    # Extract points from a
                    # ([polar, x, y], [polar, x, y], ...) tuple
                    points = command.split("(")[-1].split(")")[0]
                    tuples = points.split("[")[1:]
                    xscale = np.array([float(t.split(",")[0]) for t in tuples])
                elif scan_kind == "scan_group":
                    self.print_m("is scan_group")
                    # Extract points from a
                    # ([polar, x, y], [polar, x, y], ...) tuple
                    points = command.split("((")[-1].split("))")[0]
                    points = "((" + points + "))"
                    xscale = np.array(ast.literal_eval(points))[:, 0]
                    # Now, if this was a scan with varying centre_energy, the
                    # zscale contains a list of energies... for now, just take
                    # the first one
                    zscale = zscale[0]
                else:
                    # "Normal" case: <cmd> <axis> <start> <stop> <step> ...
                    start, stop, step = [float(s) for s in command.split()[2:5]]
                    xscale = np.arange(start, stop + 0.5 * step, step)

            # Fewer slices in the file than points on the scanned axis:
            # pad the data with zeros up to the full length.
            if not (xscale.size == data.shape[0]):
                x_done = data.shape[0]
                data_tmp = np.zeros((xscale.size, data.shape[1], data.shape[2]))
                data_tmp[:x_done, :, :] = data
                data = data_tmp
                xscale = xscale[: data.shape[0]]

            # read metadata
            x = float(infile["entry1/instrument/manipulator/sax"][0])
            y = float(infile["entry1/instrument/manipulator/say"][0])
            z = float(infile["entry1/instrument/manipulator/saz"][0])
            theta = float(infile["entry1/instrument/manipulator/sapolar"][0])
            phi = float(infile["entry1/instrument/manipulator/saazimuth"][0])
            tilt = float(infile["entry1/instrument/manipulator/satilt"][0])
            PE = int(infile["entry1/instrument/analyser/pass_energy"][0])
            n_sweeps = int(
                infile["entry1/instrument/analyser/number_of_iterations"][0]
            )
            # ``str(...)[2:-1]`` strips the ``b'...'`` wrapper of a bytes repr
            lens_mode = str(infile["entry1/instrument/analyser/lens_mode"][0])[2:-1]
            acq_mode = str(
                infile["entry1/instrument/analyser/acquisition_mode"][0]
            )[2:-1]
            DT = int(infile["entry1/instrument/analyser/time_for_frames"][0] * 1000)
            defl_ang = float(infile["entry1/instrument/analyser/deflector_x"][0])
            hv = float(infile["entry1/instrument/monochromator/energy"][0])
            exit_slit = float(
                infile["entry1/instrument/monochromator/exit_slit_size"][0] * 1000
            )
            FE = round(
                infile["entry1/instrument/monochromator/s2_horizontal_slit_size"][0],
                2,
            )
            polarization = str(
                infile[
                    "entry1/instrument/insertion_device/beam/final_polarisation_label"
                ][0]
            )[2:-1]
            temp = float(infile["entry1/sample/temperature"][0])
            pressure = float(infile["entry1/sample/lc_pressure"][0])

            # get scan info
            if infile["entry1/scan_dimensions"][0] == 1:
                scan_type = "cut"
                scan_dim = None
            else:
                tokens = self._read_scan_command(infile).split()
                start, stop, step = (
                    float(tokens[2]),
                    float(tokens[3]),
                    float(tokens[4]),
                )
                scan_dim = [start, stop, step]
                if "deflector" in tokens[1]:
                    scanned = "DA"
                elif "polar" in tokens[1]:
                    scanned = "theta"
                elif "energy" in tokens[1]:
                    scanned = "hv"
                else:
                    # Unknown axis: fall back to its name from the command.
                    scanned = tokens[1]
                scan_type = scanned + " scan"

        self.ds.data = data
        self.ds.xscale = xscale
        self.ds.yscale = yscale
        self.ds.zscale = zscale
        self.ds.ekin = None
        self.ds.kxscale = None
        self.ds.kyscale = None
        self.ds.x = x
        self.ds.y = y
        self.ds.z = z
        self.ds.theta = theta
        self.ds.phi = phi
        self.ds.tilt = tilt
        self.ds.temp = temp
        self.ds.pressure = pressure
        self.ds.hv = hv
        self.ds.wf = None
        self.ds.Ef = None
        self.ds.polarization = polarization
        self.ds.PE = PE
        self.ds.exit_slit = exit_slit
        self.ds.FE = FE
        self.ds.scan_type = scan_type
        self.ds.scan_dim = scan_dim
        self.ds.acq_mode = acq_mode
        self.ds.lens_mode = lens_mode
        self.ds.ana_slit = None
        self.ds.defl_angle = defl_ang
        self.ds.n_sweeps = n_sweeps
        self.ds.DT = DT
[docs]
class DataloaderMERLIN(Dataloader):
"""
Dataloader for opening files from Merlin beamline at ALS (Advanced Light
Source, Berkeley, CA).
"""
name = "Merlin"
[docs]
def __init__(self):
    # Set up the shared loader state first, then the handle of the HDF5 file
    # (nothing is open yet).
    super().__init__()
    self.datfile = None
[docs]
def load_data(self, filename: str, metadata: bool = False) -> Dataset:
    """
    Recognize correct format and load data from the file.

    :param filename: absolute path to the file
    :param metadata: if :py:obj:`True`, read only metadata and size of the
                     dataset. See :meth:`load_ses_zip` for more info.
    :return: loaded dataset with available metadata
    :raises NotImplementedError: if the file extension is not supported
    """
    if filename.endswith("h5"):
        self.load_h5(filename, metadata=metadata)
    elif filename.endswith("ibw"):
        self.load_ses_ibw(filename, metadata=metadata)
    elif filename.endswith("pxt"):
        # Bound method: `self` must not be passed a second time, otherwise
        # the loader itself ends up as `filename`.
        self.load_ses_pxt(filename, metadata=metadata)
    else:
        raise NotImplementedError
    return self.validate_at_return(filename)
[docs]
def load_h5(self, filename: str, metadata: bool = False) -> None:
"""
Load HDF type file and all available metadata.
:param filename: absolute path to the file
:param metadata: if :py:obj:`True`, read only metadata and size of the
dataset. Not used here, but required to mach format
of other **Dataloaders**. See :meth:`load_ses_zip`
for more info.
"""
# Load the hdf5 file
# Use 'rdcc_nbytes' flag for setting up the chunk cache (in bytes)
self.datfile = h5py.File(filename, "r")
if "3Ddata" in self.datfile.keys():
type = "3Ddata"
elif "2Ddata" in self.datfile.keys():
type = "2Ddata"
else:
return
# Extract the actual dataset and some metadata
h5_data = self.datfile[type + "/Spectrum"]
detector = self.datfile[type + "/Detector"].attrs
sample = self.datfile[type + "/Sample"].attrs
source = self.datfile[type + "/Source"].attrs
if type == "2Ddata":
data = np.zeros((1, h5_data.shape[0], h5_data.shape[1]))
data[0, :, :] = h5_data
xscale = np.array([1])
yscale = start_step_n(
float(h5_data.attrs["AxisScaling"][1, 1]),
float(h5_data.attrs["AxisScaling"][1, 0]),
h5_data.shape[0],
)
zscale = start_step_n(
float(h5_data.attrs["AxisScaling"][0, 1]),
float(h5_data.attrs["AxisScaling"][0, 0]),
h5_data.shape[1],
)
scan_type = "cut"
scan_dim = []
elif type == "3Ddata":
data = np.zeros(h5_data.shape)
if not metadata:
for i in range(data.shape[0]):
data[i, :, :] = h5_data[i, :, :]
data = np.swapaxes(np.swapaxes(data, 0, 2), 1, 2)
try:
xaxis = []
file = open(filename[:-3] + "_Motor_Pos.txt")
for line in file.readlines():
xaxis.append(line.strip("\n"))
file.close()
scan_type = str(xaxis[0])
xscale = np.array(xaxis[1:], dtype=float)
if xscale[0] > xscale[-1]:
xscale = np.flip(xscale)
data = np.flip(data, axis=0)
scan_dim = [xscale[0], xscale[-1], np.abs(xscale[0] - xscale[1])]
except FileNotFoundError as e:
raise e
yscale = start_step_n(
float(h5_data.attrs["AxisScaling"][1, 1]),
float(h5_data.attrs["AxisScaling"][1, 0]),
h5_data.shape[0],
)
zscale = start_step_n(
float(h5_data.attrs["AxisScaling"][0, 1]),
float(h5_data.attrs["AxisScaling"][0, 0]),
h5_data.shape[1],
)
else:
return
# Extract some metadata
x_pos = float(sample["Sample X"])
y_pos = float(sample["Sample Y"])
z_pos = float(sample["Sample Z"])
theta = float(sample["Polar"])
phi = float(sample["Azimuth"])
tilt = float(sample["Tilt"])
temp = float(sample["Temperature A"])
pressure = float(sample["Pressure"])
hv = float(source["BL Energy"])
wf = 4.44
polarization = ["LH", "LC", "LV", "RC"][int(source["EPU POL"])]
PE = int(detector["Pass Energy"])
exit_slit = round(float(source["Exit Slit"]), 2)
FE = round(float(source["Entrance Slit"]), 2)
lens_mode = detector["Lens Mode"]
acq_mode = detector["Acq Mode"]
n_sweeps = int(detector["Num of Sweeps"])
DT = float(detector["Step Time"])
self.ds.data = data
self.ds.xscale = xscale
self.ds.yscale = yscale
self.ds.zscale = zscale
self.ds.x = x_pos
self.ds.y = y_pos
self.ds.z = z_pos
self.ds.theta = theta
self.ds.phi = phi
self.ds.tilt = tilt
self.ds.temp = temp
self.ds.pressure = pressure
self.ds.hv = hv
self.ds.wf = wf
self.ds.polarization = polarization
self.ds.PE = PE
self.ds.exit_slit = exit_slit
self.ds.FE = FE
self.ds.scan_type = scan_type
self.ds.scan_dim = scan_dim
self.ds.lens_mode = lens_mode
self.ds.acq_mode = acq_mode
self.ds.n_sweeps = n_sweeps
self.ds.DT = DT
h5py.File.close(self.datfile)
[docs]
class DataloaderHERS(Dataloader):
    """
    Dataloader for opening files from HERS endstation at ALS (Advanced Light
    Source, Berkeley, CA).
    """

    name = "HERS"

    def __init__(self):
        super(DataloaderHERS, self).__init__()
        self.datfile = None

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        The loader is selected from the file name ending (``zip``, ``ibw`` or
        ``pxt``); all three are handled by the inherited SES loaders.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. Forwarded unchanged to the selected loader.
                         See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file name ends with none of the
                                     supported extensions
        """
        if filename.endswith("zip"):
            self.load_ses_zip(filename, metadata=metadata)
        elif filename.endswith("ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            # Bound-method call: `self` must not be passed a second time
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            raise NotImplementedError
        return self.validate_at_return(filename)
[docs]
class DataloaderURANOS(Dataloader):
    """
    Dataloader for files recorded at the Uranos beamline at Solaris (Poland).
    """

    name = "URANOS"

    def __init__(self):
        super().__init__()
        self.datfile = None

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Pick the SES loader matching the file name ending and read the file.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. Forwarded unchanged to the selected loader.
                         See :meth:`load_ses_zip` for more info.
        :return: loaded dataset with available metadata
        :raises NotImplementedError: if the file name ends with none of
                                     ``zip``, ``ibw`` or ``pxt``
        """
        # Beamline-specific entries handed to the SES loaders as `bl_md`
        # (presumably: key in the file, Dataset attribute, converter -
        # TODO confirm against the base-class loaders)
        manipulator_keys = [
            ("X", "x", float),
            ("Y", "y", float),
            ("Z", "z", float),
            ("R1", "theta", float),
            ("R3", "tilt", float),
        ]
        # File name ending -> name of the loader method handling it
        loader_names = {
            "zip": "load_ses_zip",
            "ibw": "load_ses_ibw",
            "pxt": "load_ses_pxt",
        }
        for ending, method_name in loader_names.items():
            if filename.endswith(ending):
                read = getattr(self, method_name)
                read(filename, bl_md=manipulator_keys, metadata=metadata)
                break
        else:
            raise NotImplementedError
        return self.validate_at_return(filename)
[docs]
class DataloaderCASSIOPEE(Dataloader):
    """
    Dataloader for opening files from CASSIOPEE beamline at SOLEIL (France).
    """

    name = "CASSIOPEE"

    # Possible scantypes
    HV = "hv scan"
    FSM = "Theta scan"

    def __init__(self):
        super(DataloaderCASSIOPEE, self).__init__()

    def load_data(self, filename: str, metadata: bool = False) -> Dataset:
        """
        Recognize correct format and load data from the file.

        A path pointing to a regular file is read by :meth:`load_from_file`;
        anything else is treated as a directory of slices and read by
        :meth:`load_from_dir`.

        :param filename: absolute path to the file or directory.
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. Only forwarded when a single file is
                         loaded; not used for directories.
        :return: loaded dataset with available metadata.
        """
        if os.path.isfile(filename):
            self.load_from_file(filename, metadata=metadata)
        else:
            if not filename.endswith("/"):
                filename += "/"
            self.load_from_dir(filename)
        return self.validate_at_return(filename)

    def load_from_file(self, filename: str, metadata: bool = False) -> None:
        """
        Recognize correct format and load data from a single file.

        Files ending with ``.ibw`` or ``pxt`` go to the inherited SES loaders;
        every other file is read as text by :meth:`load_from_txt`.

        :param filename: absolute path to the file
        :param metadata: if :py:obj:`True`, read only metadata and size of the
                         dataset. Forwarded to the SES loaders only.
        """
        if filename.endswith(".ibw"):
            self.load_ses_ibw(filename, metadata=metadata)
        elif filename.endswith("pxt"):
            self.load_ses_pxt(filename, metadata=metadata)
        else:
            self.load_from_txt(filename)

    def load_from_dir(self, dirname: str) -> None:
        r"""
        Read data from directory containing slices of data saved in separate
        files.

        Note: At CASSIOPEE beamline multidimensional scans are saved as a
        collection of **\ *.txt** files, that need to be combined into full
        map.

        Files whose name contains ``ROI`` are treated as data slices; the
        slice index is the third-from-last ``_``-separated token of the file
        name. Beamline metadata are read from the file whose name contains
        ``_1_i`` (the last such file if there are several).

        :param dirname: absolute path to the directory
        :raises FileNotFoundError: if the directory contains no ``ROI`` data
                                   file or no ``_1_i`` metadata file
        """
        # Separate data files from the metadata file of the first slice
        metadata_path = None
        filenames = []
        for name in os.listdir(dirname):
            if "_1_i" in name:
                metadata_path = os.path.join(dirname, name)
            if "ROI" in name:
                filenames.append(name)
        if not filenames:
            raise FileNotFoundError(
                "No ROI data files found in <{}>.".format(dirname)
            )
        if metadata_path is None:
            raise FileNotFoundError(
                "No '_1_i' metadata file found in <{}>.".format(dirname)
            )

        # Get metadata from first file in list
        skip, energy, angles = self.get_metadata(
            os.path.join(dirname, filenames[0])
        )
        keys = [
            ("hv (eV) ", "hv", float),
            ("x (mm) ", "x", float),
            ("y (mm) ", "y", float),
            ("z (mm) ", "z", float),
            ("theta (deg) ", "theta", float),
            ("phi (deg) ", "phi", float),
            ("tilt (deg) ", "tilt", float),
            ("InputB ", "temp", float),
            ("P(mbar) ", "pressure", float),
            ("Polarisation [0", "polarization", str),
        ]
        # Open the metadata file only while it is being read
        with open(metadata_path) as metadata_file:
            md = self.read_metadata(keys, metadata_file)

        # Get the data from each cut separately. os.listdir() returns the
        # files in arbitrary order, so the cuts are keyed by their index and
        # put in order afterwards.
        cuts = {}
        for name in filenames:
            index = int(name.split("_")[-3])
            # First column of each file is dropped (not part of the spectrum)
            cuts[index] = np.loadtxt(
                os.path.join(dirname, name), skiprows=skip + 1
            )[:, 1:]
        data = np.array([np.array(cuts[index]).T for index in sorted(cuts)])

        # Get the scanned axis from the metadata files
        scan_type, outer_loop, hv, thetas = self.get_outer_loop(
            dirname, filenames
        )
        if scan_type == self.HV:
            xscale = outer_loop
            # Sorted, because the values arrive in os.listdir() order
            scan_values = sorted(hv)
        elif scan_type == self.FSM:
            xscale = outer_loop
            scan_values = sorted(thetas)
        else:
            xscale = np.arange(data.shape[0])
            scan_values = None

        if scan_values is None:
            scan_dim = [0, 0, 0]
        else:
            scan_dim = [
                scan_values[0],
                scan_values[-1],
                np.abs(scan_values[0] - scan_values[1]),
            ]

        self.ds.data = data
        self.ds.xscale = xscale
        self.ds.yscale = angles
        self.ds.zscale = energy
        self.ds.ekin = energy
        self.ds.hv = float(md.hv)
        self.ds.x = float(md.x)
        self.ds.y = float(md.y)
        self.ds.z = float(md.z)
        self.ds.theta = float(md.theta)
        self.ds.phi = float(md.phi)
        self.ds.tilt = float(md.tilt)
        self.ds.temp = float(md.temp)
        self.ds.pressure = float(md.pressure)
        self.ds.polarization = md.polarization
        self.ds.scan_type = scan_type
        self.ds.scan_dim = scan_dim

    def load_from_txt(self, filename: str) -> None:
        r"""
        Load data from **\ *.txt** file.

        Only ``data`` and the three axes of the dataset are set.

        :param filename: absolute path to the file
        """
        skip, energy, angles = self.get_metadata(filename)
        table = np.loadtxt(filename, skiprows=skip + 1).T
        # The first column in the datafile contains the angles; after the
        # transpose it is the first row, which is dropped here
        self.ds.data = np.array([table[1:, :]])
        self.ds.xscale = np.array([0])
        self.ds.yscale = angles
        self.ds.zscale = energy

    def get_outer_loop(self, dirname: str, filenames: list) -> tuple:
        """
        Try to determine the scantype and the corresponding scanned-axis
        scale from the additional metadata textfiles. These follow the
        assumptions made in
        :meth:`~data_loader.DataloaderCASSIOPEE.load_from_dir`.
        Additionally, the MONOCHROMATOR section must come before the
        UNDULATOR section as in both sections we have a key `hv` but only the
        former makes sense.

        The scan is classified as an hv scan when the photon energies of the
        first two files differ by more than 0.4, as a theta scan when their
        theta values differ, and as unknown otherwise (or when fewer than two
        files are given).

        :param dirname: absolute path to the directory
        :param filenames: list of data files' names; the matching metadata
                          file name is built by replacing the ``_ROI?_`` part
                          with ``_i``
        :return: tuple ``(scantype, xscale, hvs, thetas)``. ``scantype`` is
                 :py:obj:`str` or :py:obj:`None`; ``xscale`` is a
                 :class:`np.ndarray` ordered by file index or
                 :py:obj:`None`; ``hvs`` and ``thetas`` are lists of the
                 values read from each file, in the order of ``filenames``.
        :raises ValueError: if a metadata file lacks the ``theta (deg)`` or
                            ``hv (eV)`` entry
        """
        # The values are separated from the names by a `:`
        splitchar = ":"
        indices, thetas, hvs = [], [], []
        for name in filenames:
            # Get the index of the file
            index = int(name.split("_")[-3])
            # Build the metadata-filename by substituting the ROI part with i
            metafile = re.sub(r"_ROI.?_", "_i", name)

            # Reset for every file so that values never leak between files
            theta = None
            hv = None
            with open(os.path.join(dirname, metafile), "r") as f:
                for line in f:
                    if line.startswith("theta (deg)"):
                        theta = float(line.split(splitchar)[-1])
                    elif line.startswith("hv (eV)"):
                        hv = float(line.split(splitchar)[-1])
                    elif line.startswith("UNDULATOR"):
                        # Stop before the second, meaningless `hv` entry
                        break
            if theta is None or hv is None:
                raise ValueError(
                    "Missing theta or hv entry in <{}>.".format(metafile)
                )
            indices.append(index)
            thetas.append(theta)
            hvs.append(hv)

        # Check which parameters vary to determine scantype
        if len(indices) < 2:
            scantype = None
            xscale = None
        elif np.abs(hvs[1] - hvs[0]) > 0.4:
            scantype = self.HV
            xscale = hvs
        elif thetas[1] != thetas[0]:
            scantype = self.FSM
            xscale = thetas
        else:
            scantype = None
            xscale = None

        # Put the scale in file-index order and return
        if xscale is not None:
            xscale = np.array(xscale)[np.argsort(indices)]
        return scantype, xscale, hvs, thetas
# +-------+ #
# | Tools | # =================================================================
# +-------+ #
[docs]
def start_step_n(start: float, step: float, n: int) -> np.ndarray:
    """
    Return an array of ``n`` points that starts at value ``start`` and
    advances by exactly ``step`` between consecutive points. Helpful for
    generating axes, as many systems provide exactly starting value, step and
    dimensionality of the data.

    :param start: beginning value of the axis
    :param step: step value along the axis
    :param n: number of points on the axis
    :return: generated axis, ``start + step * [0, 1, ..., n - 1]``
    """
    # n points span (n - 1) steps; building the axis from the point index
    # keeps the spacing equal to `step` instead of n * step / (n - 1)
    return start + step * np.arange(n, dtype=float)
[docs]
def load_data(
    filename: str, metadata: bool = False, suppress_warnings: bool = False
) -> Dataset:
    """
    Open a file by trying every known **Dataloader** in turn and returning
    the result of the first one whose :obj:`load_data` method succeeds.

    :param filename: absolute path to the file
    :param metadata: if :py:obj:`True`, read only metadata and size of the
                     dataset. Passed on to each **Dataloader**. See
                     :meth:`load_ses_zip` for more info.
    :param suppress_warnings: if :py:obj:`True`, suppress possible warning to
                              keep terminal clean
    :return: loaded dataset with available metadata. If every **Dataloader**
             raises, the errors are discarded and :py:obj:`None` is returned.
    :raises FileNotFoundError: if ``filename`` does not exist

    NOTE: the :class:`Dataset` comes from the first **Dataloader** that
    didn't raise any errors. Another **Dataloader** might perform better,
    especially with regard to loaded metadata.
    """
    # Loaders are tried in exactly this order
    candidates = (
        DataloaderPickle,
        DataloaderSIS,
        DataloaderBloch,
        DataloaderADRESS,
        DataloaderI05,
        DataloaderCASSIOPEE,
        DataloaderMERLIN,
    )

    # Sanity check: does the given path even exist in the filesystem?
    if not os.path.exists(filename):
        raise FileNotFoundError(ENOENT, os.strerror(ENOENT), filename)

    with catch_warnings():
        if suppress_warnings:
            simplefilter("ignore")
        for loader_cls in candidates:
            # Instantiation happens outside the try: its errors propagate
            loader = loader_cls()
            try:
                loaded = loader.load_data(filename, metadata=metadata)
            except Exception:
                # This loader cannot handle the file; try the next one
                continue
            else:
                return loaded
    # No loader succeeded
    return None
[docs]
def dump(data: Dataset, filename: str, force: bool = False) -> None:
    """
    Pickle an opened and modified :class:`Dataset` to disk; thin wrapper
    around :meth:`pickle.dump`.

    :param data: dataset to save
    :param filename: absolute path to the file
    :param force: if :py:obj:`True`, overwrite existing file without asking.
                  Default is :py:obj:`False`
    """
    # Ask before overwriting, unless the caller forces it
    if not force and os.path.isfile(filename):
        reply = input(f"File <{filename}> exists. Overwrite it? (y/N)")
        # Anything other than a clear affirmative aborts the write
        if reply.lower() not in ("y", "yes"):
            return

    with open(filename, "wb") as target:
        pickle.dump(data, target)
    print(f"Wrote to file <{filename}>.")
[docs]
def update_namespace(data: Dataset, *attributes: list) -> None:
    """
    Attach additional attributes to a :class:`Dataset`.

    The values are written straight into the object's ``__dict__``.

    :param data: dataset object
    :param attributes: (*name*, *value*) pairs of the attributes to add,
                       where *name* is a :py:obj:`str` and *value* any
                       python object.
    """
    target = data.__dict__
    for pair in attributes:
        key, value = pair
        target[key] = value