import io
import numpy as np
import warnings
from sigpyproc.HeaderParams import nbits_to_dtype
import sigpyproc.libSigPyProc as lib
[docs]class File(io.FileIO):
"""A class to handle writing of arbitrary bit size data to file.
:param filename: name of file to open
:type filename: :func:`str`
:param mode: file access mode, can be either "r", "r+", "w" or "a".
:type mode: :func:`str`
:param nbits: the bit size of units to be read from or written to file
:type nbits:
.. note::
The File class handles all packing and unpacking of sub-byte size data
under the hood, so all calls can be made requesting numbers of units
rather than numbers of bits or bytes.
"""
def __init__(self, filename, mode, nbits=8):
super().__init__(filename, mode)
self.nbits = nbits
self.dtype = nbits_to_dtype[self.nbits]
if nbits in [1, 2, 4]:
self.bitfact = nbits / 8.0
self.unpack = True
else:
self.bitfact = 1
self.unpack = False
[docs] def cread(self, nunits):
"""Read nunits of data from the file.
:param nunits: number of units to be read from file
:type nunits: int
:return: an array containing the read data
:rtype: :class:`numpy.ndarray`
"""
count = int(nunits * self.bitfact)
data = np.fromfile(self, count=count, dtype=self.dtype)
if self.unpack:
unpacked = lib.unpack(data, self.nbits)
return unpacked
else:
return data
[docs] def cwrite(self, ar):
"""Write an array to file.
:param ar: a numpy array
:type ar: :class:`numpy.ndarray`
.. note::
Regardless of the dtype of the array argument, the data will be packed
with a bitsize determined by the nbits attribute of the File instance.
To change this attribute, use the _setNbits methods.
It is the responsibility of the user to ensure that values in the array
do not go beyond the maximum and minimum values allowed by the nbits
attribute.
"""
if self.dtype != ar.dtype:
warnings.warn(
f"Given data (dtype={ar.dtype}) will be unsafely cast to the"
f"requested dtype={self.dtype} before being written out to file",
stacklevel=2
)
ar = ar.astype(self.dtype, casting="unsafe")
# The lib.pack function has an assumption that the given array has 8-bit
# data. If the given array was, say 32-bit floats and the requested nbits
# is, say 2-bit, then the output will be garbage, hence the casting above is
# necessary.
if self.unpack:
packed = lib.pack(ar, self.nbits)
packed.tofile(self)
else:
ar.tofile(self)
def __del__(self):
self.close()
[docs]def rollArray(y, shift, axis):
"""Roll the elements in the array by 'shift' positions along the
given axis.
Args:
y -- array to roll
shift -- number of bins to shift by
axis -- axis to roll along
Returns: shifted Ndarray
"""
y = np.asanyarray(y)
n = y.shape[axis]
shift %= n
return y.take(np.concatenate((np.arange(shift, n), np.arange(shift))), axis)
def _flattenList(n):
new = []
repack = lambda x: [new.append(int(y)) for y in x]
for elem in n:
if hasattr(elem, "__iter__"):
repack(elem)
else:
new.append(int(elem))
return new
[docs]def stackRecarrays(arrays):
"""Wrapper for stacking :class:`numpy.recarrays`"""
return arrays[0].__array_wrap__(np.hstack(arrays))
[docs]def nearestFactor(n, val):
"""Find nearest factor.
:param n: number that we wish to factor
:type n: int
:param val: number that we wish to find nearest factor to
:type val: int
:return: nearest factor
:rtype: int
"""
fact = [1, n]
check = 2
rootn = np.sqrt(n)
while check < rootn:
if n % check == 0:
fact.append(check)
fact.append(n // check)
check += 1
if rootn == check:
fact.append(check)
fact.sort()
return fact[np.abs(np.array(fact) - val).argmin()]
[docs]def editInplace(inst, key, value):
"""Edit a sigproc style header in place
:param inst: a header instance with a ``.filename`` attribute
:type inst: :class:`~sigpyproc.Header.Header`
:param key: name of parameter to change (must be a valid sigproc key)
:type key: :func:`str`
:param value: new value to enter into header
.. note::
It is up to the user to be responsible with this function, as it will directly
change the file on which it is being operated. The only fail contition of
editInplace comes when the new header to be written to file is longer or shorter than the
header that was previously in the file.
"""
temp = File(inst.header.filename, "r+")
if key == "source_name":
oldlen = len(inst.header.source_name)
value = value[:oldlen] + " " * (oldlen - len(value))
inst.header[key] = value
new_header = inst.header.SPPHeader(back_compatible=True)
if inst.header.hdrlen != len(new_header):
raise ValueError("New header is too long/short for file")
else:
temp.seek(0)
temp.write(new_header)