from astropy import units as u
from astropy.io import fits
from astropy import constants
from astropy import wcs
import numpy as np
import warnings
from .beam import Beam, _to_area, SIGMA_TO_FWHM
from .commonbeam import commonbeam
from .utils import InvalidBeamOperationError
[docs]
class Beams(u.Quantity):
"""
An object to handle a set of radio beams for a data cube.
"""
def __new__(cls, major=None, minor=None, pa=None,
areas=None, default_unit=u.arcsec, meta=None,
beams=None):
"""
Create a new set of Gaussian beams
Parameters
----------
major : :class:`~astropy.units.Quantity` with angular equivalency
The FWHM major axes
minor : :class:`~astropy.units.Quantity` with angular equivalency
The FWHM minor axes
pa : :class:`~astropy.units.Quantity` with angular equivalency
The beam position angles
area : :class:`~astropy.units.Quantity` with steradian equivalency
The area of the beams. This is an alternative to specifying the
major/minor/PA, and will create those values assuming a circular
Gaussian beam.
default_unit : :class:`~astropy.units.Unit`
The unit to impose on major, minor if they are specified as floats
beams : List of :class:`~radio_beam.Beam` objects
List of individual `Beam` objects. The resulting `Beams` object will
have major and minor axes in degrees.
"""
# improve to some kwargs magic later
# error checking
if beams is not None:
major = [beam.major.to(u.deg).value for beam in beams] * u.deg
minor = [beam.minor.to(u.deg).value for beam in beams] * u.deg
pa = [beam.pa.to(u.deg).value for beam in beams] * u.deg
# ... given an area make a round beam assuming it is Gaussian
if areas is not None:
rad = np.sqrt(areas / (2 * np.pi)) * u.deg
major = rad * SIGMA_TO_FWHM
minor = rad * SIGMA_TO_FWHM
pa = np.zeros_like(areas) * u.deg
# give specified values priority
if major is not None:
if u.deg.is_equivalent(major.unit):
pass
else:
warnings.warn("Assuming major axes has been specified in degrees")
major = major * u.deg
if minor is not None:
if u.deg.is_equivalent(minor.unit):
pass
else:
warnings.warn("Assuming minor axes has been specified in degrees")
minor = minor * u.deg
if pa is not None:
if len(pa) != len(major):
raise ValueError("Number of position angles must match number of major axis lengths")
if u.deg.is_equivalent(pa.unit):
pass
else:
warnings.warn("Assuming position angles has been specified in degrees")
pa = pa * u.deg
else:
pa = np.zeros_like(major.value) * u.deg
# some sensible defaults
if minor is None:
minor = major
elif len(minor) != len(major):
raise ValueError("Minor and major axes must have same number of values")
if np.any(minor > major):
raise ValueError("Minor axis greater than major axis.")
self = super(Beams, cls).__new__(cls, value=_to_area(major, minor).value, unit=u.sr)
self.major = major
self.minor = minor
self.pa = pa
self.default_unit = default_unit
if meta is None:
self.meta = [{}]*len(self)
else:
self.meta = meta
return self
@property
def meta(self):
return self._meta
@meta.setter
def meta(self, value):
if len(value) == len(self):
self._meta = value
else:
raise TypeError("metadata must be a list of dictionaries")
def __len__(self):
return len(self.major)
@property
def isfinite(self):
return ((self.major > 0) & (self.minor > 0) & np.isfinite(self.major) &
np.isfinite(self.minor) & np.isfinite(self.pa))
def __getslice__(self, start, stop, increment=None):
return self.__getitem__(slice(start, stop, increment))
def __getitem__(self, view):
if isinstance(view, (int, np.int64)):
return Beam(major=self.major[view],
minor=self.minor[view],
pa=self.pa[view],
meta=self.meta[view])
elif isinstance(view, slice):
return Beams(major=self.major[view],
minor=self.minor[view],
pa=self.pa[view],
meta=self.meta[view])
elif isinstance(view, np.ndarray):
if view.dtype.name != 'bool':
raise ValueError("If using an array to index beams, it must "
"be a boolean array.")
return Beams(major=self.major[view],
minor=self.minor[view],
pa=self.pa[view],
meta=[x for ii,x in zip(view, self.meta) if ii])
else:
raise ValueError("Invalid slice")
def __array_finalize__(self, obj):
# If our unit is not set and obj has a valid one, use it.
if self._unit is None:
unit = getattr(obj, '_unit', None)
if unit is not None:
self._set_unit(unit)
if isinstance(obj, Beams):
# Multiplication and division should change the area,
# but not the PA or major/minor ratio
self.major = obj.major
self.minor = obj.minor
self.pa = obj.pa
self.meta = obj.meta
# Copy info if the original had `info` defined. Because of the way the
# DataInfo works, `'info' in obj.__dict__` is False until the
# `info` attribute is accessed or set. Note that `obj` can be an
# ndarray which doesn't have a `__dict__`.
if 'info' in getattr(obj, '__dict__', ()):
self.info = obj.info
@property
def sr(self):
return _to_area(self.major, self.minor)
[docs]
@classmethod
def from_fits_bintable(cls, bintable):
"""
Instantiate a Beams list from a bintable HDU from a CASA-produced image
HDU.
Parameters
----------
bintable : fits.BinTableHDU
The table data containing the beam information
Returns
-------
beams : Beams
A new Beams object
"""
header = bintable.header
# Read the bmaj/bmin units from the header
# (we still assume BPA is degrees because we've never seen an exceptional case)
# this will crash if there is no appropriate header info
maj_kw = [kw for kw, val in header.items() if val == 'BMAJ'][0]
min_kw = [kw for kw, val in header.items() if val == 'BMIN'][0]
maj_unit = header[maj_kw.replace('TTYPE', 'TUNIT')]
min_unit = header[min_kw.replace('TTYPE', 'TUNIT')]
# AIPS uses non-FITS-standard unit names; this catches the
# only case we've seen so far
if maj_unit == 'DEGREES':
maj_unit = 'degree'
if min_unit == 'DEGREES':
min_unit = 'degree'
maj_unit = u.Unit(maj_unit)
min_unit = u.Unit(min_unit)
major = u.Quantity(bintable.data['BMAJ'], maj_unit)
minor = u.Quantity(bintable.data['BMIN'], min_unit)
pa = u.Quantity(bintable.data['BPA'], u.deg)
meta = [{key: row[key] for key in bintable.columns.names
if key not in ('BMAJ', 'BPA', 'BMIN')}
for row in bintable.data]
return cls(major=major, minor=minor, pa=pa, meta=meta)
[docs]
@classmethod
def from_casa_image(cls, imagename):
'''
Instantiate beams from a CASA image. Cannot currently handle beams for
different polarizations.
** Must be run in a CASA environment! **
Parameters
----------
imagename : str
Name of CASA image.
'''
try:
from casatools import image as iatool
ia = iatool()
except ImportError:
raise ImportError("Could not import CASA and therefore"
" cannot read CASA .image files")
ia.open(imagename)
beam_props = ia.restoringbeam()
ia.close()
nchans = beam_props['nChannels']
# Assuming there is always a 0th channel...
maj_unit = u.Unit(beam_props['beams']['*0']['*0']['major']['unit'])
min_unit = u.Unit(beam_props['beams']['*0']['*0']['minor']['unit'])
pa_unit = u.Unit(beam_props['beams']['*0']['*0']['positionangle']['unit'])
major = np.empty((nchans)) * maj_unit
minor = np.empty((nchans)) * min_unit
pa = np.empty((nchans)) * pa_unit
for chan in range(nchans):
chan_name = '*{}'.format(chan)
chanbeam_props = beam_props['beams'][chan_name]['*0']
# Can CASA have mixes of units between channels? Let's test just
# in case
assert maj_unit == u.Unit(chanbeam_props['major']['unit'])
assert min_unit == u.Unit(chanbeam_props['minor']['unit'])
assert pa_unit == u.Unit(chanbeam_props['positionangle']['unit'])
major[chan] = chanbeam_props['major']['value'] * maj_unit
minor[chan] = chanbeam_props['minor']['value'] * min_unit
pa[chan] = chanbeam_props['positionangle']['value'] * pa_unit
return cls(major=major, minor=minor, pa=pa)
[docs]
def average_beam(self, includemask=None, raise_for_nan=True):
"""
Average the beam major, minor, and PA attributes.
This is usually a dumb thing to do!
"""
warnings.warn("Do not use the average beam for convolution! Use the"
" smallest common beam from `Beams.common_beam()`.")
from astropy.stats import circmean
if includemask is None:
includemask = self.isfinite
else:
includemask = np.logical_and(includemask, self.isfinite)
new_beam = Beam(major=self.major[includemask].mean(),
minor=self.minor[includemask].mean(),
pa=circmean(self.pa[includemask],
weights=(self.major / self.minor)[includemask]))
if raise_for_nan and np.any(np.isnan(new_beam)):
raise ValueError("NaNs after averaging. This is a bug.")
return new_beam
[docs]
def largest_beam(self, includemask=None):
"""
Returns the largest beam (by area) in a list of beams.
"""
if includemask is None:
includemask = self.isfinite
else:
includemask = np.logical_and(includemask, self.isfinite)
largest_idx = (self.major * self.minor)[includemask].argmax()
new_beam = Beam(major=self.major[includemask][largest_idx],
minor=self.minor[includemask][largest_idx],
pa=self.pa[includemask][largest_idx])
return new_beam
[docs]
def smallest_beam(self, includemask=None):
"""
Returns the smallest beam (by area) in a list of beams.
"""
if includemask is None:
includemask = self.isfinite
else:
includemask = np.logical_and(includemask, self.isfinite)
largest_idx = (self.major * self.minor)[includemask].argmin()
new_beam = Beam(major=self.major[includemask][largest_idx],
minor=self.minor[includemask][largest_idx],
pa=self.pa[includemask][largest_idx])
return new_beam
[docs]
def extrema_beams(self, includemask=None):
return [self.smallest_beam(includemask),
self.largest_beam(includemask)]
[docs]
def common_beam(self, includemask=None, method='pts', **kwargs):
'''
Return the smallest common beam size. For set of two beams,
the solution is solved analytically. All larger sets solve for the
minimum volume ellipse using the
`Khachiyan Algorithm <http://www.mathworks.com/matlabcentral/fileexchange/9542>`_,
where the convex hull of the set of ellipse edges is used to find the
boundaries of the set.
Since the minimum ellipse method is approximate, some solutions for
the common beam will be slightly underestimated and the solution
cannot be deconvolved from the whole set of beams. To overcome
this issue, a small `epsilon` correction factor is added to the
ellipse edges to encourage a valid common beam solution.
Since `epsilon` is added to all sides, this correction will at most
increase the common beam size by :math:`2\times(1+\epsilon)`.
The default values of `epsilon` is :math:`5\times10^{-4}`, so this
will have a very small effect on the size of the common beam.
In some cases, `epsilon` must be increased to find a valid common
beam solution. The algorithm does this by default
(set by `auto_increase_epsilon=True`), and will incrementally
increase `epsilon` until the common beam can be deconvolved from
all beams, or until either(1) `max_iter` is reached (default is 10)
or (2) `max_epsilon` is reached (default is 1e-3). In practice, we
find these settings work well for different ALMA or VLA data, but
these `kwargs` may need to be changed for discrepant cases.
Parameters
----------
includemask : `~numpy.ndarray`, optional
Boolean mask.
method : {'pts'}, optional
Many beam method. Only `pts` is currently available.
kwargs : Passed to `~radio_beam.commonbeam`.
'''
return commonbeam(self if includemask is None else self[includemask],
method=method, **kwargs)
def __iter__(self):
for i in range(len(self)):
yield self[i]
def __mul__(self, other):
# Other must be a single beam. Assume multiplying is convolving
# as set of beams with a given beam
if not isinstance(other, Beam):
raise InvalidBeamOperationError("Multiplication is defined as a "
"convolution of the set of beams "
"with a given beam. Must be "
"multiplied with a Beam object.")
return Beams(beams=[beam * other for beam in self])
def __truediv__(self, other):
# Other must be a single beam. Assume dividing is deconvolving
# as set of beams with a given beam
if not isinstance(other, Beam):
raise InvalidBeamOperationError("Division is defined as a "
"deconvolution of the set of beams"
" with a given beam. Must be "
"divided by a Beam object.")
return Beams(beams=[beam / other for beam in self])
def __add__(self, other):
raise InvalidBeamOperationError("Addition of a set of Beams "
"is not defined.")
def __sub__(self, other):
raise InvalidBeamOperationError("Addition of a set of Beams "
"is not defined.")
def __eq__(self, other):
# other should be a single beam, or a another Beams object
if isinstance(other, Beam):
return np.array([beam == other for beam in self])
elif isinstance(other, Beams):
# These should have the same size.
if not self.size == other.size:
raise InvalidBeamOperationError("Beams objects must have the "
"same shape to test "
"equality.")
return np.all([beam == other_beam for beam, other_beam in
zip(self, other)])
else:
raise InvalidBeamOperationError("Must test equality with a Beam"
" or Beams object.")
def __ne__(self, other):
eq_out = self.__eq__(other)
# If other is a Beam, will get array back
if isinstance(eq_out, np.ndarray):
return ~eq_out
# If other is a Beams, will get boolean back
else:
return not eq_out