# -*- mode: python; coding: utf-8 -*
# Copyright (c) 2018 rasg-affiliates
# Licensed under the 3-clause BSD License
"""Read and Write support for PAPER miriad files with pyuvdata."""
import copy
import numpy as np
from astropy import constants as const
from astropy import units
from pyuvdata import utils as uvutils
from scipy.signal import windows
[docs]def get_data_array(uv, reds, squeeze=True):
"""Remove data from pyuvdata object and store in numpy array.
Uses UVData.get_data function to create a matrix of data of shape (Npols, Nbls, Ntimes, Nfreqs).
Only valid to call on a set of redundant baselines with the same number of times.
Parameters
----------
uv : UVdata object, or subclass
Data object which can support uv.get_data(ant_1, ant_2, pol)
reds: list of ints
list of all redundant baselines of interest as baseline numbers.
squeeze : bool
set true to squeeze the polarization dimension.
This has no effect for data with Npols > 1.
Returns
-------
data_array :complex arrary
(Nbls , Ntimes, Nfreqs) numpy array or (Npols, Nbls, Ntimes, Nfreqs) if squeeze == False
"""
data_shape = (uv.Npols, uv.Nbls, uv.Ntimes, uv.Nfreqs)
data_array = np.zeros(data_shape, dtype=np.complex)
for count, baseline in enumerate(reds):
tmp_data = uv.get_data(baseline, squeeze="none")
# Keep the polarization dimenions and squeeze out spw
tmp_data = np.squeeze(tmp_data, axis=1)
# Reorder to: Npols, Ntimes, Nfreqs
data_array[:, count] = np.transpose(tmp_data, [2, 0, 1])
if squeeze:
if data_array.shape[0] == 1:
data_array = np.squeeze(data_array, axis=0)
return data_array
[docs]def get_nsample_array(uv, reds, squeeze=True):
"""Remove nsamples from pyuvdata object and store in numpy array.
Uses UVData.get_nsamples function to create a matrix of data of shape (Npols, Nbls, Ntimes, Nfreqs).
Only valid to call on a set of redundant baselines with the same number of times.
Parameters
----------
uv : UVdata object, or subclass
Data object which can support uv.get_data(ant_1, ant_2, pol)
reds: list of ints
list of all redundant baselines of interest as baseline numbers.
squeeze : bool
set true to squeeze the polarization dimension.
This has no effect for data with Npols > 1.
Returns
-------
nsample_array : float array
(Nbls, Ntimes, Nfreqs) numpy array or (Npols, Nbls, Ntimes, Nfreqs) if squeeze == False
"""
nsample_shape = (uv.Npols, uv.Nbls, uv.Ntimes, uv.Nfreqs)
nsample_array = np.zeros(nsample_shape, dtype=np.float)
for count, baseline in enumerate(reds):
tmp_data = uv.get_nsamples(baseline, squeeze="none")
# Keep the polarization dimenions and squeeze out spw
tmp_data = np.squeeze(tmp_data, axis=1)
# Reorder to: Npols, Ntimes, Nfreqs
nsample_array[:, count] = np.transpose(tmp_data, [2, 0, 1])
if squeeze:
if nsample_array.shape[0] == 1:
nsample_array = np.squeeze(nsample_array, axis=0)
return nsample_array
[docs]def get_flag_array(uv, reds, squeeze=True):
"""Remove nsamples from pyuvdata object and store in numpy array.
Uses UVData.get_flags function to create a matrix of data of shape (Npols, Nbls, Ntimes, Nfreqs).
Only valid to call on a set of redundant baselines with the same number of times.
Parameters
----------
uv : UVdata object, or subclass
Data object which can support uv.get_data(ant_1, ant_2, pol)
reds: list of ints
list of all redundant baselines of interest as baseline numbers.
squeeze : bool
set true to squeeze the polarization dimension.
This has no effect for data with Npols > 1.
Returns
-------
flag_array : bool array
(Nbls, Ntimes, Nfreqs) numpy array (Npols, Nbls, Ntimes, Nfreqs) if squeeze == False
"""
flag_shape = (uv.Npols, uv.Nbls, uv.Ntimes, uv.Nfreqs)
flag_array = np.zeros(flag_shape, dtype=np.bool)
reds = np.array(reds)
for count, baseline in enumerate(reds):
tmp_data = uv.get_flags(baseline, squeeze="none")
# Keep the polarization dimenions and squeeze out spw
tmp_data = np.squeeze(tmp_data, axis=1)
# Reorder to: Npols, Ntimes, Nfreqs
flag_array[:, count] = np.transpose(tmp_data, [2, 0, 1])
if squeeze:
if flag_array.shape[0] == 1:
flag_array = np.squeeze(flag_array, axis=0)
return flag_array
[docs]def get_integration_time(uv, reds, squeeze=True):
"""Extract the integration_time array from pyuvdata objectself.
Extracts the integration time array from a UVdata object to create a matrix of shape (Npols, Nbls, Ntimes, Nfreqs).
Only valid to call on a set of redundant baselines with the same number of times.
Parameters
----------
uv : UVdata object, or subclass
Data object which can support uv.get_data(ant_1, ant_2, pol)
reds: list of ints
list of all redundant baselines of interest as baseline numbers.
squeeze : bool
set true to squeeze the polarization dimension.
This has no effect for data with Npols > 1.
Returns
-------
integration_time : float array
(Nbls, Ntimes) numpy array of integration times.
"""
shape = (uv.Nbls, uv.Ntimes)
integration_time = np.zeros(shape, dtype=np.float)
reds = np.array(reds)
for count, baseline in enumerate(reds):
blt_inds, conj_inds, pol_inds = uv._key2inds(baseline)
# The integration doesn't care about conjugation, just need all the
# times associated with this baseline
inds = np.concatenate([blt_inds, conj_inds])
inds.sort()
integration_time[count] = uv.integration_time[inds]
return integration_time
[docs]def bootstrap_array(array, nboot=100, axis=0):
"""Bootstrap resample the input array along the given axis.
Randomly sample, with replacement along the given axis `nboot` times.
Output array will always have N+1 dimensions with the extra axis immediately following the bootstrapped axis.
Parameters
----------
array : numpy array
N-dimensional array to bootstrap resample.
nboot : int
Number of resamples to draw.
axis : int
Axis along which resampling is performed
Returns
-------
array : numpy array
The resampled array, if input is N-D output is N+1-D, extra dimension is added imediately suceeding the sampled axis.
Raises
------
ValueError
If `axis` parameter is greater than the dimensions of the input array.
"""
if axis >= len(np.shape(array)):
raise ValueError(
"Specified axis must be shorter than the length "
"of input array.\n"
"axis value was {0} but array has {1} dimensions".format(
axis, len(np.shape(array))
)
)
sample_inds = np.random.choice(
array.shape[axis], size=(array.shape[axis], nboot), replace=True
)
return np.take(array, sample_inds, axis=axis)
[docs]def noise_equivalent_bandwidth(window):
"""Calculate the relative equivalent noise bandwidth of window function.
Approximates the relative Noise Equivalent Bandwidth of a spectral taper function.
Parameters
----------
window : array_like
A 1-Dimenaional array like.
Returns
-------
float
"""
return np.sum(window) ** 2 / (np.sum(window ** 2) * len(window))
[docs]def combine_nsamples(nsample_1, nsample_2=None, axis=-1):
"""Combine the nsample arrays for use in cross-multiplication.
Uses numpy slicing to generate array of all sample cross-multiples.
Used to find the combine samples for a the delay spectrum.
The geometric mean is taken between nsamples_1 and nsamples_2 because
nsmaples array is used to compute thermal variance in the delay spectrum.
Parameters
----------
nsample_1 : array of float
(Nbls, Ntimes, Nfreqs) array from get_nsamples_array can also have shape (Npols, Nbls, Ntimes, Nfreqs)
nsample_2 : array of float, optional
Same type as nsample_1 if take cross-multiplication
Defaults to copying nsample_1 for auto-correlation
axis : int
The axis over which the cross multiplication should occur.
Returns
-------
samples_out : array of float
(Nbls, Nbls, Nfreqs, Ntimes) array of geometric mean of the input sample arrays.
Can also have shape (Npols, Nbls, Nbls, Ntimes, Nfreqs)
Raises
------
ValueError
If both input arrays have different shapes.
"""
if nsample_2 is None:
nsample_2 = nsample_1.copy()
if not nsample_1.shape == nsample_2.shape:
raise ValueError(
"nsample_1 and nsample_2 must have same shape, "
"but nsample_1 has shape {d1_s} and "
"nsample_2 has shape {d2_s}".format(
d1_s=nsample_1.shape, d2_s=nsample_2.shape
)
)
samples_out = np.sqrt(
cross_multiply_array(array_1=nsample_1, array_2=nsample_2, axis=axis)
)
# The nsamples array is used to construct the thermal variance
# Cross-correlation takes the geometric mean of thermal variance.
return samples_out
[docs]def remove_auto_correlations(data_array, axes=(0, 1)):
"""Remove the auto-corrlation term from input array.
Takes an N x N array, removes the diagonal components, and returns a flattened N(N-1) dimenion in place of the array.
If uses on a M dimensional array, returns an M-1 array.
Parameters
----------
data_array : array
Array shaped like (Nbls, Nbls, Ntimes, Nfreqs). Removes same baseline diagonal along the specifed axes.
axes : tuple of int, length 2
axes over which the diagonal will be removed.
Returns
-------
data_out : array with the same type as `data_array`.
(Nbls * (Nbls-1), Ntimes, Nfreqs) array.
if input has pols: (Npols, Nbls * (Nbls -1), Ntimes, Nfreqs)
Raises
------
ValueError
If axes is not a length 2 tuple.
If axes are not adjecent (e.g. axes=(2,7)).
If axes do not have the same shape.
"""
if not np.shape(axes)[0] == 2:
raise ValueError(
"Shape must be a length 2 tuple/array/list of " "axis indices."
)
if axes[0] != axes[1] - 1:
raise ValueError(
"Axes over which diagonal components are to be " "remove must be adjacent."
)
if data_array.shape[axes[0]] != data_array.shape[axes[1]]:
raise ValueError(
"The axes over which diagonal components are to be "
"removed must have the same shape."
)
n_inds = data_array.shape[axes[0]]
# make a boolean index array with True off the diagonal and
# False on the diagonal.
indices = np.logical_not(np.diag(np.ones(n_inds, dtype=bool)))
# move the axes so axes[0] is the 0th axis and axis 1 is the 1th
data_array = np.moveaxis(data_array, axes[0], 0)
data_array = np.moveaxis(data_array, axes[1], 1)
data_out = data_array[indices]
# put the compressed axis back in the original spot
data_out = np.moveaxis(data_out, 0, axes[0])
return data_out
[docs]def cross_multiply_array(array_1, array_2=None, axis=0):
"""Cross multiply the arrays along the given axis.
Cross multiplies along axis and computes array_1.conj() * array_2
if axis has length M then a new axis of size M will be inserted directly succeeding the original.
Parameters
----------
array_1 : array_like
N-dimensional array_like
array_2 : array_like, optional
N-dimenional array.
Defaults to copy of array_1
axis : int
Axis along which to cross multiply
Returns
-------
cross_array : array_like
N+1 Dimensional array
Raises
------
ValueError
If input arrays have different shapes.
"""
if isinstance(array_1, list):
array_1 = np.asarray(array_1)
if array_2 is None:
array_2 = copy.deepcopy(array_1)
if isinstance(array_2, list):
array_2 = np.asarray(array_2)
unit_1, unit_2 = 1, 1
if isinstance(array_1, units.Quantity):
unit_1 = array_1.unit
array_1 = array_1.value
if isinstance(array_2, units.Quantity):
unit_2 = array_2.unit
array_2 = array_2.value
if array_2.shape != array_1.shape:
raise ValueError(
"array_1 and array_2 must have the same shapes. "
"array_1 has shape {a1} but array_2 has shape {a2}".format(
a1=np.shape(array_1), a2=np.shape(array_2)
)
)
cross_array = np.expand_dims(array_1, axis=axis).conj() * np.expand_dims(
array_2, axis=axis + 1
)
if isinstance(unit_1, units.UnitBase) or isinstance(unit_2, units.UnitBase):
cross_array <<= unit_1 * unit_2
return cross_array
[docs]def lst_align(uv1, uv2, ra_range, inplace=True, atol=1e-08, rtol=1e-05):
"""Align the LST values of two pyuvdata objects within the given range.
Attempts to crudely align input UVData objects in LST by finding indices
where the lsts fall within the input range.
Parameters
----------
uv1 : UVData object or subclass
First object where data is desired to lie within the `ra_range`.
Must have a time_array parameter to find lsts.
uv2 : UVData object or subclass
Must have the same properties as `uv1`
ra_range : length 2 list of float
The inclusive start and stop times in hours for the LSTs to align.
inplace : bool
If true performs UVData.select on `uv1` and `uv2` otherwise returns new objects.
atol : float
Absolute tolerance with which the integrations time should agree
rtol : float
Relative tolerance with which the integrations time should agree
Returns
-------
(uv1, uv2) : tuple of UVData objects
only returns when `inplace` is False.
Raises
------
ValueError
if `uv1` and `uv2` have different integration times, or time cadences.
"""
delta_t_1 = uv1._calc_single_integration_time()
delta_t_2 = uv2._calc_single_integration_time()
if not np.isclose(delta_t_1, delta_t_2, rtol=rtol, atol=atol):
raise ValueError(
"The two UVData objects much have matching "
"time sample rates. "
"values were uv1: {0} and uv2: {1}".format(delta_t_1, delta_t_2)
)
bl1 = uv1.baseline_array[0]
bl2 = uv2.baseline_array[0]
times_1 = uv1.get_times(bl1)
times_2 = uv2.get_times(bl2)
uv1_location = uv1.telescope_location_lat_lon_alt_degrees
uv2_location = uv2.telescope_location_lat_lon_alt_degrees
lsts_1 = uvutils.get_lst_for_time(times_1, *uv1_location) * 12.0 / np.pi
lsts_2 = uvutils.get_lst_for_time(times_2, *uv2_location) * 12.0 / np.pi
inds_1 = np.logical_and(lsts_1 >= ra_range[0], lsts_1 <= ra_range[-1])
inds_2 = np.logical_and(lsts_2 >= ra_range[0], lsts_2 <= ra_range[-1])
diff = inds_1.sum() - inds_2.sum()
if diff > 0:
last_ind = inds_1.size - inds_1[::-1].argmax() - 1
inds_1[last_ind : last_ind - diff : -1] = False
elif diff < 0:
diff = np.abs(diff)
last_ind = inds_2.size - inds_2[::-1].argmax() - 1
inds_2[last_ind : last_ind - diff : -1] = False
new_times_1 = times_1[inds_1]
new_times_2 = times_2[inds_2]
return (
uv1.select(times=new_times_1, inplace=inplace),
uv2.select(times=new_times_2, inplace=inplace),
)
[docs]@units.quantity_input(freqs="frequency")
def jy_to_mk(freqs):
"""Calculate the Jy/sr to mK conversion lambda^2/(2 * K_boltzman).
Parameters
----------
freqs : Astropy Quantity with units equivalent to frequency
frequencies where the conversion should be calculated.
Returns
-------
Astropy Quantity
The conversion factor from Jy to mK * sr at the given frequencies.
"""
jy2t = units.sr * const.c.to("m/s") ** 2 / (2 * freqs.to("1/s") ** 2 * const.k_B)
return jy2t << units.Unit("mK*sr/Jy")
[docs]def generate_noise(noise_power):
"""Generate noise given an input array of noise power.
Parameters
----------
noise_power : array_like of float
N-dimensional array of noise power to generate white noise.
Returns
-------
noise : array_like of complex
Complex white noise drawn from a Gaussian distribution with width given by the value of the input noise_power array.
"""
# divide by sqrt(2) to conserve total noise amplitude over real and imag
noise = noise_power * (
1 * np.random.normal(size=noise_power.shape)
+ 1j * np.random.normal(size=noise_power.shape)
)
noise /= np.sqrt(2)
return noise
[docs]def weighted_average(array, uncertainty, weights=None, axis=-1):
"""Compute the weighted average and propagate uncertainty.
Performs weighted average of input array, and propagates the weighted average into the uncertainty.
Parameters
----------
array : array_like
N-dimensional array over which to average an axis.
uncertainty : array_like
N-dimensional array of uncertainties associated with each point in input `array`.
weights : array_like, Optional
N-dimenional or 1-Dimenaiona array of weights to apply to each data point.
If weights are one dimensional must be of length N and assumped to be the same for each row M.
if weights is None, uses inverse variance weighting
axis : int
The axis over which the average is taken.
Returns
-------
array : array_like
MxN-1 dimenionals array averaged array with given weights.
uncertainty : array_like
MxN-1 dimensional propagated uncertainty array for given weights.
Raises
------
ValueError
If either one of the `array` or `uncertainty` is an astropy Quantity object but the other is not.
If `array` and `uncertainty` have different shapes.
If `weights` are 1-Dimensional but length is not the same as the lenght of `array` along the given `axis`.
If `weights` are N-Dimensional but shaped differs from `array`.
"""
if isinstance(array, units.Quantity):
if isinstance(uncertainty, units.Quantity):
uncertainty = uncertainty << array.unit
else:
raise ValueError(
"Input array is a Quantity Object but "
"uncertainty is not. Either both arrays must be "
"Quantity objects or neither must be."
)
elif isinstance(uncertainty, units.Quantity):
raise ValueError(
"Input array is a not Quantity Objcet but "
"uncertainty is. Either both arrays must be "
"Quantity objects or neither must be."
)
# check shapes of array and uncertainty
if not array.shape == uncertainty.shape:
raise ValueError(
"Input array and uncertainties must have the same "
"shape. Array shape: {array}, Uncertainty shape: "
"{error}".format(array=array.shape, error=uncertainty.shape)
)
# if weights is none use uniform? inverse variance?
if weights is None:
weights = 1.0 / uncertainty ** 2
# check shape of weights
if np.ndim(weights) == 1 and weights.size != array.shape[axis]:
raise ValueError(
"1-Dimenionals weights must have the same shape "
"as the axis over which the average is taken."
)
elif not np.shape(weights) == array.shape:
raise ValueError(
"Input array and uncertainties must have the same "
"shape. Array shape: {array}, Uncertainty shape: "
"{weights}".format(array=array.shape, weights=weights.shape)
)
array_out = np.average(array, weights=weights, axis=axis)
uncertainty_out = np.sqrt(
np.sum(uncertainty ** 2 * weights ** 2, axis=axis)
/ np.abs(np.sum(weights, axis=axis)) ** 2
)
return array_out, uncertainty_out
accepted_units = ["mK^2*Mpc^3", "mK^2*Mpc^3/littleh^3", "time"]
[docs]@units.quantity_input(delays="time", array=accepted_units)
def fold_along_delay(delays, array, uncertainty, weights=None, axis=-1):
"""Fold input array over the delay axis.
Averages input `array` along the given `axis` by splitting the `delay` array around the value 0 and
averaging values in `array` correspoding to the same `delay` magnitutde.
Parameters
----------
delays : Astropy Quantity units equivalent to time.
A 1-Dimensional numpy array of interferometric delays.
array : Astropy Quantity units equivalent to time, or power.
An N-Dimensional Quantity to average across delay magnitudes.
uncertainty : Astropy Quantity units equivalent to `array`
An N-Dimensional Quantity of uncertainty values for input `array`
weights : array_like
Weights to use while averaging the input `array`. Must have same shape as input array.
Default: np.ones_like(array)
axis : int
The axis over which the input array is to be folded.
Must have the same shape as the size of input delays.
Returns
-------
array : Astropy Quantity with units equivalent to input `array`
The N-Dimensional input array folded over the axis specified
give axis will have size np.shape(array)[axis]/2 if shape is even
or (np.shape(array)[axis] + 1)/2 if shape is odd
uncertainty : Astropy Quantity with units equivalent to input `uncertainty`.
The folded uncertainties corresponding to the input array.
Raises
------
ValueError
If shape of the specified axis is not equal to the length of the `delay` array.
If input `delays` either have no 0 bin or evenly spaced around zero.
If `uncertainty` is not the same shape as `array`.
"""
delays = copy.deepcopy(delays)
array = copy.deepcopy(array)
uncertainty = copy.deepcopy(uncertainty)
# This function assumes your array is a block-square array,
# e.g. all delays are the same.
if array.shape[axis] != len(delays):
raise ValueError(
(
"Input array must have same length as the "
"delays along the specified axis."
"Axis given was {0}, the array has length {1} "
"but delays are length {2}".format(axis, array.shape[axis], len(delays))
)
)
if len(delays) % 2 != 0 and np.abs(delays).min() != 0:
raise ValueError(
(
"Input delays must have either a delay=0 bin "
"as the central value or have an even size."
)
)
# check shapes of array and uncertainty
if not array.shape == uncertainty.shape:
raise ValueError(
"Input array and uncertainties must have the same "
"shape. Array shape: {array}, Uncertainty shape: "
"{error}".format(array=array.shape, error=uncertainty.shape)
)
if weights is None:
if not array.imag.value.any():
weights = np.ones_like(array)
else:
weights = np.ones_like(array) * (1 + 1j)
if np.logical_and(np.abs(delays).min() == 0, delays.size % 2):
split_index = np.argmin(np.abs(delays), axis=axis)
split_inds = [split_index, split_index + 1]
neg_vals, zero_bin, pos_vals = np.split(array, split_inds, axis=axis)
neg_vals = np.flip(neg_vals, axis=axis)
pos_vals = np.concatenate([zero_bin, pos_vals], axis=axis)
neg_vals = np.concatenate([zero_bin, neg_vals], axis=axis)
neg_errors, zero_errors, pos_errors = np.split(
uncertainty, split_inds, axis=axis
)
# the error on the 0 delay mode will go down like 1/sqrt(2) with the
# inverse vaiance weighting scheme, but we didn't actually gain any information
# so rescale the error, this _could_ be avoided if we didn't concatenate along
# the 0th dimension but would make it require two different calculations
zero_errors *= np.sqrt(2)
neg_errors = np.flip(neg_errors, axis=axis)
pos_errors = np.concatenate([zero_errors, pos_errors], axis=axis)
neg_errors = np.concatenate([zero_errors, neg_errors], axis=axis)
neg_weights, zero_weights, pos_weights = np.split(
weights, split_inds, axis=axis
)
neg_weights = np.flip(neg_weights, axis=axis)
pos_weights = np.concatenate([zero_weights, pos_weights], axis=axis)
neg_weights = np.concatenate([zero_weights, neg_weights], axis=axis)
else:
min_val_bool = np.abs(delays) == np.amin(np.abs(delays), axis=axis)
split_index = np.where(np.logical_and(delays > 0, min_val_bool))
split_inds = [np.squeeze(split_index)]
neg_vals, pos_vals = np.split(array, split_inds, axis=axis)
neg_vals = np.flip(neg_vals, axis=axis)
neg_errors, pos_errors = np.split(uncertainty, split_inds, axis=axis)
neg_errors = np.flip(neg_errors, axis=axis)
neg_weights, pos_weights = np.split(weights, split_inds, axis=axis)
neg_weights = np.flip(neg_weights, axis=axis)
_array = np.stack([pos_vals, neg_vals], axis=0)
_errors = np.stack([pos_errors, neg_errors], axis=0)
_weights = np.stack([pos_weights, neg_weights], axis=0)
if not _array.imag.value.any():
out_array, out_errors = weighted_average(
_array.real, _errors.real, weights=_weights, axis=0
)
else:
weight_check = _weights.imag.value.any()
if not weight_check:
try:
_weights.imag = np.ones_like(_weights.real)
except TypeError:
_weights = _weights.astype(np.complex)
_weights.imag = np.ones_like(_weights.real)
out_array_real, out_errors_real = weighted_average(
_array.real, _errors.real, weights=_weights.real, axis=0
)
out_array_imag, out_errors_imag = weighted_average(
_array.imag, _errors.imag, weights=_weights.imag, axis=0
)
out_array = out_array_real + 1j * out_array_imag
out_errors = out_errors_real + 1j * out_errors_imag
return out_array, out_errors