Source code for radio_beam.commonbeam


import numpy as np
import astropy.units as u

try:
    from scipy import optimize as opt
    from scipy.spatial import ConvexHull
    HAS_SCIPY = True
except ImportError:
    HAS_SCIPY = False

from .beam import Beam
from .utils import BeamError, transform_ellipse, deconvolve_optimized

__all__ = ['commonbeam', 'common_2beams', 'getMinVolEllipse',
           'common_manybeams_mve', 'find_commonbeam_between']


[docs] def commonbeam(beams, method='pts', **method_kwargs): """ Use analytic method if there are only two beams. Otherwise use constrained optimization to find the common beam. """ if beams.size == 1: return beams[0] elif fits_in_largest(beams): return beams.largest_beam() else: if beams.size == 2: try: return common_2beams(beams) # Sometimes this method can fail. Use the many beam solution in # this case except (ValueError, BeamError): pass if method == 'pts': return common_manybeams_mve(beams, **method_kwargs) elif method == 'opt': return common_manybeams_opt(beams, **method_kwargs) else: raise ValueError("method must be 'pts' or 'opt'.")
[docs] def common_2beams(beams, check_deconvolution=True): """ Find a common beam from a `Beams` object with 2 beams. This function is based on the CASA implementation `ia.commonbeam`. Note that the solution is only valid for 2 beams. Parameters ---------- beams : `~radio_beam.Beams` Beams object with 2 beams. Returns ------- common_beam : `~radio_beam.Beam` The smallest common beam in the set of beams. """ if beams.size != 2: raise BeamError("This method is only valid for two beams.") if (~beams.isfinite).all(): raise BeamError("All beams in the object are invalid.") return find_commonbeam_between(beams[0], beams[1], check_deconvolution=check_deconvolution)
[docs] def find_commonbeam_between(beam1, beam2, check_deconvolution=True): """ Find the common beam between 2 `~radio_beam.Beam` objects. This function is based on the CASA implementation `ia.commonbeam` that the solution is valid when comparing 2 beams. Parameters ---------- beam1 : `~radio_beam.Beam` Beam object. beam2 : `~radio_beam.Beam` Beam object. check_deconvolution : bool, optional Check that the common beam solution can be deconvolved from both input beams. Returns ------- common_beam : `~radio_beam.Beam` The smallest common beam in the set of beams. """ # This code is based on the implementation in CASA: # https://open-bitbucket.nrao.edu/projects/CASA/repos/casa/browse/code/imageanalysis/ImageAnalysis/CasaImageBeamSet.cc if not beam1.isfinite or not beam2.isfinite: raise BeamError("At least one beam is invalid.") # If equal, the common beam is itself. if beam1 == beam2: return beam1 if beam1 > beam2: large_beam = beam1 small_beam = beam2 else: large_beam = beam2 small_beam = beam1 large_major = large_beam.major.to(u.arcsec) large_minor = large_beam.minor.to(u.arcsec) small_major = small_beam.major.to(u.arcsec) small_minor = small_beam.minor.to(u.arcsec) deconv_beam = large_beam.deconvolve(small_beam, failure_returns_pointlike=True) # Larger beam can be deconvolved. It is already the smallest common beam if deconv_beam.isfinite: return large_beam # If the smaller beam is a circle, the minor axis is the circle radius if small_beam.iscircular(): common_beam = Beam(large_beam.major, small_beam.major, large_beam.pa) return common_beam # Wrap angle about 0 to pi. pa_diff = ((small_beam.pa.to(u.rad).value - large_beam.pa.to(u.rad).value + np.pi / 2. + np.pi) % np.pi - np.pi / 2.) * u.rad # If the difference is pi / 2, the larger major is set to the # new major and the minor is the other major. if np.isclose(np.abs(pa_diff).value, np.pi / 2.): larger_major = large_beam.major >= small_beam.major major = large_major if larger_major else small_major minor = small_major if larger_major else small_major pa = large_beam.pa if larger_major else small_beam.pa conv_beam = Beam(major=major, minor=minor, pa=pa) return conv_beam else: # Transform to coordinates where large_beam is circular major_comb = np.sqrt(large_major * small_major) p = major_comb / large_major q = major_comb / large_minor # Transform beam into the same coordinates, and rotate so its # major axis is along the x axis. trans_major_sc, trans_minor_sc, trans_pa_sc = \ transform_ellipse(small_major, small_minor, pa_diff, p, q) # The transformed minor axis is major_comb, as defined in CASA trans_minor_sc = major_comb # Return beam to the original coordinates, still rotated with # the major along the x axis trans_major_unsc, trans_minor_unsc, trans_pa_unsc = \ transform_ellipse(trans_major_sc, trans_minor_sc, trans_pa_sc, 1 / p, 1 / q) # Lastly, rotate the PA to the enclosing ellipse trans_major = trans_major_unsc.to(u.arcsec) trans_minor = trans_minor_unsc.to(u.arcsec) trans_pa = trans_pa_unsc + large_beam.pa # The minor axis becomes an issue when checking against the smaller # beam from deconvolution. Adding a tiny fraction makes the deconvolved # beam JUST larger than zero (~1e-7). epsilon = 100 * np.finfo(trans_major.dtype).eps * trans_major.unit trans_beam = Beam(major=trans_major + epsilon, minor=trans_minor + epsilon, pa=trans_pa) if check_deconvolution: # Ensure this beam can now be deconvolved deconv_large_beam = \ trans_beam.deconvolve(large_beam, failure_returns_pointlike=True) deconv_prob_beam = \ trans_beam.deconvolve(small_beam, failure_returns_pointlike=True) if not deconv_large_beam.isfinite or not deconv_prob_beam.isfinite: raise BeamError("Failed to find common beam that both beams can " "be deconvolved by.") # Taken from CASA implementation, but by adding epsilon, this shouldn't # be needed # Scale the enclosing ellipse by a small factor until it does. # can_deconv = False # num = 0 # while not can_deconv: # deconv_large_beam = \ # trans_beam.deconvolve(large_beam, # failure_returns_pointlike=True) # deconv_prob_beam = \ # trans_beam.deconvolve(small_beam, # failure_returns_pointlike=True) # if (not deconv_large_beam.isfinite or not deconv_prob_beam.isfinite): # scale_factor = 1.001 # trans_beam = Beam(major=trans_major * scale_factor, # minor=trans_minor * scale_factor, # pa=trans_pa) # else: # can_deconv = True # if num == 10: # break # num += 1 common_beam = trans_beam return common_beam
def boundingcircle(bmaj, bmin, bpa): thisone = np.argmax(bmaj) # PA really shouldn't matter here. But the minimization performed better # in some cases with a non-zero PA. Presumably this is b/c the PA of the # common beam is affected more by the beam with the largest major axis. return bmaj[thisone], bmaj[thisone], bpa[thisone] def PtoA(bmaj, bmin, bpa): """ Express the ellipse parameters into `center-form <https://en.wikipedia.org/wiki/Matrix_representation_of_conic_sections>`_. """ A = np.zeros((2, 2)) A[0, 0] = np.cos(bpa)**2 / bmaj**2 + np.sin(bpa)**2 / bmin**2 A[1, 0] = np.cos(bpa) * np.sin(bpa) * (1 / bmaj**2 - 1 / bmin**2) A[0, 1] = A[1, 0] A[1, 1] = np.sin(bpa)**2 / bmaj**2 + np.cos(bpa)**2 / bmin**2 return A def BinsideA(B, A): try: np.linalg.cholesky(B - A) return True except np.linalg.LinAlgError: return False def myobjective_regularized(p, bmajvec, bminvec, bpavec): # Force bmaj > bmin if p[0] < p[1]: return 1e30 # We can safely assume the common major axis is at most the # largest major axis in the set if (p[0] <= bmajvec).any(): return 1e30 A = PtoA(*p) test = np.zeros_like(bmajvec) for idx, (bmx, bmn, bp) in enumerate(zip(bmajvec, bminvec, bpavec)): test[idx] = BinsideA(PtoA(bmx, bmn, bp), A) obj = 1 / np.linalg.det(A) if np.all(test): return obj else: return obj * 1e30 def common_manybeams_opt(beams, p0=None, opt_method='Nelder-Mead', optdict={'maxiter': 5000, 'ftol': 1e-14, 'maxfev': 5000}, verbose=False, brute=False, brute_steps=40): """ Optimize the common beam solution by maximizing the determinant of the common beam. ..note:: This method is experimental and requires further testing. Parameters ---------- beams : `~radio_beam.Beams` Beams object. p0 : tuple, optional Initial guess parameters (`major, minor, pa`). opt_method : str, optional Optimization method to use. See `~scipy.optimize.minimize`. The default of Nelder-Mead is the only method we have had some success with. optdict : dict, optional Dictionary parameters passed to `~scipy.optimize.minimize`. verbose : bool, optional Print the full output from `~scipy.optimize.minimize`. brute : bool, optional Use `~scipy.optimize.brute` to find the optimal solution. brute_steps : int, optional Number of positions to sample in each parameter (3). Returns ------- com_beam : `~radio_beam.Beam` Common beam. """ raise NotImplementedError("This method is not fully tested. Remove this " "line for testing purposes.") if not HAS_SCIPY: raise ImportError("common_manybeams_opt requires scipy.optimize.") bmaj = beams.major.value bmin = beams.minor.value bpa = beams.pa.to(u.rad).value if p0 is None: p0 = boundingcircle(bmaj, bmin, bpa) # It seems to help to make the initial guess slightly larger p0 = (1.1 * p0[0], 1.1 * p0[1], p0[2]) if brute: maj_range = [beams.major.max(), 1.5 * beams.major.max()] maj_step = (maj_range[1] - maj_range[0]) / brute_steps min_range = [beams.minor.min(), 1.5 * beams.major.max()] min_step = (min_range[1] - min_range[0]) / brute_steps rranges = (slice(maj_range[0], maj_range[1], maj_step), slice(min_range[0], min_range[1], min_step), slice(0, 179.9, 180. / brute_steps)) result = opt.brute(myobjective_regularized, rranges, args=(bmaj, bmin, bpa), full_output=True, finish=opt.fmin) params = result[0] else: result = opt.minimize(myobjective_regularized, p0, method=opt_method, args=(bmaj, bmin, bpa), options=optdict, tol=1e-14) params = result.x if verbose: print(result.viewitems()) if not result.success: raise Warning("Optimization failed") com_beam = Beam(params[0] * beams.major.unit, params[1] * beams.major.unit, (params[2] % np.pi) * u.rad) # Test if it deconvolves all if not fits_in_largest(beams, com_beam): raise BeamError("Could not find common beam to deconvolve all beams.") return com_beam def fits_in_largest(beams, large_beam=None): """ Test if all beams can be deconvolved by the largest beam """ if large_beam is None: large_beam = beams.largest_beam() large_hdr_keywords = large_beam.to_header_keywords() majors = beams.major.to(u.deg).value minors = beams.minor.to(u.deg).value pas = beams.pa.to(u.deg).value # Catch differences below << 1 microarsec = 2.8e10 # This is the same limit used for checking equal beams in Beam.__eq__ atol_limit = 1e-12 for major, minor, pa in zip(majors, minors, pas): equal = abs(large_hdr_keywords['BMAJ'] - major) < atol_limit equal = equal and (abs(large_hdr_keywords['BMIN'] - minor) < atol_limit) # Check if the beam is circular # This checks for fractional changes below 1e-6 between the major and minor. # Same limit used in Beam.__eq__ iscircular = (major - minor) / minor < 1e-6 # position angle only matters if the beam is asymmetric if not iscircular: equal = equal and (abs(((large_hdr_keywords['BPA'] % np.pi) - (pa % np.pi))) < atol_limit) if equal: continue out = deconvolve_optimized(large_hdr_keywords, {'BMAJ': major, 'BMIN': minor, 'BPA': pa}, failure_returns_pointlike=True) if np.any([ax == 0. for ax in out[:2]]): return False return True
[docs] def getMinVolEllipse(P, tolerance=1e-5, maxiter=1e5): """ Use the Khachiyan Algorithm to compute that minimum volume ellipsoid. For the purposes of finding a common beam, there is an added check that requires the center to be within the tolerance range. Adapted code from: https://github.com/minillinim/ellipsoid/blob/master/ellipsoid.py That implementation relies on the original work by Nima Moshtagh: http://www.mathworks.com/matlabcentral/fileexchange/9542 and an alternate python version from: http://cctbx.sourceforge.net/current/python/scitbx.math.minimum_covering_ellipsoid.html Parameters ---------- P : `~numpy.ndarray` Points to compute solution. tolerance : float, optional Allowed error range in the Khachiyan Algorithm. Decreasing the tolerance by an order of magnitude requires an order of magnitude more iterations to converge. maxiter : int, optional Maximum iterations. Returns ------- center : `~numpy.ndarray` Center point of the ellipse. Is required to be smaller than the tolerance. radii : `~numpy.ndarray` Radii of the ellipse. rotation : `~numpy.ndarray` Rotation matrix of the ellipse. """ N, d = P.shape d = float(d) # Q will be our working array Q = np.vstack([np.copy(P.T), np.ones(N)]) QT = Q.T # initializations err = 1.0 u = np.ones(N) / N # Khachiyan Algorithm i = 0 while err > tolerance: V = np.dot(Q, np.dot(np.diag(u), QT)) # M the diagonal vector of an NxN matrix M = np.diag(np.dot(QT, np.dot(np.linalg.inv(V), Q))) j = np.argmax(M) maximum = M[j] step_size = (maximum - d - 1.0) / ((d + 1.0) * (maximum - 1.0)) new_u = (1.0 - step_size) * u err = np.linalg.norm(new_u - u) if err <= tolerance: break new_u[j] += step_size u = new_u i += 1 if i == maxiter: raise ValueError("Reached maximum iterations without converging." " Try increasing the tolerance.") # center of the ellipse center = np.atleast_2d(np.dot(P.T, u)) # For our purposes, the centre should always be very small center_square = np.outer(center, center) if not (center_square < tolerance**2).any(): raise ValueError("The solved centre ({0}) is larger than the tolerance" " ({1}). Check the input data.".format(center, tolerance)) # the A matrix for the ellipse A = np.linalg.inv(np.dot(P.T, np.dot(np.diag(u), P)) - center_square) / d # ellip_vals = np.dot(P - center, np.dot(A, (P - center).T)) # assert (ellip_vals <= 1. + tolerance).all() # Get the values we'd like to return U, s, rotation = np.linalg.svd(A) radii = 1.0 / np.sqrt(s) radii *= 1. + tolerance return center, radii, rotation
def ellipse_edges(beam, npts=300, epsilon=1e-3): """ Return the edge points of the beam. Parameters ---------- beam : `~radio_beam.Beam` Beam object. npts : int, optional Number of samples. epsilon : float Increase the radii of the ellipse by 1 + epsilon. This is to ensure that `getMinVolEllipse` returns a marginally deconvolvable beam to within the error tolerance. Returns ------- pts : `~numpy.ndarray` The x, y coordinates of the ellipse edge. """ bpa = beam.pa.to(u.rad).value major = beam.major.to(u.deg).value * (1. + epsilon) minor = beam.minor.to(u.deg).value * (1. + epsilon) phi = np.linspace(0, 2 * np.pi, npts) x = major * np.cos(phi) y = minor * np.sin(phi) xr = x * np.cos(bpa) - y * np.sin(bpa) yr = x * np.sin(bpa) + y * np.cos(bpa) pts = np.vstack([xr, yr]) return pts
[docs] def common_manybeams_mve(beams, tolerance=1e-4, nsamps=200, epsilon=5e-4, auto_increase_epsilon=True, max_epsilon=1e-3, max_iter=10): """ Calculate a common beam size using the Khachiyan Algorithm to find the minimum enclosing ellipse from all beam edges. Parameters ---------- beams : `~radio_beam.Beams` Beams object. tolerance : float, optional Allowed error range in the Khachiyan Algorithm. Decreasing the tolerance by an order of magnitude requires an order of magnitude more iterations to converge. nsamps : int, optional Number of edge points to sample from each beam. epsilon : float, optional Increase the radii of each beam by a factor of 1 + epsilon to ensure the common beam can marginally be deconvolved for all beams. Small deviations result from the finite sampling of points and the choice of the tolerance. auto_increase_epsilon : bool, optional Re-run the algorithm when the solution cannot quite be deconvolved from from all the beams. When `True`, epsilon is slightly increased with each iteration until the common beam can be deconvolved from all beams. Default is `True`. max_epsilon : float, optional Maximum epsilon value that is acceptable. Reached with `max_iter`. Default is `1e-3`. max_iter : int, optional Maximum number of times to increase epsilon to try finding a valid common beam solution. Returns ------- com_beam : `~radio_beam.Beam` The common beam for all beams in the set. """ if not HAS_SCIPY: raise ImportError("common_manybeams_mve requires scipy.optimize.") step = 1 while True: pts = [] for beam in beams: pts.append(ellipse_edges(beam, nsamps, epsilon=epsilon)) all_pts = np.hstack(pts).T # Now find the outer edges of the convex hull. hull = ConvexHull(all_pts) edge_pts = all_pts[hull.vertices] center, radii, rotation = \ getMinVolEllipse(edge_pts, tolerance=tolerance) # The rotation matrix is coming out as: # ((sin theta, cos theta) # (cos theta, - sin theta)) pa = np.arctan2(- rotation[0, 0], rotation[1, 0]) * u.rad if pa.value == -np.pi or pa.value == np.pi: pa = 0.0 * u.rad com_beam = Beam(major=radii.max() * u.deg, minor=radii.min() * u.deg, pa=pa) # If common beam is just slightly smaller than one of the beams, # we increase epsilon to encourage a solution marginally larger # so all beams can be convolved. if auto_increase_epsilon: if not fits_in_largest(beams, com_beam): # Increase epsilon and run again epsilon += (step + 1) * (max_epsilon - epsilon) / max_iter step += 1 if step == max_iter + 1: raise BeamError("Could not increase epsilon to find" " common beam.") continue else: break else: break if not fits_in_largest(beams, com_beam): raise BeamError("Could not find common beam to deconvolve all beams.") return com_beam