import contextlib
import os
import numpy as np
from netCDF4 import Dataset, chartostring, stringtochar
import warnings
import pathlib
from packaging.version import parse
import sofar as sf
from .utils import _verify_convention_and_version, _atleast_nd
[docs]
def read_sofa(filename, verify='auto', verbose=True):
"""
Read SOFA file from disk and convert it to SOFA object.
Numeric data is returned as floats or numpy float arrays unless they have
missing data, in which case they are returned as numpy masked arrays.
If you want to read only parts of the data of a large sofa file use
:class:`SofaStream` instead.
Parameters
----------
filename : str
The full path to the sofa data.
verify : bool, optional
Verify and update the SOFA object by calling :py:func:`~Sofa.verify`.
This helps to find potential errors in the default values and is thus
recommended. If reading a file does not work, try to call `Sofa` with
``verify=False``. The default is ``'auto'`` defaults to ``True`` for
stable conventions with versions of 1.0 or higher and to ``False``
otherwise.
verbose : bool, optional
Print the names of detected custom variables and attributes. The
default is ``True``
Returns
-------
sofa : Sofa
Object containing the data from `filename`.
Notes
-----
1. Missing dimensions are appended when writing the SOFA object to disk.
E.g., if ``sofa.Data_IR`` is of shape (1, 2) it is written as an array
of shape (1, 2, 1) because the SOFA standard AES69 defines it as a
three dimensional array with the dimensions (`M: measurements`,
`R: receivers`, `N: samples`)
2. When reading data from a SOFA file, array data is always returned as
numpy arrays and singleton trailing dimensions are discarded (numpy
default). I.e., ``sofa.Data_IR`` will again be an array of shape (1, 2)
after writing and reading to and from disk.
3. One dimensional arrays with only one element will be converted to scalar
values. E.g. ``sofa.Data_SamplingRate`` is stored as an array of shape
(1, ) inside SOFA files (according to the SOFA standard AES69) but
will be a scalar inside SOFA objects after reading from disk.
"""
return _read_netcdf(filename, verify, verbose, mode="sofa")
[docs]
def read_sofa_as_netcdf(filename):
"""
Read corrupted SOFA data from disk.
.. note::
`read_sofa_as_netcdf` is intended to read and fix corrupted SOFA data
that could not be read by :py:func:`~read_sofa`. The recommend workflow
is
- Try to read the data with `read_sofa` and ``verify=True``
- If this fails, try the above with ``verify=False``
- If this fails, use `read_sofa_as_netcdf`
The SOFA object returned by `read_sofa_as_netcdf` may not work
correctly before the issues with the data were fixed, i.e., before the
data are in agreement with the SOFA standard AES-69.
Numeric data is returned as floats or numpy float arrays unless they have
missing data, in which case they are returned as numpy masked arrays.
Parameters
----------
filename : str
The full path to the NetCDF data.
Returns
-------
sofa : Sofa
Object containing the data from `filename`.
Notes
-----
1. Missing dimensions are appended when writing the SOFA object to disk.
E.g., if ``sofa.Data_IR`` is of shape (1, 2) it is written as an array
of shape (1, 2, 1) because the SOFA standard AES69 defines it as a
three dimensional array with the dimensions (`M: measurements`,
`R: receivers`, `N: samples`)
2. When reading data from a SOFA file, array data is always returned as
numpy arrays and singleton trailing dimensions are discarded (numpy
default). I.e., ``sofa.Data_IR`` will again be an array of shape (1, 2)
after writing and reading to and from disk.
3. One dimensional arrays with only one element will be converted to scalar
values. E.g. ``sofa.Data_SamplingRate`` is stored as an array of shape
(1, ) inside SOFA files (according to the SOFA standard AES69) but
will be a scalar inside SOFA objects after reading from disk.
"""
return _read_netcdf(filename, False, False, mode="netcdf")
def _read_netcdf(filename, verify, verbose, mode):
# check the filename
filename = pathlib.Path(filename).with_suffix('.sofa')
if not os.path.isfile(filename):
raise ValueError(f"{filename} does not exist")
# attributes that are skipped
skip = ["_Encoding"]
# init list of all and custom attributes
all_attr = []
custom = []
# open new NETCDF4 file for reading
with Dataset(filename, "r", format="NETCDF4") as file:
if mode == "sofa":
# get convention name and version
convention = getattr(file, "SOFAConventions")
version = getattr(file, "SOFAConventionsVersion")
# check if convention and version exist
_verify_convention_and_version(version, convention)
# get SOFA object with default values
sofa = sf.Sofa(convention, version=version, verify=verify)
else:
sofa = sf.Sofa(None)
# allow writing read only attributes
sofa.protected = False
# load global attributes
for attr in file.ncattrs():
value = getattr(file, attr)
all_attr.append(f"GLOBAL_{attr}")
if not hasattr(sofa, f"GLOBAL_{attr}"):
sofa._add_custom_api_entry(
f"GLOBAL_{attr}", value, None, None, "attribute")
custom.append(f"GLOBAL_{attr}")
sofa.protected = False
else:
setattr(sofa, f"GLOBAL_{attr}", value)
# load data
for var in file.variables.keys():
value = _format_value_from_netcdf(file[var][:], var)
all_attr.append(var.replace(".", "_"))
if hasattr(sofa, var.replace(".", "_")):
setattr(sofa, var.replace(".", "_"), value)
else:
dimensions = "".join(list(file[var].dimensions))
# SOFA only uses dtypes 'double' and 'S1' but netCDF has more
dtype = "string" if file[var].datatype == "S1" else "double"
sofa._add_custom_api_entry(var.replace(".", "_"), value, None,
dimensions, dtype)
custom.append(var.replace(".", "_"))
sofa.protected = False
# load variable attributes
for attr in [a for a in file[var].ncattrs() if a not in skip]:
value = getattr(file[var], attr)
all_attr.append(var.replace(".", "_") + "_" + attr)
if not hasattr(sofa, var.replace(".", "_") + "_" + attr):
sofa._add_custom_api_entry(
var.replace(".", "_") + "_" + attr, value, None,
None, "attribute")
custom.append(var.replace(".", "_") + "_" + attr)
sofa.protected = False
else:
setattr(sofa, var.replace(".", "_") + "_" + attr, value)
# remove fields from initial Sofa object that were not contained in NetCDF
# file (initial Sofa object contained mandatory and optional fields)
attrs = [attr for attr in sofa.__dict__.keys() if not attr.startswith("_")]
for attr in attrs:
if attr not in all_attr:
delattr(sofa, attr)
# do not allow writing read only attributes any more
sofa.protected = True
# notice about custom entries
if custom and verbose:
print(("SOFA file contained custom entries\n"
"----------------------------------\n"
f"{', '.join(custom)}"))
# set default for verify
if verify == 'auto':
verify = True if parse(version) >= parse('1.0') else False
# update api
if verify:
try:
sofa.verify(mode="read")
except: # noqa (No error handling - just improved verbosity)
raise ValueError((
"The SOFA object could not be verified, maybe due to erroneous"
" data. Call sofa=sofar.read_sofa(filename, verify=False) and "
"then sofa.verify() to get more information"))
return sofa
[docs]
def write_sofa(filename: str, sofa: sf.Sofa, compression=4):
"""
Write a SOFA object to disk as a SOFA file.
Parameters
----------
filename : str
The filename. '.sofa' is appended to the filename, if it is not
explicitly given.
sofa : object
The SOFA object that is written to disk
compression : int
The level of compression with ``0`` being no compression and ``9``
being the best compression. The default of ``9`` optimizes the file
size but increases the time for writing files to disk.
Notes
-----
1. Missing dimensions are appended when writing the SOFA object to disk.
E.g., if ``sofa.Data_IR`` is of shape (1, 2) it is written as an array
of shape (1, 2, 1) because the SOFA standard AES69 defines it as a
three dimensional array with the dimensions (`M: measurements`,
`R: receivers`, `N: samples`)
2. When reading data from a SOFA file, array data is always returned as
numpy arrays and singleton trailing dimensions are discarded (numpy
default). I.e., ``sofa.Data_IR`` will again be an array of shape (1, 2)
after writing and reading to and from disk.
3. One dimensional arrays with only one element will be converted to scalar
values. E.g. ``sofa.Data_SamplingRate`` is stored as an array of shape
(1, ) inside SOFA files (according to the SOFA standard AES69) but
will be a scalar inside SOFA objects after reading from disk.
"""
_write_sofa(filename, sofa, compression, verify=True)
def _write_sofa(filename: str, sofa: sf.Sofa, compression=4, verify=True):
"""
Private write function for writing invalid SOFA files for testing. See
write_sofa for documentation.
"""
# check the filename
filename = pathlib.Path(filename).with_suffix('.sofa')
if verify:
# check if the latest version is used for writing and warn otherwise
# if case required for writing SOFA test data that violates the
# conventions
if sofa.GLOBAL_SOFAConventions != "invalid-value":
latest = sf.Sofa(sofa.GLOBAL_SOFAConventions)
latest = latest.GLOBAL_SOFAConventionsVersion
current = sofa.GLOBAL_SOFAConventionsVersion
if parse(current) < parse(latest):
warnings.warn((
"Writing SOFA object with outdated Convention "
f"version {current}. It is recommend to upgrade "
" data with Sofa.upgrade_convention() before "
"writing to disk if possible."))
# setting the netCDF compression parameter
use_zlib = compression != 0
# update the dimensions
if verify:
sofa.verify(mode="write")
# list of all attribute names
all_keys = [key for key in sofa.__dict__.keys() if not key.startswith("_")]
# open new NETCDF4 file for writing
with Dataset(filename, "w", format="NETCDF4") as file:
# write dimensions
for dim in sofa._api:
file.createDimension(dim, sofa._api[dim])
# write global attributes
keys = [key for key in all_keys if key.startswith("GLOBAL_")]
for key in keys:
setattr(file, key[7:], str(getattr(sofa, key)))
# write data
for key in all_keys:
# skip attributes
# Note: This definition of attribute is blurry:
# lax definition:
# sofa._convention[key]["type"] == "attribute":
# strict definition:
# ("_" in key and not key.startswith("Data_")) or \
# key.count("_") > 1
#
# The strict definition is implicitly included in the SOFA standard
# since underscores only occur for variables starting with Data_
if sofa._convention[key]["type"] == "attribute":
continue
# get the data and type and shape
value, dtype = _format_value_for_netcdf(
getattr(sofa, key), key, sofa._convention[key]["type"],
sofa._dimensions[key], sofa._api["S"])
# create variable and write data
shape = tuple(list(sofa._dimensions[key]))
tmp_var = file.createVariable(
key.replace("Data_", "Data."), dtype, shape,
zlib=use_zlib, complevel=compression)
if dtype == "f8":
tmp_var[:] = value
else:
tmp_var[:] = stringtochar(value, encoding='utf-8')
# write variable attributes
sub_keys = [k for k in all_keys if k.startswith(f"{key}_")]
for sub_key in sub_keys:
setattr(tmp_var, sub_key[len(key)+1:],
str(getattr(sofa, sub_key)))
def _format_value_for_netcdf(value, key, dtype, dimensions, S):
"""
Format value from SOFA object for saving in a NETCDF4 file.
Parameters
----------
value : str, array like
The value to be formatted
key : str
The name of the current attribute. Needed for verbose errors.
dtype : str
The the data type of value
dimensions : str
The intended dimensions from ``sofa.dimensions``
S : int
Length of the string array.
Returns
-------
value : str, numpy array
The formatted value.
netcdf_dtype : str
The data type as a string for writing to a NETCDF4 file ('attribute',
'f8', or 'S1').
"""
# copy value
with contextlib.suppress(AttributeError):
value = value.copy()
# parse data
if dtype == "attribute":
value = str(value)
netcdf_dtype = "attribute"
elif dtype == "double":
value = _atleast_nd(value, len(dimensions))
netcdf_dtype = "f8"
elif dtype == "string":
value = np.array(value, dtype=f"S{str(S)}")
value = _atleast_nd(value, len(dimensions))
netcdf_dtype = 'S1'
else:
raise ValueError(f"Unknown type {dtype} for {key}")
return value, netcdf_dtype
def _format_value_from_netcdf(value, key):
"""
Format value from NETCDF4 file for saving in a SOFA object
Parameters
----------
value : np.array of dtype float or S
The value to be formatted
key : str
The variable name of the current value. Needed for verbose errors.
Returns
-------
value : str, number, numpy array
The formatted value.
"""
if "float" in str(value.dtype) or "int" in str(value.dtype):
if np.ma.is_masked(value):
warnings.warn(f"Entry {key} contains missing data")
else:
# Convert to numpy array or scalar
value = np.asarray(value)
elif str(value.dtype)[1] in ["S", "U"]:
# string arrays are stored in masked arrays with empty strings '' being
# masked. Convert to regular arrays with unmasked empty strings
if str(value.dtype)[1] == "S":
value = chartostring(value, encoding="ascii")
value = np.atleast_1d(value).astype("U")
else:
raise TypeError(
f"{key}: value.dtype is {value.dtype} but must be float, S or, U")
# convert arrays to scalars if they do not store data that is usually used
# as scalar metadata, e.g., the SamplingRate
data_keys = ["Data_IR", "Data_Real", "Data_Imag", "Data_SOS" "Data_Delay"]
if value.size == 1 and key not in data_keys:
value = value[0]
return value