Source code for astroquery.cadc.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
CADC
====


Module to query the Canadian Astronomy Data Centre (CADC).
"""

from astroquery import log
import warnings
import requests
from numpy import ma
from pathlib import Path
from urllib.parse import urlencode
from urllib.error import HTTPError

from ..utils.class_or_instance import class_or_instance
from ..utils import async_to_sync, commons
from ..query import BaseQuery, BaseVOQuery
from bs4 import BeautifulSoup
from astropy import units as u
from astropy.coordinates import Angle
import pyvo
from pyvo.auth import authsession

from . import conf


__all__ = ['Cadc', 'CadcClass']

CADC_COOKIE_PREFIX = 'CADC_SSO'

# TODO figure out what to do if anything about them. Some might require
# fixes on the CADC servers
warnings.filterwarnings('ignore', module='astropy.io.votable')


[docs] @async_to_sync class CadcClass(BaseVOQuery, BaseQuery): """ Class for accessing CADC data. Typical usage: result = Cadc.query_region('08h45m07.5s +54d18m00s', collection='CFHT') ... do something with result (optional) such as filter as in example below urls = Cadc.get_data_urls(result[result['target_name']=='Nr3491_1']) ... access data Other ways to query the CADC data storage: - target name: Cadc.query_region(SkyCoord.from_name('M31')) - target name in the metadata: Cadc.query_name('M31-A-6') # queries as a like '%lower(name)%' - TAP query on the CADC metadata (CAOM2 format - http://www.opencadc.org/caom2/) Cadc.get_tables() # list the tables Cadc.get_table(table_name) # list table schema Cadc.query """ CADC_REGISTRY_URL = conf.CADC_REGISTRY_URL CADCTAP_SERVICE_URI = conf.CADCTAP_SERVICE_URI CADCDATALINK_SERVICE_URI = conf.CADCDATLINK_SERVICE_URI CADCLOGIN_SERVICE_URI = conf.CADCLOGIN_SERVICE_URI TIMEOUT = conf.TIMEOUT def __init__(self, *, url=None, auth_session=None): """ Initialize Cadc object Parameters ---------- url : str, optional, default 'None; a url to use instead of the default auth_session: `requests.Session` or `pyvo.auth.authsession.AuthSession` A existing authenticated session containing the appropriate credentials to be used by the client to communicate with the server. This is an alternative to using login/logout methods that allows clients to reuse existing session with multiple services. Returns ------- Cadc object """ super().__init__() self.baseurl = url # _auth_session contains the credentials that are used by both # the cadc tap and cadc datalink services if auth_session: self._auth_session = auth_session else: self._auth_session = authsession.AuthSession() @property def cadctap(self): if not hasattr(self, '_cadctap'): if self.baseurl is None: self.baseurl = get_access_url(self.CADCTAP_SERVICE_URI) # remove capabilities endpoint to get to the service url self.baseurl = self.baseurl.rstrip('capabilities') self._cadctap = pyvo.dal.TAPService( self.baseurl, session=self._auth_session) else: self._cadctap = pyvo.dal.TAPService( self.baseurl, session=self._auth_session) return self._cadctap @property def cadcdatalink(self): if not hasattr(self, '_datalink'): self._datalink = pyvo.dal.adhoc.DatalinkService( self.data_link_url, session=self._auth_session) return self._datalink @property def data_link_url(self): if not hasattr(self, '_data_link_url'): self._data_link_url = get_access_url( self.CADCDATALINK_SERVICE_URI, capability="ivo://ivoa.net/std/DataLink#links-1.0") return self._data_link_url
[docs] def login(self, *, user=None, password=None, certificate_file=None): """ login allows user to authenticate to the service. Both user/password and https client certificates are supported. Alternatively, the Cadc class can be instantiated with an authenticated session. Parameters ---------- user : str, required if certificate is None username to login with password : str, required if user is set password to login with certificate : str, required if user is None path to certificate to use with logging in """ # start with a new session if not isinstance(self.cadctap._session, (requests.Session, authsession.AuthSession)): raise AttributeError('Cannot login with user provided session that is ' 'not an pyvo.authsession.AuthSession or ' 'requests.Session') if not certificate_file and not (user and password): raise AttributeError('login credentials missing (user/password ' 'or certificate)') if certificate_file: if isinstance(self.cadctap._session, authsession.AuthSession): self.cadctap._session.credentials.\ set_client_certificate(certificate_file) else: # if the session was already used to call CADC, requests caches # it without using the cert. Therefore need to close all # existing https sessions first. https_adapter = self.cadctap._session.adapters['https://'] if https_adapter: https_adapter.close() self.cadctap._session.cert = certificate_file if user and password: login_url = get_access_url(self.CADCLOGIN_SERVICE_URI, capability='ivo://ivoa.net/std/UMS#login-0.1') if login_url is None: raise RuntimeError("No login URL") # need to login and get a cookie args = { "username": str(user), "password": str(password)} header = { "Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain" } response = self._request(method='POST', url=login_url, data=args, headers=header, cache=False) try: response.raise_for_status() except Exception as e: log.error('Logging error: {}'.format(e)) raise e # extract cookie cookie = '"{}"'.format(response.text) if cookie is not None: if isinstance(self.cadctap._session, authsession.AuthSession): self.cadctap._session.credentials.set_cookie( CADC_COOKIE_PREFIX, cookie) else: self.cadctap._session.cookies.set( CADC_COOKIE_PREFIX, cookie)
[docs] def logout(self): """ Logout. Anonymous access with all the subsequent use of the object. Note that the original session is not affected (in case it was passed when the object was first instantiated) """ if isinstance(self._auth_session, pyvo.auth.AuthSession): # Remove the existing credentials (if any) # PyVO should provide this reset credentials functionality # TODO - this should be implemented in PyVO to avoid this deep # intrusion into that package self._auth_session.credentials.credentials = \ {key: value for (key, value) in self._auth_session.credentials.credentials.items() if key == pyvo.auth.securitymethods.ANONYMOUS} elif isinstance(self._auth_session, requests.Session): # the only way to ensure complete logout is to start with a new # session. This is mainly because of certificates. Removing cert # argument to a session already in use does not force it to # re-do the HTTPS hand shake self._auth_session = requests.Session() self.cadctap._session = self._auth_session self.cadcdatalink._session = self._auth_session else: raise RuntimeError( 'Do not know how to log out from custom session')
[docs] @class_or_instance def query_region_async(self, coordinates, *, radius=0.016666666666667*u.deg, collection=None, get_query_payload=False): """ Queries the CADC for a region around the specified coordinates. Parameters ---------- coordinates : str or `astropy.coordinates`. coordinates around which to query radius : str or `astropy.units.Quantity`. the radius of the cone search collection: Name of the CADC collection to query, optional get_query_payload : bool, optional Just return the dict of HTTP request parameters. Returns ------- response : `requests.Response` The HTTP response returned from the service. All async methods should return the raw HTTP response. """ if isinstance(radius, (int, float)): warnings.warn('Radius should be of type str or ' '`astropy.units.Quantity`') radius = radius * u.deg request_payload = self._args_to_payload(coordinates=coordinates, radius=radius, collection=collection) # primarily for debug purposes, but also useful if you want to send # someone a URL linking directly to the data if get_query_payload: return request_payload response = self.exec_sync(request_payload['query']) return response
[docs] @class_or_instance def query_name_async(self, name): """ Query CADC metadata for a name and return the corresponding metadata in the CAOM2 format (http://www.opencadc.org/caom2/). Parameters ---------- name : str name of object to query for Returns ------- response : `~astropy.table.Table` Results of the query in a tabular format. """ response = self.exec_sync( "select * from caom2.Observation o join caom2.Plane p " "on o.obsID=p.obsID where lower(target_name) like '%{}%'". format(name.lower())) return response
[docs] @class_or_instance def get_collections(self): """ Query CADC for all the hosted collections Returns ------- A dictionary of collections hosted at the CADC where the key is the collection and value represents details of that collection. """ response = self.exec_sync( 'select distinct collection, energy_emBand from caom2.EnumField') collections = {} for row in response: if row['collection'] not in collections: collection = { 'Description': 'The {} collection at the CADC'. format(row['collection']), 'Bands': []} if row['energy_emBand'] is not ma.masked: collection['Bands'].append(row['energy_emBand']) collections[row['collection']] = collection elif row['energy_emBand'] is not ma.masked: collections[row['collection']]['Bands'].\ append(row['energy_emBand']) return collections
[docs] @class_or_instance def get_images(self, coordinates, radius, *, collection=None, get_url_list=False, show_progress=False): """ A coordinate-based query function that returns a list of fits files with cutouts around the passed in coordinates. Parameters ---------- coordinates : str or `astropy.coordinates`. Coordinates around which to query. radius : str or `astropy.units.Quantity` The radius of the cone search AND cutout area. collection : str, optional Name of the CADC collection to query. get_url_list : bool, optional If ``True``, returns the list of data urls rather than the downloaded FITS files. Default is ``False``. show_progress : bool, optional Whether to display a progress bar if the file is downloaded from a remote server. Default is ``False``. Returns ------- list : A list of `~astropy.io.fits.HDUList` objects (or a list of str if returning urls). """ filenames = self.get_images_async(coordinates, radius, collection=collection, get_url_list=get_url_list, show_progress=show_progress) if get_url_list: return filenames images = [] for fn in filenames: try: images.append(fn.get_fits()) except (requests.exceptions.HTTPError, HTTPError) as err: # Catch HTTPError if user is unauthorized to access file log.debug( "{} - Problem retrieving the file: {}". format(str(err), str(err.url))) pass return images
[docs] def get_images_async(self, coordinates, radius, *, collection=None, get_url_list=False, show_progress=False): """ A coordinate-based query function that returns a list of context managers with cutouts around the passed in coordinates. Parameters ---------- coordinates : str or `astropy.coordinates`. Coordinates around which to query. radius : str or `astropy.units.Quantity` The radius of the cone search AND cutout area. collection : str, optional Name of the CADC collection to query. get_url_list : bool, optional If ``True``, returns the list of data urls rather than the list of context managers. Default is ``False``. show_progress : bool, optional Whether to display a progress bar if the file is downloaded from a remote server. Default is ``False``. Returns ------- list : A list of context-managers that yield readable file-like objects """ request_payload = self._args_to_payload(coordinates=coordinates, radius=radius, collection=collection, data_product_type='image') query_result = self.exec_sync(request_payload['query']) images_urls = self.get_image_list(query_result, coordinates, radius) if get_url_list: return images_urls return [commons.FileContainer(url, encoding='binary', show_progress=show_progress) for url in images_urls]
[docs] def get_image_list(self, query_result, coordinates, radius): """ Function to map the results of a CADC query into URLs to corresponding data and cutouts that can be later downloaded. The function uses the IVOA DataLink Service (http://www.ivoa.net/documents/DataLink/) implemented at the CADC. It works directly with the results produced by `query_region` and `query_name` but in principle it can work with other query results produced with the Cadc query as long as the results contain the 'publisherID' column. This column is part of the 'caom2.Plane' table. Parameters ---------- query_result : A `~astropy.table.Table` object Result returned by `query_region` or `query_name`. In general, the result of any CADC TAP query that contains the 'publisherID' column can be used here. coordinates : str or `astropy.coordinates`. Center of the cutout area. radius : str or `astropy.units.Quantity`. The radius of the cutout area. Returns ------- list : A list of URLs to cutout data. """ if not query_result: raise AttributeError('Missing query_result argument') parsed_coordinates = commons.parse_coordinates(coordinates).fk5 radius_deg = Angle(radius).to_value(u.deg) ra = parsed_coordinates.ra.degree dec = parsed_coordinates.dec.degree cutout_params = {'POS': 'CIRCLE {} {} {}'.format(ra, dec, radius_deg)} try: publisher_ids = query_result['publisherID'] except KeyError: raise AttributeError( 'publisherID column missing from query_result argument') result = [] # Send datalink requests in batches of 20 publisher ids batch_size = 20 # Iterate through list of sublists to send datalink requests in batches for pid_sublist in (publisher_ids[pos:pos + batch_size] for pos in range(0, len(publisher_ids), batch_size)): datalink = pyvo.dal.adhoc.DatalinkResults.from_result_url( '{}?{}'.format(self.data_link_url, urlencode({'ID': pid_sublist}, True)), session=self.cadcdatalink._session) for service_def in datalink.bysemantics('#cutout'): access_url = service_def.access_url if '/sync' in access_url: service_params = service_def.input_params input_params = {param.name: param.value for param in service_params if param.name in ['ID', 'RUNID']} input_params.update(cutout_params) result.append('{}?{}'.format(access_url, urlencode(input_params))) return result
[docs] @class_or_instance def get_data_urls(self, query_result, *, include_auxiliaries=False): """ Function to map the results of a CADC query into URLs to corresponding data that can be later downloaded. The function uses the IVOA DataLink Service (http://www.ivoa.net/documents/DataLink/) implemented at the CADC. It works directly with the results produced by `query_region` and `query_name` but in principle it can work with other query results produced with the Cadc query as long as the results contain the 'publisherID' column. This column is part of the 'caom2.Plane' table. Parameters ---------- query_result : A `~astropy.table.Table` object Result returned by `query_region` or `query_name`. In general, the result of any CADC TAP query that contains the 'publisherID' column can be use here. include_auxiliaries : boolean ``True`` to return URLs to auxiliary files such as previews, ``False`` otherwise Returns ------- A list of URLs to data. """ if not query_result: raise AttributeError('Missing metadata argument') try: publisher_ids = query_result['publisherID'] except KeyError: raise AttributeError( 'publisherID column missing from query_result argument') result = [] # Send datalink requests in batches of 20 publisher ids batch_size = 20 # Iterate through list of sublists to send datalink requests in batches for pid_sublist in (publisher_ids[pos:pos + batch_size] for pos in range(0, len(publisher_ids), batch_size)): # REQUEST=download-only is a CADC optimization to restrict # results to downloadable URLs as opposed to redirects # to other services such as cutouts that are not required datalink = pyvo.dal.adhoc.DatalinkResults.from_result_url( '{}?{}'.format(self.data_link_url, urlencode({'ID': pid_sublist, 'REQUEST': 'downloads-only'}, True)), session=self.cadcdatalink._session) for service_def in datalink: if service_def.semantics in ['http://www.opencadc.org/caom2#pkg', '#package']: # TODO http://www.openadc.org/caom2#pkg has been replaced # by "package". Removed it after CADC rolls out the change # package is an alternative for downloading multiple # data files in a tar file as an alternative to separate # downloads. It doesn't make much sense in this case so # filter it out. continue if not include_auxiliaries \ and service_def.semantics != '#this': continue result.append(service_def.access_url) return result
[docs] def get_tables(self, *, only_names=False): """ Gets all public tables Parameters ---------- only_names : bool, optional, default False True to load table names only Returns ------- A list of table objects """ table_set = self.cadctap.tables if only_names: return list(table_set.keys()) else: return list(table_set.values())
[docs] def get_table(self, table): """ Gets the specified table Parameters ---------- table : str, mandatory full qualified table name (i.e. schema name + table name) Returns ------- A table object """ tables = self.get_tables() for t in tables: if table == t.name: return t
[docs] def exec_sync(self, query, *, maxrec=None, uploads=None, output_file=None, output_format='votable'): """ Run a query and return the results or save them in an output_file Parameters ---------- query : str, mandatory SQL to execute maxrec : int the maximum records to return. defaults to the service default uploads : Temporary tables to upload and run with the queries output_file : str, Path, or file handler File to save the results to output_format : Format of the output (default is basic). Must be one of the formats supported by `astropy.table` Returns ------- Results of running the query in (for now) votable format Notes ----- Support for other output formats (tsv, csv) to be added as soon as they are available in pyvo. """ response = self.cadctap.search(query, language='ADQL', uploads=uploads) result = response.to_table() if output_file: if isinstance(output_file, str): fname = output_file elif isinstance(output_file, Path): # Merge this case into the str once astropy is >=5.1 fname = str(output_file) elif hasattr(output_file, 'name'): fname = output_file.name else: raise AttributeError('Not a valid file name, Path, or file handler') result.write(fname, format=output_format, overwrite=True) return result
[docs] def create_async(self, query, *, maxrec=None, uploads=None): """ Creates a TAP job to execute and returns it to the caller. The caller then can start the execution and monitor the job. Typical (no error handling) sequence of events: job = create_async(query) job = job.run().wait() job.raise_if_error() result = job.fetch_result() job.delete() # optional See ``pyvo.dal.tap`` for details about the ``AsyncTAPJob`` Parameters ---------- query : str, mandatory SQL to execute maxrec : int the maximum records to return. defaults to the service default uploads: Temporary tables to upload and run with the queries output_file: str or file handler: File to save the results to Returns ------- AsyncTAPJob the query instance Notes ----- Support for other output formats (tsv, csv) to be added as soon as they are available in pyvo. """ return self.cadctap.submit_job(query, language='ADQL', uploads=uploads)
[docs] def load_async_job(self, jobid): """ Loads an asynchronous job Parameters ---------- jobid : str, mandatory job identifier Returns ------- A Job object """ return pyvo.dal.AsyncTAPJob('{}/async/{}'.format( self.cadctap.baseurl, jobid), session=self._auth_session)
[docs] def list_async_jobs(self, *, phases=None, after=None, last=None, short_description=True): """ Returns all the asynchronous jobs Parameters ---------- phases : list of str Union of job phases to filter the results by. after : datetime Return only jobs created after this datetime last : int Return only the most recent number of jobs short_description : flag - True or False If True, the jobs in the list will contain only the information corresponding to the TAP ShortJobDescription object (job ID, phase, run ID, owner ID and creation ID) whereas if False, a separate GET call to each job is performed for the complete job description Returns ------- A list of Job objects """ return self.cadctap.get_job_list(phases=phases, after=after, last=last, short_description=short_description)
def _parse_result(self, result, *, verbose=None): return result def _args_to_payload(self, *args, **kwargs): # convert arguments to a valid requests payload # and force the coordinates to FK5 (assuming FK5/ICRS are # interchangeable) since RA/Dec are used below coordinates = commons.parse_coordinates(kwargs['coordinates']).fk5 radius_deg = Angle(kwargs["radius"]).to_value(u.deg) payload = {format: 'VOTable'} payload['query'] = \ "SELECT * from caom2.Observation o join caom2.Plane p " \ "ON o.obsID=p.obsID " \ "WHERE INTERSECTS( " \ "CIRCLE('ICRS', {}, {}, {}), position_bounds) = 1 AND " \ "(quality_flag IS NULL OR quality_flag != 'junk')".\ format(coordinates.ra.degree, coordinates.dec.degree, radius_deg) if 'collection' in kwargs and kwargs['collection']: payload['query'] = "{} AND collection='{}'".\ format(payload['query'], kwargs['collection']) if 'data_product_type' in kwargs and kwargs['data_product_type']: payload['query'] = "{} AND dataProductType='{}'".\ format(payload['query'], kwargs['data_product_type']) return payload
def static_vars(**kwargs): def decorate(func): for k in kwargs: setattr(func, k, kwargs[k]) return func return decorate @static_vars(caps={}) def get_access_url(service, *, capability=None): """ Returns the URL corresponding to a service by doing a lookup in the cadc registry. It returns the access URL corresponding to cookie authentication. Parameters ---------- service : str the service the capability belongs to. It can be identified by a CADC uri ('ivo://cadc.nrc.ca/) which is looked up in the CADC registry or by the URL where the service capabilities is found. capability : str uri representing the capability for which the access url is sought Returns ------- The access url Note ------ This function implements the functionality of a CADC registry as defined by the IVOA. It should be eventually moved to its own directory. Caching should be considered to reduce the number of remote calls to CADC registry """ caps_url = '' if service.startswith('http'): if not capability: return service caps_url = service else: # get caps from the CADC registry if not get_access_url.caps: try: response = requests.get(conf.CADC_REGISTRY_URL) response.raise_for_status() except requests.exceptions.HTTPError as err: log.debug( "ERROR getting the CADC registry: {}".format(str(err))) raise err for line in response.text.splitlines(): if len(line) > 0 and not line.startswith('#'): service_id, capabilies_url = line.split('=') get_access_url.caps[service_id.strip()] = \ capabilies_url.strip() # lookup the service service_uri = service if not service.startswith('ivo'): # assume short form of CADC service service_uri = 'ivo://cadc.nrc.ca/{}'.format(service) if service_uri not in get_access_url.caps: raise AttributeError( "Cannot find the capabilities of service {}".format(service)) # look up in the CADC reg for the service capabilities caps_url = get_access_url.caps[service_uri] if not capability: return caps_url try: response2 = requests.get(caps_url) response2.raise_for_status() except Exception as e: log.debug( "ERROR getting the service capabilities: {}".format(str(e))) raise e soup = BeautifulSoup(response2.text, features="html5lib") for cap in soup.find_all('capability'): if cap.get("standardid", None) == capability: if len(cap.find_all('interface')) == 1: return cap.find_all('interface')[0].accessurl.text for i in cap.find_all('interface'): if hasattr(i, 'securitymethod'): sm = i.securitymethod if not sm or sm.get("standardid", None) is None or\ sm['standardid'] == "ivo://ivoa.net/sso#cookie": return i.accessurl.text raise RuntimeError("ERROR - capability {} not found or not working with " "anonymous or cookie access".format(capability)) Cadc = CadcClass()