Source code for sigpyproc.FoldedData

import numpy as np
from os import popen
from sigpyproc.Utils import rollArray


[docs]class Profile(np.ndarray): """Class to handle a 1-D pulse profile. Parameters ---------- input_array : :py:obj:`numpy.ndarray` a pulse profile in array form Returns ------- :py:obj:`numpy.ndarray` pulse profile """ def __new__(cls, input_array): obj = input_array.astype("float32").view(cls) return obj def __array_finalize__(self, obj): if obj is None: return def _getWidth(self): self -= np.median(self) trial_widths = np.arange(1, self.size) convmaxs = np.array( [ np.convolve(np.ones(ii), self, mode="same").max() / np.sqrt(ii) for ii in trial_widths ] ) return trial_widths[convmaxs.argmax()] def _getPosition(self, width): return np.convolve(np.ones(width), self, mode="same").argmax() def _getBaseline(self, width): pos = self._getPosition(width) wing = np.ceil(width / 2.0) return np.hstack((self[: pos - wing], self[pos + wing + 1:]))
[docs] def SN(self): """Return a rudimentary signal-to-noise measure for the profile. Returns ------- float calculated signal-to-noise ratio Notes ----- This is a bare-bones, quick-n'-dirty algorithm that should not be used for high quality signal-to-noise measurements. """ tmp_ar = self.copy() width = self._getWidth() baseline = self._getBaseline(width) tmp_ar -= baseline.mean() tmp_ar /= baseline.std() return float(tmp_ar.sum() / np.sqrt(width))
[docs] def retroProf(self, height=0.7, width=0.7): """Display the profile in ASCII formay in the terminal window. Parameters ---------- height : float, optional fraction of terminal rows to use, by default 0.7 width : float, optional fraction of terminal columns to use, by default 0.7 Notes ----- This function requires a system call to the Linux/Unix ``stty`` command. """ rows, columns = popen("stty size", "r").read().split() rows = int(int(rows) * height) columns = int(int(columns) * width) bins = np.linspace(0, self.size - 1, columns) new_prof = np.interp(bins, np.arange(self.size), self) new_prof -= new_prof.min() new_prof /= new_prof.max() new_prof *= rows for ii in np.arange(rows)[::-1]: print("".join([("#" if val >= ii else " ") for val in new_prof]))
[docs]class FoldSlice(np.ndarray): """Class to handle a 2-D slice of a :class:`~sigpyproc.FoldedData.FoldedData` instance. Parameters ---------- input_array : :py:obj:`numpy.ndarray` a 2-D array with phase in x axis. Returns ------- :py:obj:`numpy.ndarray` 2-D array """ def __new__(cls, input_array): obj = input_array.astype("float32").view(cls) return obj def __array_finalize__(self, obj): if obj is None: return
[docs] def normalise(self): """Normalise the slice by dividing each row by its mean. Returns ------- :class:`~sigpyproc.FoldedData.FoldSlice` normalised version of slice """ return self / self.mean(axis=1).reshape(self.shape[0], 1)
[docs] def getProfile(self): """Return the pulse profile from the slice. Returns ------- :class:`~sigpyproc.FoldedData.Profile` a pulse profile """ return self.sum(axis=0).view(Profile)
[docs]class FoldedData(np.ndarray): """Class to handle a data cube produced by any of the sigpyproc folding methods. Parameters ---------- input_array : :py:obj:`numpy.ndarray` 3-D array of folded data header : :class:`~sigpyproc.Header.Header` observational metadata period : float period that data was folded with dm : float DM that data was folded with accel : float accleration that data was folded with, by default 0 Returns ------- :py:obj:`numpy.ndarray` 3-D array of folded data with header metadata Notes ----- Data cube should have the shape: (number of subintegrations, number of subbands, number of profile bins) """ def __new__(cls, input_array, header, period, dm, accel=0): obj = input_array.astype("float32").view(cls) obj.header = header obj.period = period obj.dm = dm obj.accel = accel obj._setDefaults() return obj def __array_finalize__(self, obj): if obj is None: return self.header = getattr(obj, "header", None) self.period = getattr(obj, "period", None) self.dm = getattr(obj, "dm", None) self.accel = getattr(obj, "accel", None) def _setDefaults(self): self.nints = self.shape[0] self.nbands = self.shape[1] self.nbins = self.shape[2] self._orig = self.copy() self._period = self.period self._dm = self.dm self._tph_shifts = np.zeros(self.nints, dtype="int32") self._fph_shifts = np.zeros(self.nbands, dtype="int32")
[docs] def getSubint(self, n): """Return a single subintegration from the data cube. Parameters ---------- n : int subintegration number (n=0 is first subintegration Returns ------- :class:`~sigpyproc.FoldedData.FoldSlice` a 2-D array containing the subintegration """ return self[n].view(FoldSlice)
[docs] def getSubband(self, n): """Return a single subband from the data cube. Parameters ---------- n : int subband number (n=0 is first subband) Returns ------- :class:`~sigpyproc.FoldedData.FoldSlice` a 2-D array containing the subband """ return self[:, n].view(FoldSlice)
[docs] def getProfile(self): """Return a the data cube summed in time and frequency. Returns ------- :class:`~sigpyproc.FoldedData.Profile` a 1-D array containing the power as a function of phase (pulse profile) """ return self.sum(axis=0).sum(axis=0).view(Profile)
[docs] def getTimePhase(self): """Return the data cube collapsed in frequency. Returns ------- :class:`~sigpyproc.FoldedData.FoldSlice` a 2-D array containing the time vs. phase plane """ return self.sum(axis=1).view(FoldSlice)
[docs] def getFreqPhase(self): """Return the data cube collapsed in time. Returns ------- :class:`~sigpyproc.FoldedData.FoldSlice` a 2-D array containing the frequency vs. phase plane """ return self.sum(axis=0).view(FoldSlice)
[docs] def centre(self): """Try and roll the data cube to center the pulse. """ p = self.getProfile() pos = p._getPosition(p._getWidth()) self = rollArray(self, (pos - self.nbins / 2), 2)
def _replaceNan(self): bad_ids = np.where(np.isnan(self)) good_ids = np.where(np.isfinite(self)) med = np.median(self[good_ids]) self[bad_ids] = med def _normalise(self): self.freqPhase /= self.freqPhase.mean(axis=1).reshape(self.nbands, 1) self.timePhase /= self.timePhase.mean(axis=1).reshape(self.nints, 1) def _getDMdelays(self, dm): delta_dm = dm - self._dm if delta_dm == 0: drifts = -1 * self._fph_shifts self._fph_shifts[:] = 0 return drifts else: chan_width = self.header.foff * self.header.nchans / self.nbands freqs = (np.arange(self.nbands) * chan_width) + self.header.fch1 fact = delta_dm * 4.148808e3 drifts = ( fact * ((freqs**-2) - (self.header.fch1**-2)) / ((self.period / self.nbins)) ) drifts = drifts.round().astype("int32") bin_drifts = drifts - self._fph_shifts self._fph_shifts = drifts return bin_drifts def _getPdelays(self, p): dbins = (p / self._period - 1) * self.header.tobs * self.nbins / self._period if dbins == 0: drifts = -1 * self._tph_shifts self._tph_shifts[:] = 0 return drifts else: drifts = np.round( np.arange(float(self.nints)) / (self.nints / dbins) ).astype("int32") bin_drifts = drifts - self._tph_shifts self._tph_shifts = drifts return bin_drifts
[docs] def updateParams(self, dm=None, period=None): """Install a new folding period and/or DM in the data cube. Parameters ---------- dm : float, optional the new DM to dedisperse to, by default None period : float, optional the new period to fold with, by default None """ if dm is None: dm = self.dm if period is None: period = self.period dmdelays = self._getDMdelays(dm) pdelays = self._getPdelays(period) for ii in range(self.nints): for jj in range(self.nbands): if dmdelays is not None: self[ii][jj] = rollArray(self[ii][jj], dmdelays[jj], 0) if pdelays is not None: self[ii][jj] = rollArray(self[ii][jj], pdelays[ii], 0) self.dm = dm self.period = period