# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
MAST Collections
================
This module contains various methods for querying MAST collections such as catalogs.
"""
import difflib
from json import JSONDecodeError
import warnings
import os
import time
from requests import HTTPError, RequestException
import astropy.units as u
import astropy.coordinates as coord
from astropy.table import Table, Row
from ..utils import commons, async_to_sync
from ..utils.class_or_instance import class_or_instance
from ..exceptions import InvalidQueryError, MaxResultsWarning, InputWarning
from . import utils, conf
from .core import MastQueryWithLogin
# Names exported by ``from <module> import *``: the query class and its module-level instance
__all__ = ['Catalogs', 'CatalogsClass']
[docs]
@async_to_sync
class CatalogsClass(MastQueryWithLogin):
"""
MAST catalog query class.
Class for querying MAST catalog data.
"""
def __init__(self):
    """
    Initialise the parent query class, register the Pan-STARRS service and
    set up the per-instance bookkeeping attributes.
    """
    super().__init__()

    # Keywords accepted for Catalogs.MAST services in addition to column names
    self._catalogs_mast_search_options = ['columns', 'sort_by', 'table', 'data_release']

    # Pan-STARRS path template together with the defaults used to fill it in
    panstarrs_defaults = {"data_release": "dr2", "table": "mean"}
    panstarrs_service = {"path": "panstarrs/{data_release}/{table}.json",
                         "args": panstarrs_defaults}
    self._service_api_connection.set_service_params({"panstarrs": panstarrs_service}, "catalogs", True)

    # Neither a result limit nor an active connection exists before the first query
    self.catalog_limit = None
    self._current_connection = None

    # Info about columns for Catalogs.MAST services
    self._service_columns = {}
def _parse_result(self, response, *, verbose=False):
    """
    Parse a raw response using the connection selected by the latest query.

    Parameters
    ----------
    response : object
        Raw response handed through to the current connection's parser.
    verbose : bool, optional
        Forwarded unchanged to the current connection's parser.

    Returns
    -------
    response : object
        Whatever the current connection's ``_parse_result`` returns.

    Warns
    -----
    MaxResultsWarning
        If the number of parsed rows equals ``self.catalog_limit``.
    """
    parsed = self._current_connection._parse_result(response, verbose=verbose)

    row_count = len(parsed)
    if row_count == self.catalog_limit:
        warnings.warn("Maximum catalog results returned, may not include all sources within radius.",
                      MaxResultsWarning)

    return parsed
def _get_service_col_config(self, catalog, release='dr2', table='mean'):
    """
    For a given Catalogs.MAST catalog, return a list of all searchable columns and their descriptions.
    As of now, this function is exclusive to the Pan-STARRS catalog.

    The column table is requested from the server once per ``(catalog, release, table)``
    combination and then kept in ``self._service_columns`` for later calls.

    Parameters
    ----------
    catalog : str
        The catalog to be queried.
    release : str, optional
        Catalog data release to query from.
    table : str, optional
        Catalog table to query from.

    Returns
    -------
    response : `~astropy.table.Table` or None
        Table with the columns ``name``, ``data_type`` and ``description``.
        ``None`` if ``catalog`` is anything other than ``'panstarrs'``.

    Raises
    ------
    JSONDecodeError
        If the server response cannot be decoded as JSON.
    ConnectionError
        If the request raises a `requests.RequestException`.
    KeyError
        If an entry of the response lacks one of the expected keys.
    RuntimeError
        If anything else goes wrong while fetching or building the table.
    """
    # Only supported for PanSTARRS currently
    if catalog != 'panstarrs':
        return None

    service_key = (catalog, release, table)
    if service_key in self._service_columns:
        return self._service_columns[service_key]

    # Shared tail of every error message raised below
    context = f'while attempting to get column list for {catalog} catalog {table}, {release}'
    try:
        # Send server request to get column list for given parameters
        request_url = f'{conf.catalogs_server}/api/v0.1/{catalog}/{release}/{table}/metadata.json'
        resp = utils._simple_request(request_url)

        # Parse JSON and extract necessary info
        results = resp.json()
        rows = [
            (result['column_name'], result['db_type'], result['description'])
            for result in results
        ]

        # Create Table with parsed data
        col_table = Table(rows=rows, names=('name', 'data_type', 'description'))
    except JSONDecodeError as ex:
        # JSONDecodeError needs (msg, doc, pos); building it from the message alone
        # raises TypeError, so carry over the document and position of the original.
        raise JSONDecodeError(f'Failed to decode JSON response {context}: {ex}', ex.doc, ex.pos) from ex
    except RequestException as ex:
        raise ConnectionError(f'Failed to connect to the server {context}: {ex}') from ex
    except KeyError as ex:
        raise KeyError(f'Expected key not found in response data {context}: {ex}') from ex
    except Exception as ex:
        raise RuntimeError(f'An unexpected error occurred {context}: {ex}') from ex

    self._service_columns[service_key] = col_table
    return col_table
def _validate_service_criteria(self, catalog, **criteria):
    """
    Verify that every criteria keyword is a valid column name or search option for the service.

    Nothing is checked when no column configuration is available for the catalog.

    Parameters
    ----------
    catalog : str
        The catalog to be queried.
    **criteria
        Keyword arguments representing criteria filters to apply. ``data_release``
        and ``table`` (defaulting to ``'dr2'`` and ``'mean'``) select which column
        configuration the keywords are checked against.

    Raises
    ------
    InvalidQueryError
        If a keyword does not match any valid column names, an error is raised that suggests the closest
        matching column name, if available.
    """
    # Look up (and thereby populate) the column configuration for this release/table
    table = criteria.get('table', 'mean')
    release = criteria.get('data_release', 'dr2')
    col_config = self._get_service_col_config(catalog, release, table)

    # Without a column configuration there is nothing to validate against
    if not col_config:
        return

    valid_cols = list(col_config['name']) + self._catalogs_mast_search_options
    for kwd in criteria:
        # Keywords are matched against the valid names case-insensitively
        wanted = kwd.lower()
        if next(filter(lambda name: name.lower() == wanted, valid_cols), None):
            continue

        error_msg = f"Filter '{kwd}' does not exist for {catalog} catalog {table}, {release}."
        suggestions = difflib.get_close_matches(kwd, valid_cols, n=1)
        if suggestions:
            error_msg += f" Did you mean '{suggestions[0]}'?"
        raise InvalidQueryError(error_msg)
[docs]
@class_or_instance
def query_region_async(self, coordinates, *, radius=0.2*u.deg, catalog="Hsc",
                       version=None, pagesize=None, page=None, **criteria):
    """
    Given a sky position and radius, returns a list of catalog entries.
    See column documentation for specific catalogs `here <https://mast.stsci.edu/api/v0/pages.html>`__.

    Parameters
    ----------
    coordinates : str or `~astropy.coordinates` object
        The target around which to search. It may be specified as a
        string or as the appropriate `~astropy.coordinates` object.
    radius : str or `~astropy.units.Quantity` object, optional
        Default 0.2 degrees.
        The string must be parsable by `~astropy.coordinates.Angle`. The
        appropriate `~astropy.units.Quantity` object from
        `~astropy.units` may also be used. A plain number is taken to be in degrees.
    catalog : str, optional
        Default HSC.
        The catalog to be queried.
    version : int, optional
        Version number for catalogs that have versions. Default is highest version.
    pagesize : int, optional
        Default None.
        Can be used to override the default pagesize (set in configs) for this query only.
        E.g. when using a slow internet connection.
    page : int, optional
        Default None.
        Can be used to override the default behavior of all results being returned to obtain a
        specific page of results.
    **criteria
        Other catalog-specific keyword args.
        These can be found in the `service documentation <https://mast.stsci.edu/api/v0/_services.html>`__
        for specific catalogs. For example, one can specify the magtype for an HSC search.
        For catalogs available through Catalogs.MAST (PanSTARRS), the Column Name is the keyword, and the argument
        should be either an acceptable value for that parameter, or a list consisting of values, or tuples of
        decorator, value pairs (decorator, value). In addition, columns may be used to select the return columns,
        consisting of a list of column names. Results may also be sorted through the query with the parameter
        sort_by composed of either a single Column Name to sort ASC, or a list of Column Names to sort ASC or
        tuples of Column Name and Direction (ASC, DESC) to indicate sort order (Column Name, DESC).
        Detailed information of Catalogs.MAST criteria usage can
        be found `here <https://catalogs.mast.stsci.edu/docs/index.html>`__.

    Returns
    -------
    response : list of `~requests.Response`

    Raises
    ------
    InvalidQueryError
        If a criteria keyword is not valid for the requested catalog.
    """
    # Put coordinates and radius into consistent format
    coordinates = commons.parse_coordinates(coordinates)

    # if radius is just a number we assume degrees
    radius = coord.Angle(radius, u.deg)

    # basic params
    params = {'ra': coordinates.ra.deg,
              'dec': coordinates.dec.deg,
              'radius': radius.deg}

    catalog_key = catalog.lower()

    # Forget any limit left over from an earlier query. Only the HSC and GALEX
    # branches below define one, and _parse_result compares the row count against it.
    self.catalog_limit = None

    # Determine API connection and service name
    if catalog_key in self._service_api_connection.SERVICES:
        self._current_connection = self._service_api_connection
        service = catalog

        # validate user criteria
        self._validate_service_criteria(catalog_key, **criteria)

        # adding additional user specified parameters
        params.update(criteria)
    else:
        self._current_connection = self._portal_api_connection

        # valid criteria keywords
        valid_criteria = []

        # Sorting out the non-standard portal service names
        if catalog_key == "hsc":
            if version == 2:
                service = "Mast.Hsc.Db.v2"
            else:
                if version not in (3, None):
                    warnings.warn("Invalid HSC version number, defaulting to v3.", InputWarning)
                service = "Mast.Hsc.Db.v3"

            # Hsc specific parameters (can be overridden by user)
            self.catalog_limit = criteria.pop('nr', 50000)
            valid_criteria = ['nr', 'ni', 'magtype']
            params['nr'] = self.catalog_limit
            params['ni'] = criteria.pop('ni', 1)
            params['magtype'] = criteria.pop('magtype', 1)
        elif catalog_key == "galex":
            service = "Mast.Galex.Catalog"

            # galex specific parameters (can be overridden by user)
            valid_criteria = ['maxrecords']
            self.catalog_limit = criteria.pop('maxrecords', 50000)
            params['maxrecords'] = self.catalog_limit
        elif catalog_key == "gaia":
            if version == 1:
                service = "Mast.Catalogs.GaiaDR1.Cone"
            else:
                if version not in (None, 2):
                    warnings.warn("Invalid Gaia version number, defaulting to DR2.", InputWarning)
                service = "Mast.Catalogs.GaiaDR2.Cone"
        elif catalog_key == 'plato':
            # Only one PLATO service exists; any other version just triggers a warning
            if version not in (None, 1):
                warnings.warn("Invalid PLATO catalog version number, defaulting to DR1.", InputWarning)
            service = "Mast.Catalogs.Plato.Cone"
        else:
            service = "Mast.Catalogs." + catalog + ".Cone"

        # additional user-specified parameters are not valid
        if criteria:
            key = next(iter(criteria))
            closest_match = difflib.get_close_matches(key, valid_criteria, n=1)
            error_msg = (
                f"Filter '{key}' does not exist for catalog {catalog}. Did you mean '{closest_match[0]}'?"
                if closest_match
                else f"Filter '{key}' does not exist for catalog {catalog}."
            )
            raise InvalidQueryError(error_msg)

    # Parameters will be passed as JSON objects only when accessing the PANSTARRS API
    use_json = catalog_key == 'panstarrs'

    return self._current_connection.service_request_async(service, params, pagesize=pagesize, page=page,
                                                          use_json=use_json)
[docs]
@class_or_instance
def query_object_async(self, objectname, *, radius=0.2*u.deg, catalog="Hsc",
                       pagesize=None, page=None, version=None, **criteria):
    """
    Resolve an object name and run a region query around the result.
    See column documentation for specific catalogs `here <https://mast.stsci.edu/api/v0/pages.html>`__.

    Parameters
    ----------
    objectname : str
        The name of the target around which to search.
    radius : str or `~astropy.units.Quantity` object, optional
        Default 0.2 degrees.
        The string must be parsable by `~astropy.coordinates.Angle`.
        The appropriate `~astropy.units.Quantity` object from
        `~astropy.units` may also be used.
    catalog : str, optional
        Default HSC.
        The catalog to be queried.
    pagesize : int, optional
        Default None.
        Can be used to override the default pagesize (set in configs) for this query only.
        E.g. when using a slow internet connection.
    page : int, optional
        Default None.
        Can be used to override the default behavior of all results being returned
        to obtain a specific page of results.
    version : int, optional
        Version number for catalogs that have versions. Default is highest version.
    **criteria
        Catalog-specific keyword args, passed on unchanged to `query_region_async`.
        These can be found in the `service documentation <https://mast.stsci.edu/api/v0/_services.html>`__
        for specific catalogs. For example, one can specify the magtype for an HSC search.
        For catalogs available through Catalogs.MAST (PanSTARRS), the Column Name is the keyword, and the argument
        should be either an acceptable value for that parameter, or a list consisting of values, or tuples of
        decorator, value pairs (decorator, value). In addition, columns may be used to select the return columns,
        consisting of a list of column names. Results may also be sorted through the query with the parameter
        sort_by composed of either a single Column Name to sort ASC, or a list of Column Names to sort ASC or
        tuples of Column Name and Direction (ASC, DESC) to indicate sort order (Column Name, DESC).
        Detailed information of Catalogs.MAST criteria usage can
        be found `here <https://catalogs.mast.stsci.edu/docs/index.html>`__.

    Returns
    -------
    response : list of `~requests.Response`
    """
    # Turn the name into a position, then hand everything else to the region query
    resolved = utils.resolve_object(objectname)

    region_options = dict(radius=radius,
                          catalog=catalog,
                          version=version,
                          pagesize=pagesize,
                          page=page)
    return self.query_region_async(resolved, **region_options, **criteria)
[docs]
@class_or_instance
def query_criteria_async(self, catalog, *, pagesize=None, page=None, **criteria):
    """
    Given a set of filters, returns a list of catalog entries.
    See column documentation for specific catalogs `here <https://mast.stsci.edu/api/v0/pages.html>`__.

    Parameters
    ----------
    catalog : str
        The catalog to be queried.
    pagesize : int, optional
        Can be used to override the default pagesize.
        E.g. when using a slow internet connection.
    page : int, optional
        Can be used to override the default behavior of all results being returned to obtain
        one specific page of results.
    **criteria
        Criteria to apply. At least one non-positional criteria must be supplied.
        Valid criteria are coordinates, objectname, radius (as in `query_region` and `query_object`),
        and all fields listed in the column documentation for the catalog being queried.
        The Column Name is the keyword, with the argument being one or more acceptable values for that parameter,
        except for fields with a float datatype where the argument should be in the form [minVal, maxVal].
        For non-float type criteria wildcards may be used (both * and % are considered wildcards), however
        only one wildcarded value can be processed per criterion.
        RA and Dec must be given in decimal degrees, and datetimes in MJD.
        For example: filters=["FUV","NUV"],proposal_pi="Ost*",t_max=[52264.4586,54452.8914]
        For catalogs available through Catalogs.MAST (PanSTARRS), the Column Name is the keyword, and the argument
        should be either an acceptable value for that parameter, or a list consisting of values, or tuples of
        decorator, value pairs (decorator, value). In addition, columns may be used to select the return columns,
        consisting of a list of column names. Results may also be sorted through the query with the parameter
        sort_by composed of either a single Column Name to sort ASC, or a list of Column Names to sort ASC or
        tuples of Column Name and Direction (ASC, DESC) to indicate sort order (Column Name, DESC).
        Detailed information of Catalogs.MAST criteria usage can
        be found `here <https://catalogs.mast.stsci.edu/docs/index.html>`__.

    Returns
    -------
    response : list of `~requests.Response`

    Raises
    ------
    InvalidQueryError
        If criteria queries are not available for ``catalog``, or if no
        non-positional criterion is supplied.
    """
    # Separating any position info from the rest of the filters
    coordinates = criteria.pop('coordinates', None)
    objectname = criteria.pop('objectname', None)
    radius = criteria.pop('radius', 0.2*u.deg)

    if objectname or coordinates:
        coordinates = utils.parse_input_location(coordinates, objectname)

    # if radius is just a number we assume degrees
    radius = coord.Angle(radius, u.deg)

    # build query
    params = {}
    if coordinates:
        params["ra"] = coordinates.ra.deg
        params["dec"] = coordinates.dec.deg
        params["radius"] = radius.deg

    catalog_key = catalog.lower()

    # This method defines no result limit, so drop any limit left over from an
    # earlier query; _parse_result compares the row count against it.
    self.catalog_limit = None

    # Determine API connection, service name, and build filter set
    if catalog_key in self._service_api_connection.SERVICES:
        self._current_connection = self._service_api_connection
        service = catalog

        # validate user criteria
        self._validate_service_criteria(catalog_key, **criteria)

        if not self._current_connection.check_catalogs_criteria_params(criteria):
            raise InvalidQueryError("At least one non-positional criterion must be supplied.")

        params.update(criteria)
    else:
        self._current_connection = self._portal_api_connection
        has_position = bool(coordinates or objectname)

        if catalog_key in ("tic", "ctl"):
            # TIC and CTL differ only in the service name
            service = "Mast.Catalogs.Filtered." + catalog_key.capitalize()
            if has_position:
                service += ".Position"
            service += ".Rows"  # Using the rowstore version of the query for speed
            column_config_name = "Mast.Catalogs.Tess.Cone"
            params["columns"] = "*"
        elif catalog_key == "diskdetective":
            service = "Mast.Catalogs.Filtered.DiskDetective"
            if has_position:
                service += ".Position"
            column_config_name = "Mast.Catalogs.Dd.Cone"
        else:
            raise InvalidQueryError("Criteria query not available for {}".format(catalog))

        filters = self._current_connection.build_filter_set(column_config_name, service, **criteria)

        if not filters:
            raise InvalidQueryError("At least one non-positional criterion must be supplied.")
        params["filters"] = filters

    # Parameters will be passed as JSON objects only when accessing the PANSTARRS API
    use_json = catalog_key == 'panstarrs'

    return self._current_connection.service_request_async(service, params, pagesize=pagesize, page=page,
                                                          use_json=use_json)
[docs]
@class_or_instance
def query_hsc_matchid_async(self, match, *, version=3, pagesize=None, page=None):
    """
    Returns all the matches for a given Hubble Source Catalog MatchID.

    Parameters
    ----------
    match : int or `~astropy.table.Row`
        The matchID or HSC entry to return matches for. For a row, its
        ``MatchID`` column is used.
    version : int, optional
        The HSC version to match against. Default is v3. Any value other than
        2, 3 or None triggers an `InputWarning` and falls back to v3.
    pagesize : int, optional
        Can be used to override the default pagesize.
        E.g. when using a slow internet connection.
    page : int, optional
        Can be used to override the default behavior of all results being returned to obtain
        one specific page of results.

    Returns
    -------
    response : list of `~requests.Response`
    """
    self._current_connection = self._portal_api_connection

    # This method defines no result limit, so drop any limit left over from an
    # earlier query; _parse_result compares the row count against it.
    self.catalog_limit = None

    if isinstance(match, Row):
        match = match["MatchID"]
    match = str(match)  # np.int64 gives json serializer problems, so stringify right here

    if version == 2:
        service = "Mast.HscMatches.Db.v2"
    else:
        if version not in (3, None):
            warnings.warn("Invalid HSC version number, defaulting to v3.", InputWarning)
        service = "Mast.HscMatches.Db.v3"

    params = {"input": match}

    return self._current_connection.service_request_async(service, params, pagesize=pagesize, page=page)
[docs]
@class_or_instance
def get_hsc_spectra_async(self, *, pagesize=None, page=None):
    """
    Returns all Hubble Source Catalog spectra.

    Parameters
    ----------
    pagesize : int, optional
        Can be used to override the default pagesize.
        E.g. when using a slow internet connection.
    page : int, optional
        Can be used to override the default behavior of all results being returned to obtain
        one specific page of results.

    Returns
    -------
    response : list of `~requests.Response`
    """
    self._current_connection = self._portal_api_connection

    # This method defines no result limit, so drop any limit left over from an
    # earlier query; _parse_result compares the row count against it.
    self.catalog_limit = None

    service = "Mast.HscSpectra.Db.All"
    params = {}

    # Pass paging options by keyword, as every other query method in this class does
    return self._current_connection.service_request_async(service, params, pagesize=pagesize, page=page)
[docs]
def download_hsc_spectra(self, spectra, *, download_dir=None, cache=True, curl_flag=False):
    """
    Download one or more Hubble Source Catalog spectra.

    Parameters
    ----------
    spectra : `~astropy.table.Table` or `~astropy.table.Row`
        One or more HSC spectra to be downloaded.
    download_dir : str, optional
       Specify the base directory to download spectra into.
       Spectra will be saved in the subdirectory download_dir/mastDownload/HSC.
       If download_dir is not specified the base directory will be '.'.
    cache : bool, optional
        Default is True. If file is found on disc it will not be downloaded again.
        Note: has no effect when downloading curl script.
    curl_flag : bool, optional
        Default is False. If true instead of downloading files directly, a curl script
        will be downloaded that can be used to download the data files at a later time.

    Returns
    -------
    manifest : `~astropy.table.Table`
        Table with the columns ``Local Path``, ``Status``, ``Message`` and ``URL``.
        It holds a single row describing the curl script when ``curl_flag`` is set,
        otherwise one row per requested spectrum.
    """
    def spectrum_url(spec):
        # Spectra with SpectrumType below 2 come from getdata.cgi, all others from ecfproxy
        if spec['SpectrumType'] < 2:
            return f'https://hla.stsci.edu/cgi-bin/getdata.cgi?config=ops&dataset={spec["DatasetName"]}'
        return f'https://hla.stsci.edu/cgi-bin/ecfproxy?file_id={spec["DatasetName"]}.fits'

    # if spectra is not a Table, put it in a list
    if isinstance(spectra, Row):
        spectra = [spectra]

    # set up the download directory and paths
    if not download_dir:
        download_dir = '.'

    if curl_flag:  # don't want to download the files now, just the curl script
        download_file = "mastDownload_" + time.strftime("%Y%m%d%H%M%S")

        url_list = [spectrum_url(spec) for spec in spectra]
        path_list = [download_file + "/HSC/" + spec['DatasetName'] + '.fits' for spec in spectra]

        service = "Mast.Bundle.Request"
        params = {"urlList": ",".join(url_list),
                  "filename": download_file,
                  "pathList": ",".join(path_list),
                  "descriptionList": [""] * len(spectra),
                  "productTypeList": ['spectrum'] * len(spectra),
                  "extension": 'curl'}

        response = self._portal_api_connection.service_request_async(service, params)
        bundler_response = response[0].json()

        local_path = os.path.join(download_dir, "{}.sh".format(download_file))
        self._download_file(bundler_response['url'], local_path, head_safe=True, continuation=False)

        status = "COMPLETE"
        msg = None
        url = None

        if not os.path.isfile(local_path):
            status = "ERROR"
            msg = "Curl could not be downloaded"
            url = bundler_response['url']
        else:
            # The script exists; report any files the bundler could not include in it
            missing_files = [x for x in bundler_response['statusList'].keys()
                             if bundler_response['statusList'][x] != 'COMPLETE']
            if missing_files:
                msg = "{} files could not be added to the curl script".format(len(missing_files))
                url = ",".join(missing_files)

        manifest = Table({'Local Path': [local_path],
                          'Status': [status],
                          'Message': [msg],
                          "URL": [url]})

    else:
        base_dir = download_dir.rstrip('/') + "/mastDownload/HSC"

        # exist_ok avoids the gap between checking for the directory and creating it
        os.makedirs(base_dir, exist_ok=True)

        manifest_array = []
        for spec in spectra:
            data_url = spectrum_url(spec)
            local_path = os.path.join(base_dir, f'{spec["DatasetName"]}.fits')

            status = "COMPLETE"
            msg = None
            url = None

            try:
                self._download_file(data_url, local_path, cache=cache, head_safe=True)

                # check file size also this is where would perform md5
                if not os.path.isfile(local_path):
                    status = "ERROR"
                    msg = "File was not downloaded"
                    url = data_url

            except HTTPError as err:
                # A failed download is recorded in the manifest instead of aborting the loop
                status = "ERROR"
                msg = "HTTPError: {0}".format(err)
                url = data_url

            manifest_array.append([local_path, status, msg, url])

        manifest = Table(rows=manifest_array, names=('Local Path', 'Status', 'Message', "URL"))

    return manifest
# Module-level instance of the query class, created at import time and exported via ``__all__``
Catalogs = CatalogsClass()