# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
CADC
====
Module to query the Canadian Astronomy Data Centre (CADC).
"""
from astroquery import log
import warnings
import requests
from numpy import ma
from pathlib import Path
from urllib.parse import urlencode
from urllib.error import HTTPError
from ..utils.class_or_instance import class_or_instance
from ..utils import async_to_sync, commons
from ..query import BaseQuery, BaseVOQuery
from bs4 import BeautifulSoup
from astropy import units as u
from astropy.coordinates import Angle
import pyvo
from pyvo.auth import authsession
from . import conf
# Public names exported by this module.
__all__ = ['Cadc', 'CadcClass']
# Name under which the CADC login cookie is stored in the session.
CADC_COOKIE_PREFIX = 'CADC_SSO'
# Ignore every warning issued from the astropy.io.votable module.
# TODO: decide what, if anything, should be done about these warnings. Some
# of them might require fixes on the CADC servers.
warnings.filterwarnings('ignore', module='astropy.io.votable')
[docs]
@async_to_sync
class CadcClass(BaseVOQuery, BaseQuery):
"""
Class for accessing CADC data.

Typical usage::

    result = Cadc.query_region('08h45m07.5s +54d18m00s', collection='CFHT')
    # ... optionally process the result, e.g. filter it as shown below
    urls = Cadc.get_data_urls(result[result['target_name']=='Nr3491_1'])
    # ... access the data

Other ways to query the CADC data storage:

- target name::

    Cadc.query_region(SkyCoord.from_name('M31'))

- target name in the metadata::

    Cadc.query_name('M31-A-6')  # queries as a like '%lower(name)%'

- TAP query on the CADC metadata (CAOM2 format -
  http://www.opencadc.org/caom2/)::

    Cadc.get_tables()  # list the tables
    Cadc.get_table(table_name)  # list table schema
    Cadc.exec_sync(query)  # run an ADQL query
"""
# Registry location, service identifiers and timeout, all taken from the
# module configuration object ``conf``.
CADC_REGISTRY_URL = conf.CADC_REGISTRY_URL
CADCTAP_SERVICE_URI = conf.CADCTAP_SERVICE_URI
CADCDATALINK_SERVICE_URI = conf.CADCDATLINK_SERVICE_URI
CADCLOGIN_SERVICE_URI = conf.CADCLOGIN_SERVICE_URI
TIMEOUT = conf.TIMEOUT
def __init__(self, *, url=None, auth_session=None):
    """
    Create a CADC client.

    Parameters
    ----------
    url : str, optional
        Base URL of the TAP service to use instead of the default one.
        When ``None`` (default) the URL is resolved lazily the first time
        the ``cadctap`` property is accessed.
    auth_session : `requests.Session` or `pyvo.auth.authsession.AuthSession`, optional
        An existing authenticated session holding the credentials the
        client should use when talking to the server. This is an
        alternative to the login/logout methods and lets callers share
        one session between several services.
    """
    super().__init__()
    # One session carries the credentials for both the TAP and the
    # DataLink services; a fresh AuthSession is created when the caller
    # does not supply one.
    self._auth_session = auth_session or authsession.AuthSession()
    self.baseurl = url
@property
def cadctap(self):
    """
    TAP service client, created on first access and then cached.

    When no base URL was given at construction time, the capabilities URL
    of ``CADCTAP_SERVICE_URI`` is looked up with ``get_access_url`` and the
    trailing ``capabilities`` path element is removed to obtain the
    service URL, which is stored in ``self.baseurl``.
    """
    if not hasattr(self, '_cadctap'):
        if self.baseurl is None:
            service_url = get_access_url(self.CADCTAP_SERVICE_URI)
            # Remove the capabilities endpoint to get to the service url.
            # str.rstrip() would strip a *set of characters* and could eat
            # legitimate trailing characters of the URL, so only the exact
            # suffix is removed.
            suffix = 'capabilities'
            if service_url.endswith(suffix):
                service_url = service_url[:-len(suffix)]
            self.baseurl = service_url
        self._cadctap = pyvo.dal.TAPService(
            self.baseurl, session=self._auth_session)
    return self._cadctap
@property
def cadcdatalink(self):
    """
    DataLink service client, created on first access and then cached.

    The client is built from ``self.data_link_url`` and shares the
    session held in ``self._auth_session``.
    """
    if hasattr(self, '_datalink'):
        return self._datalink
    self._datalink = pyvo.dal.adhoc.DatalinkService(
        self.data_link_url, session=self._auth_session)
    return self._datalink
@property
def data_link_url(self):
    """
    Access URL of the DataLink ``links-1.0`` capability.

    The URL is resolved with ``get_access_url`` on first access and cached
    on the instance afterwards.
    """
    if hasattr(self, '_data_link_url'):
        return self._data_link_url
    links_capability = "ivo://ivoa.net/std/DataLink#links-1.0"
    self._data_link_url = get_access_url(
        self.CADCDATALINK_SERVICE_URI, capability=links_capability)
    return self._data_link_url
[docs]
def login(self, *, user=None, password=None, certificate_file=None):
    """
    Authenticate to the service.

    Both user/password and https client certificates are supported.
    Alternatively, the Cadc class can be instantiated with an
    authenticated session.

    Parameters
    ----------
    user : str, required if certificate_file is None
        username to login with
    password : str, required if user is set
        password to login with
    certificate_file : str, required if user is None
        path to the certificate to use when logging in

    Raises
    ------
    AttributeError
        If the session used by the TAP client is neither a
        `requests.Session` nor an `pyvo.auth.authsession.AuthSession`, or
        if neither a certificate nor a user/password pair is supplied.
    RuntimeError
        If no login URL could be determined.
    """
    session = self.cadctap._session
    if not isinstance(session, (requests.Session,
                                authsession.AuthSession)):
        raise AttributeError('Cannot login with user provided session that is '
                             'not an pyvo.authsession.AuthSession or '
                             'requests.Session')
    if not certificate_file and not (user and password):
        raise AttributeError('login credentials missing (user/password '
                             'or certificate)')
    uses_auth_session = isinstance(session, authsession.AuthSession)

    if certificate_file:
        if uses_auth_session:
            session.credentials.set_client_certificate(certificate_file)
        else:
            # if the session was already used to call CADC, requests caches
            # it without using the cert. Therefore need to close all
            # existing https sessions first. The adapter is looked up with
            # .get() so that a session without an https adapter does not
            # fail with KeyError.
            https_adapter = session.adapters.get('https://')
            if https_adapter:
                https_adapter.close()
            session.cert = certificate_file

    if user and password:
        login_url = get_access_url(self.CADCLOGIN_SERVICE_URI,
                                   capability='ivo://ivoa.net/std/UMS#login-0.1')
        if login_url is None:
            raise RuntimeError("No login URL")
        # need to login and get a cookie
        args = {
            "username": str(user),
            "password": str(password)}
        header = {
            "Content-type": "application/x-www-form-urlencoded",
            "Accept": "text/plain"
        }
        response = self._request(method='POST', url=login_url, data=args,
                                 headers=header, cache=False)
        try:
            response.raise_for_status()
        except Exception as e:
            log.error('Logging error: {}'.format(e))
            raise
        # The response body is the cookie value; it is stored wrapped in
        # double quotes.
        cookie = '"{}"'.format(response.text)
        if uses_auth_session:
            session.credentials.set_cookie(CADC_COOKIE_PREFIX, cookie)
        else:
            session.cookies.set(CADC_COOKIE_PREFIX, cookie)
[docs]
def logout(self):
    """
    Log out, so that all subsequent use of the object is anonymous.

    Note that the original session is not affected (in case it was passed
    when the object was first instantiated).

    Raises
    ------
    RuntimeError
        If the current session is neither a pyvo ``AuthSession`` nor a
        `requests.Session`.
    """
    session = self._auth_session
    if isinstance(session, pyvo.auth.AuthSession):
        # Drop every credential except the anonymous one.
        # TODO - this should be implemented in PyVO (as a "reset
        # credentials" feature) to avoid this deep intrusion into that
        # package
        store = session.credentials
        store.credentials = {
            key: value for (key, value) in store.credentials.items()
            if key == pyvo.auth.securitymethods.ANONYMOUS}
        return
    if not isinstance(session, requests.Session):
        raise RuntimeError(
            'Do not know how to log out from custom session')
    # the only way to ensure complete logout is to start with a new
    # session. This is mainly because of certificates. Removing cert
    # argument to a session already in use does not force it to
    # re-do the HTTPS hand shake
    self._auth_session = requests.Session()
    self.cadctap._session = self._auth_session
    self.cadcdatalink._session = self._auth_session
[docs]
@class_or_instance
def query_region_async(self, coordinates, *, radius=0.016666666666667*u.deg,
                       collection=None,
                       get_query_payload=False):
    """
    Query the CADC for a region around the specified coordinates.

    Parameters
    ----------
    coordinates : str or `astropy.coordinates`.
        coordinates around which to query
    radius : str or `astropy.units.Quantity`.
        the radius of the cone search. A bare ``int`` or ``float`` is
        accepted with a warning and interpreted as degrees.
    collection : str, optional
        Name of the CADC collection to query
    get_query_payload : bool, optional
        Just return the dict describing the request (including the
        generated query) instead of running it.

    Returns
    -------
    response
        The payload dict when ``get_query_payload`` is true, otherwise
        whatever ``exec_sync`` returns for the generated query.
    """
    if isinstance(radius, (int, float)):
        warnings.warn('Radius should be of type str or '
                      '`astropy.units.Quantity`')
        radius = radius * u.deg

    payload = self._args_to_payload(coordinates=coordinates,
                                    radius=radius,
                                    collection=collection)
    # Returning the payload is primarily for debug purposes, but also
    # useful to share the exact query that would be sent.
    if get_query_payload:
        return payload
    return self.exec_sync(payload['query'])
[docs]
@class_or_instance
def query_name_async(self, name):
    """
    Query CADC metadata for a name and return the corresponding metadata in
    the CAOM2 format (http://www.opencadc.org/caom2/).

    The match is a case-insensitive substring match on ``target_name``.

    Parameters
    ----------
    name : str
        name of object to query for

    Returns
    -------
    response
        Results of the query, as returned by ``exec_sync``.
    """
    # The name ends up inside an ADQL string literal, so embedded single
    # quotes must be doubled; otherwise a name such as "Barnard's star"
    # produces a malformed (or altered) query.
    pattern = name.lower().replace("'", "''")
    query = ("select * from caom2.Observation o join caom2.Plane p "
             "on o.obsID=p.obsID where lower(target_name) like '%{}%'".
             format(pattern))
    return self.exec_sync(query)
[docs]
@class_or_instance
def get_collections(self):
    """
    Query CADC for all the hosted collections.

    Returns
    -------
    dict
        Maps each collection name to a dict with a ``'Description'``
        string and a ``'Bands'`` list holding the non-masked
        ``energy_emBand`` values reported for that collection.
    """
    rows = self.exec_sync(
        'select distinct collection, energy_emBand from caom2.EnumField')
    collections = {}
    for row in rows:
        name = row['collection']
        band = row['energy_emBand']
        if name not in collections:
            # First row seen for this collection: create its entry.
            collections[name] = {
                'Description': 'The {} collection at the CADC'.format(name),
                'Bands': []}
        # Masked values mean "no band" and are not recorded.
        if band is not ma.masked:
            collections[name]['Bands'].append(band)
    return collections
[docs]
@class_or_instance
def get_images(self, coordinates, radius, *,
               collection=None,
               get_url_list=False,
               show_progress=False):
    """
    A coordinate-based query function that returns a list of
    fits files with cutouts around the passed in coordinates.

    Parameters
    ----------
    coordinates : str or `astropy.coordinates`.
        Coordinates around which to query.
    radius : str or `astropy.units.Quantity`
        The radius of the cone search AND cutout area.
    collection : str, optional
        Name of the CADC collection to query.
    get_url_list : bool, optional
        If ``True``, returns the list of data urls rather than
        the downloaded FITS files. Default is ``False``.
    show_progress : bool, optional
        Whether to display a progress bar if the file is downloaded
        from a remote server. Default is ``False``.

    Returns
    -------
    list : A list of `~astropy.io.fits.HDUList` objects (or a list of
        str if returning urls). Files whose retrieval fails with an HTTP
        error are logged at debug level and left out of the list.
    """
    readers = self.get_images_async(coordinates, radius, collection=collection,
                                    get_url_list=get_url_list,
                                    show_progress=show_progress)
    if get_url_list:
        return readers

    images = []
    for reader in readers:
        try:
            images.append(reader.get_fits())
        except (requests.exceptions.HTTPError, HTTPError) as err:
            # Catch HTTPError if user is unauthorized to access file.
            # urllib's HTTPError exposes ``url`` directly while the
            # requests flavour only carries it on its response, so look
            # it up defensively instead of failing inside the handler.
            url = getattr(err, 'url', None)
            if url is None:
                url = getattr(getattr(err, 'response', None), 'url', None)
            log.debug(
                "{} - Problem retrieving the file: {}".
                format(str(err), str(url)))
    return images
[docs]
def get_images_async(self, coordinates, radius, *, collection=None,
                     get_url_list=False, show_progress=False):
    """
    A coordinate-based query function that returns a list of
    context managers with cutouts around the passed in coordinates.

    Parameters
    ----------
    coordinates : str or `astropy.coordinates`.
        Coordinates around which to query.
    radius : str or `astropy.units.Quantity`
        The radius of the cone search AND cutout area.
    collection : str, optional
        Name of the CADC collection to query.
    get_url_list : bool, optional
        If ``True``, returns the list of data urls rather than
        the list of context managers. Default is ``False``.
    show_progress : bool, optional
        Whether to display a progress bar if the file is downloaded
        from a remote server. Default is ``False``.

    Returns
    -------
    list : A list of context-managers that yield readable file-like
        objects (or the list of cutout URLs when ``get_url_list`` is set).
    """
    # Restrict the region query to image products.
    payload = self._args_to_payload(coordinates=coordinates,
                                    radius=radius,
                                    collection=collection,
                                    data_product_type='image')
    matches = self.exec_sync(payload['query'])
    images_urls = self.get_image_list(matches, coordinates, radius)
    if get_url_list:
        return images_urls

    containers = []
    for url in images_urls:
        containers.append(commons.FileContainer(url, encoding='binary',
                                                show_progress=show_progress))
    return containers
[docs]
def get_image_list(self, query_result, coordinates, radius):
    """
    Map the results of a CADC query into URLs to corresponding cutouts
    that can be later downloaded.

    The function uses the IVOA DataLink Service
    (http://www.ivoa.net/documents/DataLink/) implemented at the CADC.
    It works directly with the results produced by `query_region` and
    `query_name` but in principle it can work with other query
    results produced with the Cadc query as long as the results
    contain the 'publisherID' column. This column is part of the
    'caom2.Plane' table.

    Parameters
    ----------
    query_result : A `~astropy.table.Table` object
        Result returned by `query_region` or
        `query_name`. In general, the result of any
        CADC TAP query that contains the 'publisherID'
        column can be used here.
    coordinates : str or `astropy.coordinates`.
        Center of the cutout area.
    radius : str or `astropy.units.Quantity`.
        The radius of the cutout area.

    Returns
    -------
    list : A list of URLs to cutout data.

    Raises
    ------
    AttributeError
        If ``query_result`` is falsy or has no 'publisherID' column.
    """
    if not query_result:
        raise AttributeError('Missing query_result argument')

    center = commons.parse_coordinates(coordinates).fk5
    radius_deg = Angle(radius).to_value(u.deg)
    ra = center.ra.degree
    dec = center.dec.degree
    # The same circular cutout is requested from every cutout service.
    cutout_params = {'POS': 'CIRCLE {} {} {}'.format(ra, dec, radius_deg)}

    try:
        publisher_ids = query_result['publisherID']
    except KeyError:
        raise AttributeError(
            'publisherID column missing from query_result argument')

    # Send datalink requests in batches of 20 publisher ids
    batch_size = 20
    cutout_urls = []
    for start in range(0, len(publisher_ids), batch_size):
        batch = publisher_ids[start:start + batch_size]
        links_url = '{}?{}'.format(self.data_link_url,
                                   urlencode({'ID': batch}, True))
        datalink = pyvo.dal.adhoc.DatalinkResults.from_result_url(
            links_url, session=self.cadcdatalink._session)
        for service_def in datalink.bysemantics('#cutout'):
            access_url = service_def.access_url
            # Only services whose URL contains '/sync' are used.
            if '/sync' not in access_url:
                continue
            # Carry over the ID/RUNID inputs of the service descriptor and
            # add the cutout position.
            input_params = {}
            for param in service_def.input_params:
                if param.name in ['ID', 'RUNID']:
                    input_params[param.name] = param.value
            input_params.update(cutout_params)
            cutout_urls.append('{}?{}'.format(access_url,
                                              urlencode(input_params)))
    return cutout_urls
[docs]
@class_or_instance
def get_data_urls(self, query_result, *, include_auxiliaries=False):
    """
    Map the results of a CADC query into URLs to corresponding data that
    can be later downloaded.

    The function uses the IVOA DataLink Service
    (http://www.ivoa.net/documents/DataLink/) implemented at the CADC.
    It works directly with the results produced by `query_region` and
    `query_name` but in principle it can work with other query
    results produced with the Cadc query as long as the results
    contain the 'publisherID' column. This column is part of the
    'caom2.Plane' table.

    Parameters
    ----------
    query_result : A `~astropy.table.Table` object
        Result returned by `query_region` or
        `query_name`. In general, the result of any
        CADC TAP query that contains the 'publisherID' column
        can be used here.
    include_auxiliaries : boolean
        ``True`` to return URLs to auxiliary files such as
        previews, ``False`` otherwise

    Returns
    -------
    A list of URLs to data.

    Raises
    ------
    AttributeError
        If ``query_result`` is falsy or has no 'publisherID' column.
    """
    if not query_result:
        raise AttributeError('Missing metadata argument')

    try:
        publisher_ids = query_result['publisherID']
    except KeyError:
        raise AttributeError(
            'publisherID column missing from query_result argument')

    # TODO http://www.opencadc.org/caom2#pkg has been replaced by
    # "#package". Remove the old value after CADC rolls out the change.
    package_semantics = ['http://www.opencadc.org/caom2#pkg', '#package']

    # Send datalink requests in batches of 20 publisher ids
    batch_size = 20
    data_urls = []
    for start in range(0, len(publisher_ids), batch_size):
        batch = publisher_ids[start:start + batch_size]
        # REQUEST=downloads-only is a CADC optimization to restrict
        # results to downloadable URLs as opposed to redirects
        # to other services such as cutouts that are not required
        links_url = '{}?{}'.format(
            self.data_link_url,
            urlencode({'ID': batch, 'REQUEST': 'downloads-only'}, True))
        datalink = pyvo.dal.adhoc.DatalinkResults.from_result_url(
            links_url, session=self.cadcdatalink._session)
        for service_def in datalink:
            semantics = service_def.semantics
            if semantics in package_semantics:
                # A package bundles multiple data files in a tar file as
                # an alternative to separate downloads. It doesn't make
                # much sense in this case so filter it out.
                continue
            if include_auxiliaries or semantics == '#this':
                data_urls.append(service_def.access_url)
    return data_urls
[docs]
def get_tables(self, *, only_names=False):
    """
    Get all the tables exposed by the TAP service.

    Parameters
    ----------
    only_names : bool, optional, default False
        True to return the table names only

    Returns
    -------
    list
        The table names when ``only_names`` is true, the table objects
        otherwise.
    """
    table_set = self.cadctap.tables
    selected = table_set.keys() if only_names else table_set.values()
    return list(selected)
[docs]
def get_table(self, table):
    """
    Get the specified table.

    Parameters
    ----------
    table : str, mandatory
        full qualified table name (i.e. schema name + table name)

    Returns
    -------
    The first table object whose ``name`` equals ``table``, or ``None``
    when there is no such table.
    """
    candidates = (t for t in self.get_tables() if table == t.name)
    return next(candidates, None)
[docs]
def exec_sync(self, query, *, maxrec=None, uploads=None, output_file=None,
              output_format='votable'):
    """
    Run a query and return the results, optionally also saving them in
    ``output_file``.

    Parameters
    ----------
    query : str, mandatory
        SQL to execute
    maxrec : int
        the maximum records to return. defaults to the service default
    uploads :
        Temporary tables to upload and run with the queries
    output_file : str, Path, or file handler
        File to save the results to. For a file handler the results are
        written to the path given by its ``name`` attribute. An existing
        file is overwritten.
    output_format :
        Format of the output (default is ``'votable'``). It is passed as
        the ``format`` argument when writing the result table.

    Returns
    -------
    The table obtained from the query response.

    Raises
    ------
    AttributeError
        If ``output_file`` is not a file name, a Path or an object with a
        ``name`` attribute.
    """
    # maxrec is forwarded so that the caller's limit is actually applied;
    # None leaves the service default in place.
    response = self.cadctap.search(query, language='ADQL',
                                   maxrec=maxrec, uploads=uploads)
    result = response.to_table()
    if output_file:
        if isinstance(output_file, (str, Path)):
            fname = str(output_file)
        elif hasattr(output_file, 'name'):
            fname = output_file.name
        else:
            raise AttributeError('Not a valid file name, Path, or file handler')
        result.write(fname, format=output_format, overwrite=True)
    return result
[docs]
def create_async(self, query, *, maxrec=None, uploads=None):
    """
    Create a TAP job to execute and return it to the caller. The
    caller then can start the execution and monitor the job.

    Typical (no error handling) sequence of events::

        job = create_async(query)
        job = job.run().wait()
        job.raise_if_error()
        result = job.fetch_result()
        job.delete()  # optional

    See ``pyvo.dal.tap`` for details about the ``AsyncTAPJob``

    Parameters
    ----------
    query : str, mandatory
        SQL to execute
    maxrec : int
        the maximum records to return. defaults to the service default
    uploads :
        Temporary tables to upload and run with the queries

    Returns
    -------
    AsyncTAPJob
        the query instance

    Notes
    -----
    Support for other output formats (tsv, csv) to be added as soon
    as they are available in pyvo.
    """
    # maxrec is forwarded so that the caller's limit is actually applied;
    # None leaves the service default in place.
    return self.cadctap.submit_job(query, language='ADQL',
                                   maxrec=maxrec, uploads=uploads)
[docs]
def load_async_job(self, jobid):
    """
    Load an asynchronous job.

    Parameters
    ----------
    jobid : str, mandatory
        job identifier

    Returns
    -------
    A ``pyvo.dal.AsyncTAPJob`` bound to ``<TAP base url>/async/<jobid>``
    and using the client's session.
    """
    job_url = '{}/async/{}'.format(self.cadctap.baseurl, jobid)
    return pyvo.dal.AsyncTAPJob(job_url, session=self._auth_session)
[docs]
def list_async_jobs(self, *, phases=None, after=None, last=None,
                    short_description=True):
    """
    Return the asynchronous jobs known to the TAP service.

    Parameters
    ----------
    phases : list of str
        Union of job phases to filter the results by.
    after : datetime
        Return only jobs created after this datetime
    last : int
        Return only the most recent number of jobs
    short_description : flag - True or False
        If True, the jobs in the list will contain only the information
        corresponding to the TAP ShortJobDescription object (job ID, phase,
        run ID, owner ID and creation ID) whereas if False, a separate GET
        call to each job is performed for the complete job description

    Returns
    -------
    A list of Job objects
    """
    # All filters are handed to the TAP client unchanged.
    job_filters = {
        'phases': phases,
        'after': after,
        'last': last,
        'short_description': short_description,
    }
    return self.cadctap.get_job_list(**job_filters)
def _parse_result(self, result, *, verbose=None):
return result
def _args_to_payload(self, *args, **kwargs):
    """
    Convert the query arguments into a request payload.

    Parameters
    ----------
    **kwargs
        ``coordinates`` and ``radius`` are required. ``collection`` and
        ``data_product_type`` are optional and, when set, add an equality
        constraint to the query. Positional arguments are ignored.

    Returns
    -------
    dict
        ``{'format': 'VOTable', 'query': <ADQL cone-search query>}``
    """
    # Force the coordinates to FK5 (assuming FK5/ICRS are interchangeable)
    # since RA/Dec are used below.
    coordinates = commons.parse_coordinates(kwargs['coordinates']).fk5
    radius_deg = Angle(kwargs["radius"]).to_value(u.deg)

    query = (
        "SELECT * from caom2.Observation o join caom2.Plane p "
        "ON o.obsID=p.obsID "
        "WHERE INTERSECTS( "
        "CIRCLE('ICRS', {}, {}, {}), position_bounds) = 1 AND "
        "(quality_flag IS NULL OR quality_flag != 'junk')".
        format(coordinates.ra.degree, coordinates.dec.degree, radius_deg))

    optional_filters = (('collection', 'collection'),
                        ('data_product_type', 'dataProductType'))
    for keyword, column in optional_filters:
        value = kwargs.get(keyword)
        if value:
            # The value is placed inside an ADQL string literal, so
            # embedded single quotes are doubled to keep the query valid.
            literal = str(value).replace("'", "''")
            query = "{} AND {}='{}'".format(query, column, literal)

    # The key is the string 'format'; the builtin ``format`` function must
    # not be used as the dictionary key.
    return {'format': 'VOTable', 'query': query}
def static_vars(**kwargs):
    """
    Decorator factory that attaches attributes to a function.

    Every keyword argument is set as an attribute of the decorated
    function, which is then returned unwrapped.
    """
    def decorate(func):
        for attr_name, attr_value in kwargs.items():
            setattr(func, attr_name, attr_value)
        return func
    return decorate
@static_vars(caps={})
def get_access_url(service, *, capability=None):
    """
    Return the URL corresponding to a service by doing a lookup in the
    CADC registry. For a capability with several interfaces, the access
    URL of the first interface that has no security method or uses
    cookie authentication is returned.

    Parameters
    ----------
    service : str
        the service the capability belongs to. It can be identified
        by a CADC uri ('ivo://cadc.nrc.ca/) which is looked up in the CADC
        registry or by the URL where the service capabilities is found.
        A value that starts with neither 'http' nor 'ivo' is treated as
        the short name of a CADC service.
    capability : str
        uri representing the capability for which the access url is
        sought. When omitted, the service URL itself (for an http
        ``service``) or the capabilities URL of the service is returned.

    Returns
    -------
    The access url

    Raises
    ------
    AttributeError
        If the service is not listed in the CADC registry.
    RuntimeError
        If the capability is not found or has no suitable interface.

    Notes
    -----
    This function implements the functionality of a CADC registry as
    defined by the IVOA. It should be eventually moved to its own
    directory. The registry content is cached in ``get_access_url.caps``
    after the first successful download; the capabilities document of a
    service is fetched on every call that specifies a capability.
    """
    if service.startswith('http'):
        if not capability:
            return service
        caps_url = service
    else:
        # get caps from the CADC registry
        if not get_access_url.caps:
            try:
                # A timeout keeps a stalled registry from blocking forever.
                response = requests.get(conf.CADC_REGISTRY_URL,
                                        timeout=conf.TIMEOUT)
                response.raise_for_status()
            except requests.exceptions.HTTPError as err:
                log.debug(
                    "ERROR getting the CADC registry: {}".format(str(err)))
                raise
            for line in response.text.splitlines():
                line = line.strip()
                # Skip blank lines, comments and anything that is not a
                # "service id = capabilities url" entry.
                if not line or line.startswith('#') or '=' not in line:
                    continue
                # Split on the first '=' only: the URL part may itself
                # contain '=' characters.
                service_id, _, capabilities_url = line.partition('=')
                get_access_url.caps[service_id.strip()] = \
                    capabilities_url.strip()
        # lookup the service
        service_uri = service
        if not service.startswith('ivo'):
            # assume short form of CADC service
            service_uri = 'ivo://cadc.nrc.ca/{}'.format(service)
        if service_uri not in get_access_url.caps:
            raise AttributeError(
                "Cannot find the capabilities of service {}".format(service))
        # look up in the CADC reg for the service capabilities
        caps_url = get_access_url.caps[service_uri]
        if not capability:
            return caps_url

    try:
        response2 = requests.get(caps_url, timeout=conf.TIMEOUT)
        response2.raise_for_status()
    except Exception as e:
        log.debug(
            "ERROR getting the service capabilities: {}".format(str(e)))
        raise

    soup = BeautifulSoup(response2.text, features="html5lib")
    for cap in soup.find_all('capability'):
        if cap.get("standardid", None) != capability:
            continue
        interfaces = cap.find_all('interface')
        if len(interfaces) == 1:
            return interfaces[0].accessurl.text
        for interface in interfaces:
            if not hasattr(interface, 'securitymethod'):
                continue
            sm = interface.securitymethod
            # Accept interfaces without a security method (or without a
            # standard id on it) and those using cookie authentication.
            if not sm or sm.get("standardid", None) is None or\
                    sm['standardid'] == "ivo://ivoa.net/sso#cookie":
                return interface.accessurl.text
    raise RuntimeError("ERROR - capability {} not found or not working with "
                       "anonymous or cookie access".format(capability))
# Module-level default client instance, created with the default arguments.
Cadc = CadcClass()