From 3ee89d7a06f87d68e97eb502e45755fc66ae75d5 Mon Sep 17 00:00:00 2001 From: Joshua Hoskins Date: Tue, 26 Jul 2022 11:16:37 -0400 Subject: [PATCH] Added new function to casagui/utils/__init__.py, __convert_masks that is used to convert the input mask discitonary into an astropy region in either pixel or world cooridates. Currently supported mask shapes are 'rect' and 'polygon'. For mor info see: https://astropy-regions.readthedocs.io/en/stable/shapes.html. In the future, this function needs to support return of 'crtf' format for use with tclean. Currently this is nort supported due to a bug in the pixel version of astropy regions (https://github.com/astropy/regions/issues/471). A bug ticket has been opened. --- casagui/utils/__init__.py | 290 +++++++++++++++++++++++++++++--------- 1 file changed, 226 insertions(+), 64 deletions(-) diff --git a/casagui/utils/__init__.py b/casagui/utils/__init__.py index 1973466..655d51c 100644 --- a/casagui/utils/__init__.py +++ b/casagui/utils/__init__.py @@ -28,17 +28,24 @@ '''General utility functions used by the ``casagui`` tools and applications.''' import inspect -from itertools import groupby, chain -from socket import socket -from os import path as __path +import itertools import urllib.request import urllib.error import sys + +from itertools import groupby, chain +from socket import socket +from os import path as __path from ._ResourceManager import _ResourceManager from ._logging import get_logger +from astropy import units as u +from regions import PixCoord +from regions import RectanglePixelRegion, PolygonPixelRegion + try: from casatools import regionmanager + __have_casatools = True except ImportError: __have_casatools = False @@ -46,6 +53,7 @@ logger = get_logger() resource_manager = _ResourceManager() + def static_vars(**kwargs): '''Initialize static function variables to for use within a function. @@ -64,12 +72,15 @@ def foo(): ---------- Initialized static local variables. ''' + def decorate(func): - for k,v in kwargs.items( ): + for k, v in kwargs.items(): setattr(func, k, v) return func + return decorate + def static_dir(func): '''return a list of static variables associated with ``func`` @@ -80,7 +91,8 @@ def static_dir(func): ''' return [a for a in dir(func) if a[0] != '_'] -def path_to_url( path ): + +def path_to_url(path): '''Convert a single filesystem path to a URL. If the string specified in the ``path`` parameter exists. It is turned into a @@ -99,7 +111,8 @@ def path_to_url( path ): ''' return "file://" + __path.abspath(path) if __path.exists(path) else path -def find_ws_address( address='127.0.0.1' ): + +def find_ws_address(address='127.0.0.1'): '''Find free port on ``address`` network and return a tuple with ``address`` and port number This function uses the low level socket function to find a free port and return @@ -115,12 +128,13 @@ def find_ws_address( address='127.0.0.1' ): tuple of str and int network address (`str`) and port number (`int`) ''' - sock = socket( ) - sock.bind((address,0)) - result = sock.getsockname( ) - sock.close( ) + sock = socket() + sock.bind((address, 0)) + result = sock.getsockname() + sock.close() return result + def partition(pred, iterable): '''Split ``iterable`` into two lists based on ``pred`` predicate. ''' @@ -133,19 +147,21 @@ def partition(pred, iterable): falses.append(item) return trues, falses -def error_msg( *args, **kwargs): + +def error_msg(*args, **kwargs): '''standard method for reporting errors which do not result in aborting out of python This function takes the standard set of arguments that the python ``print`` function takes. The primary difference is that the output will go to ``stderr`` and perhaps other error logs. ''' - print( *args, file=sys.stderr, **kwargs ) + print(*args, file=sys.stderr, **kwargs) + @static_vars(msgs=dict( - casatasks='{package} is not available so interactive clean will not work' - ), reported={} ) -def warn_import( package ): + casatasks='{package} is not available so interactive clean will not work' +), reported={}) +def warn_import(package): '''standard method for reporting (optional) package import failure Parameters @@ -155,10 +171,11 @@ def warn_import( package ): ''' if package not in warn_import.reported: warn_import.reported[package] = True - error_msg( "warning, %s" % warn_import.msgs[package].format(package=package) ) + error_msg("warning, %s" % warn_import.msgs[package].format(package=package)) + @static_vars(url='http://clients3.google.com/generate_204') -def have_network( ): +def have_network(): '''check to see if an active network with general internet connectivity is available. returns ``True`` if we have internet connectivity and ``False`` if we do not. @@ -179,24 +196,19 @@ def have_network( ): except Exception: return False + def ranges(iterable, order=sorted, key=lambda x: x): '''collect elements of ``iterable`` into tuple ranges where each tuple represents a concesecutive range within the iterable. ``key`` can be used to provide ranges for other objects where ``key(element)`` returns the key to be used for sorting into ranges''' - for a, b in groupby( enumerate( order(iterable,key=key) if 'key' in inspect.getfullargspec(order).kwonlyargs else order(iterable) ), lambda pair: key(pair[1]) - pair[0] ): + for a, b in groupby(enumerate( + order(iterable, key=key) if 'key' in inspect.getfullargspec(order).kwonlyargs else order(iterable)), + lambda pair: key(pair[1]) - pair[0]): b = list(b) yield b[0][1], b[-1][1] -def index_to_stokes(index): - '''convert stokes axis index to alphabetic value''' - STOKES_MAP = [ 'I', 'Q', 'U', 'V' ] - try: - return [ STOKES_MAP[i] for i in index ] - except TypeError: - return STOKES_MAP[index] - def contiguous_ranges(iterable, order=sorted, key=lambda x: x): '''split iterable into contiguous index sequence, i.e. reduce consecutive index runs to a tuple containing the first and last. ``iterable`` can be a sequence of values that @@ -204,19 +216,158 @@ def contiguous_ranges(iterable, order=sorted, key=lambda x: x): the key to be used for ordering. ``iterable`` is ordered by ``order`` which by default sorts ``iterable``.''' unique = list(set(iterable)) - for a, b in groupby( enumerate( order(unique,key=key) if 'key' in inspect.getfullargspec(order).kwonlyargs else order(unique) ), lambda pair: key(pair[1]) - pair[0] ): + for a, b in groupby( + enumerate(order(unique, key=key) if 'key' in inspect.getfullargspec(order).kwonlyargs else order(unique)), + lambda pair: key(pair[1]) - pair[0]): b = list(b) yield b[0][1], b[-1][1] + def expand_range_incl(r): '''expand the tuple supplied as ``r`` into a range which *includes* the first and last element ``r``''' if r[0] == r[1]: return [0] else: - return range(r[0],r[1]+1) + return range(r[0], r[1] + 1) + + +def __get_wcs_object(csys: 'image.coordsys', naxis=2) -> 'image.coordsys()': + """Generate world coordinate object from coordinate system. + + Returns: + w: world coordinate object + """ + try: + import numpy as np + from astropy.wcs import WCS + + w = WCS(naxis=naxis) + + rad_to_deg = 180/np.pi + w.wcs.crpix = csys.referencepixel()['numeric'][0:2] + w.wcs.cdelt = csys.increment()['numeric'][0:2]*rad_to_deg + w.wcs.crval = csys.referencevalue()['numeric'][0:2] * rad_to_deg + w.wcs.ctype = ['RA---SIN', 'DEC--SIN'] + + return w + + except ImportError as error: + print('Error importing module:: ' + str(error)) + + except Exception as error: + print(error) + +def __index_to_stokes(index: int): + """Convert stokes axis index to alphabetic value. + + Args: + index (int): enumerated index defining stokes value. + + Returns: + str: String indicating stokes value. + """ + STOKES_MAP = ['I', 'Q', 'U', 'V'] + + try: + return [STOKES_MAP[i] for i in index] + except TypeError: + return STOKES_MAP[index] + +def __get_center_pixels(params: dict): + """Get center pixel offset value from dictionary. + + Args: + params (dict): Submask from mask grouping used by itertools + + Returns: + list: Pixel offset defined in submask. + """ + return params['d'] + +def __convert_masks(masks: dict, coord='pixel', cdesc=None)->list: + '''Convert masks in standard format (as defined by ``CubeMask.jsmask_to_raw``) into + other formats like list of CRTF, single region, etc. + + Parameters + ---------- + masks: dict + Dictionary containing ``masks`` and ``polys`` keys. The values for ``masks`` are the polygon + references for each channel and the values for ``polys`` contain the points making up each + polygon. + coord: str + Coordinate system that should be used in the returned masks. Allowed values are 'pixel'. + cdesc: dict + Dictionary containing ``csys`` and ``shape`` which describes the coordinate system to be + used for creating world coordinate coordinates. The ``shape`` is required along with the + coordinate system for coordinate conversion. + ''' + + if cdesc is None or 'csys' not in cdesc or 'shape' not in cdesc: + raise RuntimeError('region operations requires a coordinate description (cdesc parameter)') + else: + if not isinstance( cdesc['shape'], tuple ): + raise RuntimeError("coordinate description must contain a 'shape' element of type 'tuple'") + + if isinstance( cdesc['csys'], dict ): + csys = cdesc['csys'] + + full_mask_params = [] + + # Get the wcs object of the request format is world + if coord == 'world': + wcs = __get_wcs_object(cdesc['csys']) + + # Build list containing masks with channel and polaization information + # in a more accesible way for the grouping funciton + for index, mask in masks['masks'].items(): + for sub_mask in mask: + sub_mask['s'], sub_mask['c'] = index + full_mask_params.append(sub_mask) + + region_list = [] + + # Group masks by unique center pixel and determine channel range. Extract + # mask properties and build astropy region for each center pixel and geometry + # including region meta data. + for key, props in itertools.groupby(full_mask_params, __get_center_pixels): + channel_range = [] + for prop in props: + channel_range.append(prop['c']) + poly = prop['p'] + center_pixels = prop['d'] + + mask_shape = masks['polys'][poly]['type'] + + xs = masks['polys'][poly]['geometry']['xs'] + ys = masks['polys'][poly]['geometry']['ys'] + + width = max(xs) - min(xs) + height = max(ys) - min(ys) + + stokes_index = prop['s'] + + if mask_shape=='rect': + region = RectanglePixelRegion( + PixCoord(x=center_pixels[0], y=center_pixels[1]), + width=width, height=height, angle=0*u.deg + ) + elif mask_shape=='polygon': + region = PolygonPixelRegion( + vertices=PixCoord(x=xs, y=ys) + ) -def convert_masks( masks, format='crtf', coord='pixel', ret_type='str', cdesc=None ): + else: + raise RuntimeError('Invalid mask shape: {func}-->mask={mask}'.format(func=__convert_masks.__qualname__, mask=mask_shape)) + + region.meta['corr'] = __index_to_stokes(stokes_index) + region.meta['range'] = [min(channel_range), max(channel_range)] + + region_list.append(region.to_sky(wcs) if coord == 'world' else region) + + return region_list + +def convert_masks(masks, format='crtf', coord='pixel', ret_type='str', cdesc=None): '''Convert masks in standard format (as defined by ``CubeMask.jsmask_to_raw``) into other formats like list of CRTF, single region, etc. @@ -244,57 +395,64 @@ def convert_masks( masks, format='crtf', coord='pixel', ret_type='str', cdesc=No if cdesc is None or 'csys' not in cdesc or 'shape' not in cdesc: raise RuntimeError('region operations requires a coordinate description (cdesc parameter)') else: - if not isinstance( cdesc['shape'], tuple ): + if not isinstance(cdesc['shape'], tuple): raise RuntimeError("coordinate description must contain a 'shape' element of type 'tuple'") - if isinstance( cdesc['csys'], dict ): + if isinstance(cdesc['csys'], dict): csys_dict = cdesc['csys'] else: - csys_dict = cdesc['csys'].torecord( ) + csys_dict = cdesc['csys'].torecord() - unique_polygons={ } + unique_polygons = {} ### ### collect masks ordered by (poly_index,dx,dy) ### - for index,mask in masks['masks'].items( ): + for index, mask in masks['masks'].items(): for poly in mask: unique_poly = tuple([poly['p']] + poly['d']) if unique_poly in unique_polygons: - unique_polygons[unique_poly] += [ index ] + unique_polygons[unique_poly] += [index] else: - unique_polygons[unique_poly] = [ index ] + unique_polygons[unique_poly] = [index] - polygon_definitions = { index: poly for index, poly in masks['polys'].items( ) } + polygon_definitions = {index: poly for index, poly in masks['polys'].items()} if format == 'region' and coord == 'pixel': - rg = regionmanager( ) + rg = regionmanager() rg.setcoordinates(csys_dict) + ### ### create a region list given the index of the polygon used, the x/y translation, and the ### channels the region should be found on ### - def create_regions( poly_index, xlate, channels ): - def create_result( shape, points ): - return [ rg.fromtext(f"{shape}[{points}],range=[{chan_range[0]}chan,{chan_range[1]}chan],corr=[{','.join(index_to_stokes(expand_range_incl(stokes_range)))}]", shape=list(cdesc['shape']) ) - for chan_range in contiguous_ranges([c[1] for c in channels]) for stokes_range in contiguous_ranges([c[0] for c in channels]) ] + def create_regions(poly_index, xlate, channels): + def create_result(shape, points): + return [rg.fromtext( + f"{shape}[{points}],range=[{chan_range[0]}chan,{chan_range[1]}chan],corr=[{','.join(index_to_stokes(expand_range_incl(stokes_range)))}]" , + shape=list(cdesc['shape'])) + for chan_range in contiguous_ranges([c[1] for c in channels]) for stokes_range in + contiguous_ranges([c[0] for c in channels])] + if polygon_definitions[poly_index]['type'] == 'poly': - poly_pts = ','.join( [ f"[{x+xlate[0]}pix,{y+xlate[1]}pix]" - for x in polygon_definitions[poly_index]['geometry']['xs'] - for y in polygon_definitions[poly_index]['geometry']['ys'] ] ) - return create_result( 'poly', poly_pts ) + poly_pts = ','.join([f"[{x + xlate[0]}pix,{y + xlate[1]}pix]" + for x in polygon_definitions[poly_index]['geometry']['xs'] + for y in polygon_definitions[poly_index]['geometry']['ys']]) + return create_result('poly', poly_pts) if polygon_definitions[poly_index]['type'] == 'rect': xs = polygon_definitions[poly_index]['geometry']['xs'] ys = polygon_definitions[poly_index]['geometry']['ys'] - box_pts = f"[{min(xs)+xlate[0]}pix,{min(ys)+xlate[1]}pix],[{max(xs)+xlate[0]}pix,{max(ys)+xlate[1]}pix]" - return create_result( 'box', box_pts ) + box_pts = f"[{min(xs) + xlate[0]}pix,{min(ys) + xlate[1]}pix],[{max(xs) + xlate[0]}pix,{max(ys) + xlate[1]}pix]" + return create_result('box', box_pts) + ### flatten list of unique polygon regions - result = list(chain.from_iterable([ create_regions( index[0], index[1:], channels ) for index, channels in unique_polygons.items( ) ])) + result = list(chain.from_iterable( + [create_regions(index[0], index[1:], channels) for index, channels in unique_polygons.items()])) if ret_type == 'singleton': if len(result) == 0: raise RuntimeError("no regions created") if len(result) == 1: return result[0] ###return dict(enumerate(result))) ## see: CAS-13764 - return rg.makeunion(dict(map( lambda t: (str(t[0]),t[1]), list(enumerate(result)) ))) + return rg.makeunion(dict(map(lambda t: (str(t[0]), t[1]), list(enumerate(result))))) if ret_type == 'list': return result raise RuntimeError(f"unknown ret_type for region format ({ret_type})") @@ -304,25 +462,28 @@ def create_result( shape, points ): ### create a region list given the index of the polygon used, the x/y translation, and the ### channels the region should be found on ### - def create_regions( poly_index, xlate, channels ): - def create_result( shape, points ): - return [ f"{shape}[{points}],range=[{chan_range[0]}chan,{chan_range[1]}chan],corr=[{','.join(index_to_stokes(expand_range_incl(stokes_range)))}]" - for chan_range in contiguous_ranges([c[1] for c in channels]) for stokes_range in contiguous_ranges([c[0] for c in channels]) ] + def create_regions(poly_index, xlate, channels): + def create_result(shape, points): + return [ + f"{shape}[{points}],range=[{chan_range[0]}chan,{chan_range[1]}chan],corr=[{','.join(index_to_stokes(expand_range_incl(stokes_range)))}]" + for chan_range in contiguous_ranges([c[1] for c in channels]) for stokes_range in + contiguous_ranges([c[0] for c in channels])] + if polygon_definitions[poly_index]['type'] == 'poly': - poly_pts = ','.join( [ f"[{x+xlate[0]}pix,{y+xlate[1]}pix]" - for x in polygon_definitions[poly_index]['geometry']['xs'] - for y in polygon_definitions[poly_index]['geometry']['ys'] ] ) - return create_result( 'poly', poly_pts ) + poly_pts = ','.join([f"[{x + xlate[0]}pix,{y + xlate[1]}pix]" + for x in polygon_definitions[poly_index]['geometry']['xs'] + for y in polygon_definitions[poly_index]['geometry']['ys']]) + return create_result('poly', poly_pts) if polygon_definitions[poly_index]['type'] == 'rect': xs = polygon_definitions[poly_index]['geometry']['xs'] ys = polygon_definitions[poly_index]['geometry']['ys'] - box_pts = f"[{min(xs)+xlate[0]}pix,{min(ys)+xlate[1]}pix],[{max(xs)+xlate[0]}pix,{max(ys)+xlate[1]}pix]" - return create_result( 'box', box_pts ) + box_pts = f"[{min(xs) + xlate[0]}pix,{min(ys) + xlate[1]}pix],[{max(xs) + xlate[0]}pix,{max(ys) + xlate[1]}pix]" + return create_result('box', box_pts) ### ### generate CFTF ### - result = [ create_regions( index[0], index[1:], channels ) for index,channels in unique_polygons.items( ) ] + result = [create_regions(index[0], index[1:], channels) for index, channels in unique_polygons.items()] if ret_type == 'str': return '\n'.join(chain.from_iterable(result)) if ret_type == 'list': @@ -330,7 +491,8 @@ def create_result( shape, points ): raise RuntimeError(f"unknown ret_type for crtf format ({ret_type})") raise RuntimeError(f"invalid format ({format}), or coord ({coord})") -def set_attributes( obj, **kw ): + +def set_attributes(obj, **kw): '''Given an object and a set of keyword arguments, set the attributes in the object that correspond to the keywords to the specified values. @@ -346,6 +508,6 @@ def set_attributes( obj, **kw ): object ``obj`` parameter ''' - for k,v in kw.items( ): - setattr( obj, k, v ) + for k, v in kw.items(): + setattr(obj, k, v) return obj