Source code for sigpyproc.Filterbank

import numpy as np

from sigpyproc.Utils import rollArray
from sigpyproc.FoldedData import FoldedData
from sigpyproc.TimeSeries import TimeSeries
import sigpyproc.libSigPyProc as lib


[docs]class Filterbank(object): """New Class exporting methods for the manipulation of frequency-major order pulsar data. Notes ----- The Filterbank class should never be instantiated directly. Instead it should be inherited by data reading classes. """ def __init__(self): self.chan_means = None self.chan_vars = None self.chan_stdevs = None self.chan_skews = None self.chan_kurts = None self.chan_maxima = None self.chan_minima = None
[docs] def setNthreads(self, nthreads=None): """Set the number of threads available to OpenMP. Parameters ---------- nthreads : int, optional number of threads to use, by default 4 """ if nthreads is None: nthreads = 4 lib._omp_set_num_threads(nthreads)
[docs] def collapse(self, gulp=512, start=0, nsamps=None, **kwargs): """Sum across all frequencies for each time sample. Parameters ---------- gulp : int, optional number of samples in each read, by default 512 start : int, optional start sample, by default 0 nsamps : int, optional number of samples to read, by default all Returns ------- :class:`~sigpyproc.TimeSeries.TimeSeries` A zero-DM time series """ if nsamps is None: size = self.header.nsamples - start else: size = nsamps timar = np.zeros(size, dtype="float32") for nsamps, ii, data in self.readPlan( gulp, start=start, nsamps=nsamps, **kwargs ): lib.getTim(data, timar, self.header.nchans, nsamps, ii * gulp) return TimeSeries(timar, self.header.newHeader({"nchans": 1, "refdm": 0.0}))
[docs] def invertFreq(self, gulp=512, start=0, nsamps=None, filename=None, back_compatible=True, **kwargs): """Invert the frequency ordering of the data and write new data to a new file. Parameters ---------- gulp : int, optional number of samples in each read, by default 512 start : int, optional start sample, by default 0 nsamps : int, optional number of samples to read, by default all filename : str, optional name of output file, by default ``basename_inverted.fil`` back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- str name of output file """ if filename is None: filename = f"{self.header.basename}_inverted.fil" if nsamps is None: size = self.header.nsamples - start else: size = nsamps out_ar = np.empty(size * self.header.nchans, dtype=self.header.dtype) sign = 1.0 if self.header.foff >= 0.0 else -1.0 changes = { "fch1": self.header.fch1 + sign * (self.header.nchans - 1) * self.header.foff, "foff": self.header.foff * sign * (-1.0) } # NB bandwidth is +ive by default out_file = self.header.prepOutfile( filename, changes, nbits=self.header.nbits, back_compatible=back_compatible ) for nsamps, ii, data in self.readPlan( gulp, start=start, nsamps=nsamps, **kwargs ): lib.invertFreq(data, out_ar, self.header.nchans, nsamps) out_file.cwrite(out_ar[: nsamps * self.header.nchans]) out_file.close() return out_file.name
[docs] def bandpass(self, gulp=512, **kwargs): """Average across each time sample for all frequencies. Parameters ---------- gulp : int, optional number of samples in each read, by default 512 Returns ------- :class:`~sigpyproc.TimeSeries.TimeSeries` the bandpass of the data """ bpass_ar = np.zeros(self.header.nchans, dtype="float64") num_samples = 0 for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.getBpass(data, bpass_ar, self.header.nchans, nsamps) num_samples += nsamps bpass_ar = bpass_ar / num_samples return TimeSeries(bpass_ar, self.header.newHeader({"nchans": 1}))
[docs] def dedisperse(self, dm, gulp=10000, **kwargs): """Dedisperse the data to a time series. Parameters ---------- dm : float dispersion measure to dedisperse to gulp : int, optional number of samples in each read, by default 10000 Returns ------- :class:`~sigpyproc.TimeSeries.TimeSeries` a dedispersed time series Notes ----- If gulp < maximum dispersion delay, gulp is taken to be twice the maximum dispersion delay. """ chan_delays = self.header.getDMdelays(dm) max_delay = int(chan_delays.max()) gulp = max(2 * max_delay, gulp) tim_len = self.header.nsamples - max_delay tim_ar = np.zeros(tim_len, dtype="float32") for nsamps, ii, data in self.readPlan(gulp, skipback=max_delay, **kwargs): lib.dedisperse( data, tim_ar, chan_delays, max_delay, self.header.nchans, nsamps, ii * (gulp - max_delay), ) return TimeSeries(tim_ar, self.header.newHeader({"nchans": 1, "refdm": dm}))
[docs] def subband(self, dm, nsub, filename=None, gulp=10000, **kwargs): """Produce a set of dedispersed subbands from the data. Parameters ---------- dm : float the DM of the subbands nsub : int the number of subbands to produce filename : str, optional output file name of subbands, by default ``basename_DM.subbands`` gulp : int, optional number of samples in each read, by default 10000 Returns ------- str name of output subbands file """ subfactor = self.header.nchans // nsub chan_delays = self.header.getDMdelays(dm) max_delay = int(chan_delays.max()) gulp = max(2 * max_delay, gulp) # must be memset to zero in c code out_ar = np.empty((gulp - max_delay) * nsub, dtype="float32") new_foff = self.header.foff * self.header.nchans // nsub new_fch1 = self.header.ftop - new_foff / 2. chan_to_sub = np.arange(self.header.nchans, dtype="int32") // subfactor changes = { "fch1": new_fch1, "foff": new_foff, "refdm": dm, "nchans": nsub, "nbits": 32 } if filename is None: filename = f"{self.header.basename}_DM{dm:06.2f}.subbands" out_file = self.header.prepOutfile( filename, changes, nbits=32, back_compatible=True ) for nsamps, ii, data in self.readPlan(gulp, skipback=max_delay, **kwargs): lib.subband( data, out_ar, chan_delays, chan_to_sub, max_delay, self.header.nchan, nsub, nsamps, ) out_file.cwrite(out_ar[: (nsamps - max_delay) * nsub]) return filename
[docs] def upTo8bit(self, filename=None, gulp=512, back_compatible=True, **kwargs): """Convert 1-,2- or 4-bit data to 8-bit data and write to file. Parameters ---------- filename : str, optional name of file to write to, by default ``basename_8bit.fil`` gulp : int, optional number of samples in each read, by default 512 back_compatible : bool, optional sigproc compatibility flag, by default True Returns ------- str name of output file """ if filename is None: filename = f"{self.header.basename}_8bit.fil" out_file = self.header.prepOutfile( filename, {"nbits": 8}, nbits=8, back_compatible=back_compatible ) for nsamps, ii, data in self.readPlan(gulp, **kwargs): out_file.cwrite(data) return out_file.name
[docs] def applyChannelMask(self, chanmask, outfilename=None, gulp=512, back_compatible=True, **kwargs): """Set the data in the given channels to zero. Parameters ---------- chanmask : list binary channel mask (0 for bad channel, 1 for good) outfilename : str, optional name of the output filterbank file, by default ``basename_masked.fil`` gulp : int, optional number of samples in each read, by default 512 back_compatible : bool, optional sigproc compatibility flag, by default True Returns ------- str name of output file """ if outfilename is None: outfilename = f"{self.header.basename}_masked.fil" mask = np.array(chanmask).astype("ubyte") out_file = self.header.prepOutfile(outfilename, back_compatible=back_compatible) for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.maskChannels(data, mask, self.header.nchans, nsamps) out_file.cwrite(data) return out_file.name
[docs] def downsample(self, tfactor=1, ffactor=1, gulp=512, filename=None, back_compatible=True, **kwargs): """Downsample data in time and/or frequency and write to file. Parameters ---------- tfactor : int, optional factor by which to downsample in time, by default 1 ffactor : int, optional factor by which to downsample in frequency, by default 1 gulp : int, optional number of samples in each read, by default 512 filename : str, optional name of file to write to, by default ``basename_tfactor_ffactor.fil`` back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- str output file name Raises ------ ValueError If number of channels is not divisible by `ffactor`. """ if filename is None: filename = f"{self.header.basename}_f{ffactor:d}_t{tfactor:d}.fil" if not self.header.nchans % ffactor == 0: raise ValueError("Bad frequency factor given") # Gulp must be a multiple of tfactor gulp = int(np.ceil(gulp / tfactor) * tfactor) out_file = self.header.prepOutfile( filename, { "tsamp": self.header.tsamp * tfactor, "nchans": self.header.nchans // ffactor, "foff": self.header.foff * ffactor }, back_compatible=back_compatible ) write_ar = np.zeros( gulp * self.header.nchans // ffactor // tfactor, dtype=self.header.dtype ) for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.downsample(data, write_ar, tfactor, ffactor, self.header.nchans, nsamps) out_file.cwrite( write_ar[:nsamps * self.header.nchans // ffactor // tfactor] ) return out_file.name
[docs] def fold(self, period, dm, accel=0, nbins=50, nints=32, nbands=32, gulp=10000, **kwargs): """Fold data into discrete phase, subintegration and subband bins. Parameters ---------- period : float period in seconds to fold with dm : float dispersion measure to dedisperse to accel : float, optional acceleration in m/s/s to fold with, by default 0 nbins : int, optional number of phase bins in output, by default 50 nints : int, optional number of subintegrations in output, by default 32 nbands : int, optional number of subbands in output, by default 32 gulp : int, optional number of samples in each read, by default 10000 Returns ------- :class:`~sigpyproc.FoldedData.FoldedData` 3 dimensional data cube Raises ------ ValueError If `nbands * nints * nbins` is too large Notes ----- If gulp < maximum dispersion delay, gulp is taken to be twice the maximum dispersion delay. """ if np.modf(period / self.header.tsamp)[0] < 0.001: print("WARNING: Foldng interval is an integer multiple of the sampling time") if nbins > period / self.header.tsamp: print("WARNING: Number of phase bins is greater than period/sampling time") if (self.header.nsamples * self.header.nchans) // (nbands * nints * nbins) < 10: raise ValueError("nbands x nints x nbins is too large.") nbands = min(nbands, self.header.nchans) chan_delays = self.header.getDMdelays(dm) max_delay = int(chan_delays.max()) gulp = max(2 * max_delay, gulp) fold_ar = np.zeros(nbins * nints * nbands, dtype="float32") count_ar = np.zeros(nbins * nints * nbands, dtype="int32") for nsamps, ii, data in self.readPlan(gulp, skipback=max_delay, **kwargs): lib.foldFil( data, fold_ar, count_ar, chan_delays, max_delay, self.header.tsamp, period, accel, self.header.nsamples, nsamps, self.header.nchans, nbins, nints, nbands, ii * (gulp - max_delay), ) fold_ar /= count_ar fold_ar = fold_ar.reshape(nints, nbands, nbins) return FoldedData(fold_ar, self.header.newHeader(), period, dm, accel)
[docs] def getChan(self, chan, gulp=512, **kwargs): """Retrieve a single frequency channel from the data. Parameters ---------- chan : int channel to retrieve (0 is the highest frequency channel) gulp : int, optional number of samples in each read, by default 512 Returns ------- :class:`~sigpyproc.TimeSeries.TimeSeries` selected channel as a time series Raises ------ ValueError If chan is out of range (chan < 0 or chan > total channels). """ if chan >= self.header.nchans or chan < 0: raise ValueError("Selected channel out of range.") tim_ar = np.empty(self.header.nsamples, dtype="float32") for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.getChan(data, tim_ar, chan, self.header.nchans, nsamps, ii * gulp) return TimeSeries( tim_ar, self.header.newHeader({"channel": chan, "refdm": 0.0, "nchans": 1}) )
[docs] def split(self, start, nsamps, filename=None, gulp=1024, back_compatible=True, **kwargs): """Split data in time. Parameters ---------- start : int start sample of split nsamps : int number of samples in split filename : str, optional name of output file, by default ``basename_start_start+nsamps.fil`` gulp : int, optional number of samples in each read, by default 1024 back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- str name of new file """ if filename is None: filename = f"{self.header.basename}_{start:d}_{start+nsamps:d}.fil" new_tstart = self.header.tstart + ((self.header.tsamp * start) / 86400.0) out_file = self.header.prepOutfile( filename, updates={'tstart': new_tstart}, nbits=self.header.nbits ) for count, ii, data in self.readPlan( gulp, start=start, nsamps=nsamps, **kwargs ): out_file.cwrite(data) out_file.close() return out_file.name
[docs] def splitToChans(self, gulp=1024, back_compatible=True, **kwargs): """Split the data into component channels and write each to file. Parameters ---------- gulp : int, optional number of samples in each read, by default 1024 back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- list of str names of all files written to disk Notes ----- Time series are written to disk with names based on channel number. """ tim_ar = np.empty([self.header.nchans, gulp], dtype="float32") out_files = [ self.header.prepOutfile( f"{self.header.basename}_chan{ii:04d}.tim", {"nchans": 1, "nbits": 32, "data_type": 2}, back_compatible=back_compatible, nbits=32, ) for ii in range(self.header.nchans) ] for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.splitToChans(data, tim_ar, self.header.nchans, nsamps, gulp) for ii, f in enumerate(out_files): f.cwrite(tim_ar[ii][:nsamps]) for f in out_files: f.close() return [f.name for f in out_files]
[docs] def splitToBands(self, chanpersub, chanstart=0, gulp=1024, back_compatible=True, **kwargs): """Split the data into component Sub-bands and write to disk. Parameters ---------- chanpersub : int number of channels in each sub-band chanstart : int, optional start channel of split, by default 0 gulp : int, optional number of samples in each read, by default 1024 back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- list of str names of all files written to disk Notes ----- Filterbanks are written to disk with names based on sub-band number. """ # TODO: C version is too slow. Need to fix nsub = (self.header.nchans - chanstart) // chanpersub fstart = self.header.fch1 + chanstart * self.header.foff out_files = [ self.header.prepOutfile( f"{self.header.basename}_sub{ii:02d}.fil", { "nchans": chanpersub, "fch1": fstart + ii * chanpersub * self.header.foff }, back_compatible=back_compatible, nbits=self.header.nbits, ) for ii in range(nsub) ] # subband_ar = np.empty([gulp*chanpersub, nsub], dtype=self.header.dtype) for nsamps, ii, data in self.readPlan(gulp, **kwargs): """ lib.splitToBands( data, subband_ar_c, self.header.nchans, nsamps, nsub, chanpersub, chanstart ) """ for ii, out_file in enumerate(out_files): data = data.reshape(nsamps, self.header.nchans) subband_ar = data[ :, chanstart + chanpersub * ii:chanstart + chanpersub * (ii + 1) ] out_file.cwrite(subband_ar.ravel()) # out_file.cwrite(subband_ar[:nsamps*chanpersub, ii]) for out_file in out_files: out_file.close() return [out_file.name for out_file in out_files]
[docs] def getStats(self, gulp=512, **kwargs): """Retrieve channelwise statistics of data. Parameters ---------- gulp : int, optional number of samples in each read, by default 512 Function creates following instance attributes: * :attr:`chan_means`: the mean value of each channel * :attr:`chan_vars`: the variance of each channel * :attr:`chan_stdevs`: the standard deviation of each channel * :attr:`chan_skews`: the skewness of each channel * :attr:`chan_kurts`: the kurtosis of each channel * :attr:`chan_max`: the maximum value of each channel * :attr:`chan_min`: the minimum value of each channel """ maxima_ar = np.zeros(self.header.nchans, dtype="float32") minima_ar = np.zeros(self.header.nchans, dtype="float32") count_ar = np.zeros(self.header.nchans, dtype="int64") M1_ar = np.zeros(self.header.nchans, dtype="float32") M2_ar = np.zeros(self.header.nchans, dtype="float32") M3_ar = np.zeros(self.header.nchans, dtype="float32") M4_ar = np.zeros(self.header.nchans, dtype="float32") for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.getStats( data, M1_ar, M2_ar, M3_ar, M4_ar, maxima_ar, minima_ar, count_ar, self.header.nchans, nsamps, ii, ) means_ar = M1_ar var_ar = M2_ar / self.header.nsamples stdev_ar = np.sqrt(var_ar) M2_ar[M2_ar == 0] = np.nan skew_ar = M3_ar / np.power(M2_ar, 1.5) * np.sqrt(self.header.nsamples) kurt_ar = M4_ar / np.power(M2_ar, 2.0) * self.header.nsamples - 3.0 stdev_ar[np.isnan(stdev_ar)] = 0 skew_ar[np.isnan(skew_ar)] = 0 kurt_ar[np.isnan(kurt_ar)] = -3.0 self.chan_means = means_ar.astype('float32') self.chan_vars = var_ar.astype('float32') self.chan_stdevs = stdev_ar.astype('float32') self.chan_skews = skew_ar.astype('float32') self.chan_kurts = kurt_ar.astype('float32') self.chan_maxima = maxima_ar self.chan_minima = minima_ar
[docs] def removeBandpass(self, gulp=512, filename=None, back_compatible=True, **kwargs): """Remove the bandpass from the data and write new data to a new file. Parameters ---------- gulp : int, optional number of samples in each read, by default 512 filename : str, optional name of output file, by default ``basename_bpcorr.fil`` back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- str name of output file Raises ------ TypeError If `nbits` of input file is < 8. """ if self.header.nbits < 8: raise TypeError(f"{self.header.nbits}-bit filterbank not supported yet!") if filename is None: filename = f"{self.header.basename}_bpcorr.fil" if self.chan_stdevs is None: self.getStats(gulp=gulp, **kwargs) out_ar = np.empty( self.header.nsamples * self.header.nchans, dtype=self.header.dtype ) out_file = self.header.prepOutfile( filename, nbits=self.header.nbits, back_compatible=back_compatible ) for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.removeBandpass( data, out_ar, self.chan_means, self.chan_stdevs, self.header.nchans, nsamps, ) out_file.cwrite(out_ar[: nsamps * self.header.nchans]) out_file.close() return out_file.name
[docs] def removeZeroDM(self, gulp=512, filename=None, back_compatible=True, **kwargs): """Remove the channel-weighted zero-DM from the data and write to disk. Parameters ---------- gulp : int, optional number of samples in each read, by default 512 filename : str, optional name of output file , by default ``basename_noZeroDM.fil`` back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- str name of output file Notes ----- Based on Presto implementation of Eatough, Keane & Lyne 2009 """ if filename is None: filename = f"{self.header.basename}_noZeroDM.fil" bpass = self.bandpass(gulp=gulp, **kwargs) chanwts = bpass / bpass.sum() out_ar = np.empty( self.header.nsamples * self.header.nchans, dtype=self.header.dtype ) out_file = self.header.prepOutfile( filename, nbits=self.header.nbits, back_compatible=back_compatible ) for nsamps, ii, data in self.readPlan(gulp, **kwargs): lib.removeZeroDM(data, out_ar, bpass, chanwts, self.header.nchans, nsamps) out_file.cwrite(out_ar[: nsamps * self.header.nchans]) out_file.close() return out_file.name
[docs]class FilterbankBlock(np.ndarray): """Class to handle a discrete block of data in time-major order. Parameters ---------- input_array : :py:obj:`numpy.ndarray` 2 dimensional array of shape (nchans, nsamples) header : :class:`~sigpyproc.Header.Header` observational metadata Returns ------- :py:obj:`numpy.ndarray` 2 dimensional array of shape (nchans, nsamples) with header metadata Notes ----- Data is converted to 32 bits regardless of original type. """ def __new__(cls, input_array, header): obj = input_array.astype("float32").view(cls) obj.header = header obj.dm = 0.0 return obj def __array_finalize__(self, obj): if obj is None: return if hasattr(obj, "header"): self.header = obj.header self.dm = getattr(obj, "dm", 0.0)
[docs] def downsample(self, tfactor=1, ffactor=1): """Downsample data block in frequency and/or time. Parameters ---------- tfactor : int, optional factor by which to downsample in time, by default 1 ffactor : int, optional factor by which to downsample in frequency, by default 1 Returns ------- :class:`~sigpyproc.Filterbank.FilterbankBlock` 2 dimensional array of downsampled data Raises ------ ValueError If number of channels is not divisible by `ffactor`. """ if not self.shape[0] % ffactor == 0: raise ValueError("Bad frequency factor given") newnsamps = self.shape[1] - self.shape[1] % tfactor new_ar = np.empty( newnsamps * self.shape[0] // ffactor // tfactor, dtype="float32" ) ar = self.transpose().ravel().copy() lib.downsample(ar, new_ar, tfactor, ffactor, self.shape[0], newnsamps) new_ar = new_ar.reshape( newnsamps // tfactor, self.shape[0] // ffactor ).transpose() new_tsamp = self.header.tsamp * tfactor new_nchans = self.header.nchans // ffactor new_header = self.header.newHeader({"tsamp": new_tsamp, "nchans": new_nchans}) return FilterbankBlock(new_ar, new_header)
[docs] def toFile(self, filename=None, back_compatible=True): """Write the data to file. Parameters ---------- filename : str, optional name of the output file, by default ``basename_split_start_to_end.fil`` back_compatible : bool, optional sigproc compatibility flag (legacy code), by default True Returns ------- str name of output file """ if filename is None: filename = ( f"{self.header.basename}_{self.header.tstart:d}_" f"to_{self.header.mjdAfterNsamps(self.shape[1]):d}.fil" ) new_header = {"nbits": 32} out_file = self.header.prepOutfile( filename, new_header, nbits=32, back_compatible=back_compatible ) out_file.cwrite(self.transpose().ravel()) return filename
[docs] def normalise(self): """Divide each frequency channel by its average. Returns ------- :class:`~sigpyproc.Filterbank.FilterbankBlock` normalised version of the data """ return self / self.mean(axis=1).reshape(self.shape[0], 1)
[docs] def get_tim(self): """Sum across all frequencies for each time sample Returns ------- :class:`~sigpyproc.TimeSeries.TimeSeries` a timeseries """ return self.sum(axis=0)
[docs] def get_bandpass(self): """Average across each time sample for all frequencies. Returns ------- :class:`~sigpyproc.TimeSeries.TimeSeries` the bandpass of the data """ return self.sum(axis=1)
[docs] def dedisperse(self, dm, only_valid_samples=False): """Dedisperse the block. Parameters ---------- dm : float dm to dedisperse to only_valid_samples : bool, optional return a FilterbankBlock with only time samples that contain the full bandwidth, by default False Returns ------- :class:`~sigpyproc.Filterbank.FilterbankBlock` a dedispersed version of the block Raises ------ ValueError If there are not enough time samples to dedisperse. Notes ----- Frequency dependent delays are applied as rotations to each channel in the block. """ delays = self.header.getDMdelays(dm) if not only_valid_samples: new_ar = self.copy() for ii in range(self.shape[0]): new_ar[ii] = rollArray(self[ii], delays[ii] % self.shape[1], 0) else: if self.shape[1] < delays[-1]: raise ValueError( f"Insufficient time samples to dedisperse to {dm} (requires " f"at least {delays[-1]} samples, given {self.shape[1]})." ) new_ar = FilterbankBlock( np.zeros( (self.header.nchans, self.shape[1] - delays[-1]), dtype=self.dtype ), self.header, ) end_samples = delays + new_ar.shape[1] slices = [ np.arange(delay, end_sample) for delay, end_sample in zip(delays, end_samples) ] for idx, timeSlice in enumerate(slices): new_ar[idx, :] = self[idx, timeSlice] new_ar.dm = dm return new_ar