import os
import shutil
import requests
import tarfile
import warnings
import numpy as np
from astropy.table import Table
from astropy import coordinates
from astropy import units as u
from astropy.utils.decorators import deprecated, deprecated_renamed_argument
import pyvo
from astroquery import log
from ..query import BaseQuery, BaseVOQuery
from ..utils import commons, parse_coordinates
from ..exceptions import InvalidQueryError, NoResultsWarning
from . import conf
__all__ = ['Heasarc', 'HeasarcClass']
class HeasarcClass(BaseVOQuery, BaseQuery):
"""Class for accessing HEASARC data with VO protocol using the Xamin backend.
"""
# we can move url to Config later
VO_URL = conf.VO_URL
TAR_URL = conf.TAR_URL
S3_BUCKET = conf.S3_BUCKET
timeout = conf.timeout
def __init__(self):
"""Initialize some basic useful variables"""
super().__init__()
self._tap = None
self._datalink = None
self._meta_info = None
self._session = None
@property
def tap(self):
"""TAP service"""
if self._tap is None:
self._tap = pyvo.dal.TAPService(
f'{self.VO_URL}/tap', session=self._session
)
self._session = self._tap._session
return self._tap
@property
def _meta(self):
"""Queries and holds meta-information about the catalogs.
This is a table that holds useful information such as
the list of default columns per catalog, the reasonable default
search radius per table that is appropriate for a mission etc.
Instead of making a server call for each catalog for that type information,
we do a single one and then post-process the resulting table.
These are not meant to be used directly by the user.
"""
if self._meta_info is None:
query = (
"SELECT split_part(name, '.', 1) AS table, "
"split_part(name, '.', 2) AS par, "
"CAST(value AS DECIMAL) AS value "
"FROM metainfo "
"WHERE (type = 'parameter' and relation = 'order') "
"OR relation LIKE 'defaultSearchRadius' "
"ORDER BY value"
)
self._meta_info = self.query_tap(query).to_table()
self._meta_info['value'] = np.array(
self._meta_info['value'], np.float32)
self._meta_info = self._meta_info[self._meta_info['value'] > 0]
return self._meta_info
def _get_default_columns(self, catalog_name):
"""Get a list of default columns for a catalog
Parameters
----------
catalog_name : str
The name of the catalog
Returns
-------
A list of column names
"""
meta = self._meta[
(self._meta['table'] == catalog_name)
& (self._meta['par'] != '')
]
meta.sort('value')
defaults = meta['par']
return list(defaults)
def get_default_radius(self, catalog_name):
"""Get a mission-appropriate default radius for a catalog
Parameters
----------
catalog_name : str
The name of the catalog
Returns
-------
The radius as `~astropy.units.Quantity`
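Examples
--------
A minimal sketch (requires network access); 'xray' is the master
X-ray catalog referred to in `query_region`:

>>> from astroquery.heasarc import Heasarc
>>> Heasarc.get_default_radius('xray')  # doctest: +SKIP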
"""
meta = self._meta[
(self._meta['table'] == catalog_name)
& (self._meta['par'] == '')
]
radius = np.float32(meta['value'][0]) * u.arcmin
return radius
def _set_session(self, session):
"""Set requests.Session to use when querying the data
Parameters
----------
session : `~requests.Session`
The requests.Session to use
"""
if not isinstance(session, requests.Session):
raise ValueError('session is not a `~requests.Session` instance')
self._session = session
def list_catalogs(self, *, master=False, keywords=None):
"""Return a table of all available catalogs with two columns
(name, description)
Parameters
----------
master : bool
Select only master catalogs. Default is False
keywords : str or list
a str or a list of str of keywords used as search
terms for catalogs. Words within a str that are separated by a
space are AND'ed, while elements of a list are OR'ed
Returns
-------
`~astropy.table.Table` with columns: name, description
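Examples
--------
A minimal sketch (requires network access); the keyword used here
is only an illustration:

>>> from astroquery.heasarc import Heasarc
>>> catalogs = Heasarc.list_catalogs(keywords='xmm', master=True)  # doctest: +SKIP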
"""
if keywords is not None:
if not isinstance(keywords, list):
keywords = [keywords]
if not all([isinstance(wrd, str) for wrd in keywords]):
raise ValueError('non-str found in keywords elements')
# use 'mast' to include both 'master' and 'mastr'
names, desc = [], []
for lab, tab in self.tap.tables.items():
if 'TAP' in lab or (master and 'mast' not in lab):
continue
if keywords is not None:
matched = any(
[
all([wrd.lower() in f'{lab} {tab.description}'.lower()
for wrd in wrds.split()])
for wrds in keywords
]
)
if not matched:
continue
names.append(lab)
desc.append(tab.description)
return Table({'name': names, 'description': desc})
@deprecated(
since='0.4.8',
alternative='list_catalogs',
)
def query_mission_list(self, *, cache=True, get_query_payload=False):
"""Returns a list of all available mission catalogs with descriptions.
This method is deprecated, and is included only for limited
backward compatibility with the old astroquery.Heasarc that uses
the Browse interface. Please use `list_catalogs` instead.
"""
return self.list_catalogs(master=False)
def list_columns(self, catalog_name, full=False):
"""Return the columns available in catalog_name as a table
Parameters
----------
catalog_name : str
The name of catalog as a str
full : bool
If True, return all columns, otherwise, return the standard list
of columns
Returns
-------
result : `~astropy.table.Table`
A table with columns: name, description, unit
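Examples
--------
A minimal sketch (requires network access); 'xray' refers to the
master X-ray catalog:

>>> from astroquery.heasarc import Heasarc
>>> cols = Heasarc.list_columns('xray', full=False)  # doctest: +SKIP
>>> sorted(cols['name'])  # doctest: +SKIP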
"""
tables = self.tap.tables
if catalog_name not in tables.keys():
msg = (f'{catalog_name} is not available as a public catalog. '
'Try passing keywords to `~Heasarc.list_catalogs` to find '
'the catalog name')
raise ValueError(msg)
default_cols = self._get_default_columns(catalog_name)
names, desc, unit = [], [], []
for col in tables[catalog_name].columns:
if full or col.name in default_cols:
names.append(col.name)
desc.append(col.description)
unit.append(col.unit or '')
cols = Table({'name': names, 'description': desc, 'unit': unit})
return cols
@deprecated(
since='0.4.8',
alternative='list_columns',
)
def query_mission_cols(self, mission, *, cache=True,
get_query_payload=False, **kwargs):
"""Query around a specific object within a given mission catalog
NOTE: This method is deprecated, and is included only for limited
backward compatibility with the old astroquery.Heasarc that uses
the Browse interface. Please use `list_columns` instead.
Parameters
----------
mission : str
Mission catalog (short name) to search from
fields : str, optional
Return format for columns from the server. Available options:
* Standard : Return default catalog columns
* All (default) : Return all catalog columns
* <custom> : User defined csv list of columns to be returned
cache : bool, optional
Defaults to True. If set overrides global caching behavior.
See :ref:`caching documentation <astroquery_cache>`.
All other parameters have no effect
"""
fields = kwargs.get('fields', 'All')
full = fields != 'Standard'
cols = self.list_columns(mission, full=full)
cols = [col.upper() for col in cols['name'] if '__' not in col]
return cols
def query_tap(self, query, *, maxrec=None):
"""
Send query to HEASARC's Xamin TAP using ADQL.
Results are returned in `~pyvo.dal.TAPResults` format;
result.to_table() converts them to `~astropy.table.Table` format.
Parameters
----------
query : str
ADQL query to be executed
maxrec : int
maximum number of records to return
Returns
-------
result : `~pyvo.dal.TAPResults`
TAP query result.
result.to_table : `~astropy.table.Table`
TAP query result as `~astropy.table.Table`
result.to_qtable : `~astropy.table.QTable`
TAP query result as `~astropy.table.QTable`
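Examples
--------
A minimal sketch (requires network access); the catalog and column
names in the ADQL string are illustrative:

>>> from astroquery.heasarc import Heasarc
>>> adql = "SELECT TOP 5 name, ra, dec FROM xray"
>>> result = Heasarc.query_tap(adql, maxrec=5)  # doctest: +SKIP
>>> table = result.to_table()  # doctest: +SKIP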
"""
log.debug(f'TAP query: {query}')
self._saved_query = query
return self.tap.search(query, language='ADQL', maxrec=maxrec)
@deprecated_renamed_argument(
('mission', 'fields', 'resultmax', 'entry', 'coordsys', 'equinox',
'displaymode', 'action', 'sortvar', 'cache'),
('catalog', 'columns', 'maxrec', None, None, None,
None, None, None, None),
since=['0.4.8']*10,
arg_in_kwargs=(False, True, True, True, True, True,
True, True, True, False)
)
def query_region(self, position=None, catalog=None, radius=None, *,
spatial='cone', width=None, polygon=None, add_offset=False,
get_query_payload=False, columns=None, cache=False,
verbose=False, maxrec=None,
**kwargs):
"""Queries the HEASARC TAP server around a coordinate and returns a
`~astropy.table.Table` object.
Parameters
----------
position : str, `astropy.coordinates` object
Gives the position of the center of the cone or box if performing
a cone or box search. Required if spatial is ``'cone'`` or
``'box'``. Ignored if spatial is ``'polygon'`` or ``'all-sky'``.
catalog : str
The catalog to query. To list the available catalogs, use
:meth:`~astroquery.heasarc.HeasarcClass.list_catalogs`.
spatial : str
Type of spatial query: ``'cone'``, ``'box'``, ``'polygon'``, and
``'all-sky'``. Defaults to ``'cone'``.
radius : str or `~astropy.units.Quantity` object, [optional for
spatial == ``'cone'``].
The string must be parsable by `~astropy.coordinates.Angle`. The
appropriate `~astropy.units.Quantity` object from
`astropy.units` may also be used. If None, a default value
appropriate for the selected catalog is used. To see the default
radius for the catalog, see
`get_default_radius`.
width : str, `~astropy.units.Quantity` object [Required for
spatial == ``'box'``.]
The string must be parsable by `~astropy.coordinates.Angle`. The
appropriate `~astropy.units.Quantity` object from `astropy.units`
may also be used.
polygon : list, [Required for spatial == ``'polygon'``]
A list of ``(ra, dec)`` pairs (as tuples), in decimal degrees,
outlining the polygon to search in. It can also be a list of
`astropy.coordinates` object or strings that can be parsed by
`astropy.coordinates.ICRS`.
add_offset : bool
If True and spatial=='cone', add a search_offset column that
indicates the separation (in arcmin) between the requested
coordinate and the entry coordinates in the catalog. Default is False.
get_query_payload : bool, optional
If `True` then returns the generated ADQL query as str.
Defaults to `False`.
columns : str, optional
Target column list with values separated by a comma (,).
Use * for all the columns. The default is to return a subset
of the columns that are generally the most useful.
verbose : bool, optional
If False, suppress vo warnings.
maxrec : int, optional
Maximum number of records
Returns
-------
table : A `~astropy.table.Table` object.
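Examples
--------
A minimal cone-search sketch (requires network access); the target
name and radius are illustrative:

>>> from astroquery.heasarc import Heasarc
>>> from astropy.coordinates import SkyCoord
>>> import astropy.units as u
>>> pos = SkyCoord.from_name('3C 273')  # doctest: +SKIP
>>> tab = Heasarc.query_region(pos, catalog='xray', radius=5 * u.arcmin)  # doctest: +SKIP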
"""
# if verbose is False then suppress any VOTable related warnings
if not verbose:
commons.suppress_vo_warnings()
if catalog is None:
raise InvalidQueryError("catalog name is required! Use 'xray' "
"to search the master X-ray catalog")
if columns is None:
columns = ', '.join(self._get_default_columns(catalog))
if '__row' not in columns:
columns += ',__row'
if spatial.lower() == 'all-sky':
where = ''
elif spatial.lower() == 'polygon':
try:
coords_list = [parse_coordinates(coord).icrs
for coord in polygon]
except TypeError:
try:
coords_list = [coordinates.SkyCoord(*coord).icrs
for coord in polygon]
except u.UnitTypeError:
warnings.warn("Polygon endpoints are being interpreted as "
"RA/Dec pairs specified in decimal degree "
"units.")
coords_list = [
coordinates.SkyCoord(*coord, unit='deg').icrs
for coord in polygon
]
coords_str = [f'{coord.ra.deg},{coord.dec.deg}'
for coord in coords_list]
where = (" WHERE CONTAINS(POINT('ICRS',ra,dec),"
f"POLYGON('ICRS',{','.join(coords_str)}))=1")
else:
coords_icrs = parse_coordinates(position).icrs
ra, dec = coords_icrs.ra.deg, coords_icrs.dec.deg
if spatial.lower() == 'cone':
if radius is None:
radius = self.get_default_radius(catalog)
elif isinstance(radius, str):
radius = coordinates.Angle(radius)
where = (" WHERE CONTAINS(POINT('ICRS',ra,dec),CIRCLE("
f"'ICRS',{ra},{dec},{radius.to(u.deg).value}))=1")
# add search_offset for the case of cone
if add_offset:
columns += (",DISTANCE(POINT('ICRS',ra,dec), "
f"POINT('ICRS',{ra},{dec})) as search_offset")
elif spatial.lower() == 'box':
if isinstance(width, str):
width = coordinates.Angle(width)
where = (" WHERE CONTAINS(POINT('ICRS',ra,dec),"
f"BOX('ICRS',{ra},{dec},{width.to(u.deg).value},"
f"{width.to(u.deg).value}))=1")
else:
raise ValueError("Unrecognized spatial query type. Must be one"
" of 'cone', 'box', 'polygon', or 'all-sky'.")
adql = f'SELECT {columns} FROM {catalog}{where}'
if get_query_payload:
return adql
response = self.query_tap(query=adql, maxrec=maxrec)
# save the response in case we want to use it later
self._last_result = response
self._last_catalog_name = catalog
table = response.to_table()
if add_offset:
table['search_offset'].unit = u.arcmin
if len(table) == 0:
warnings.warn(
NoResultsWarning("No matching rows were found in the query.")
)
return table
@deprecated(
since='0.4.8',
alternative='query_region'
)
def query_object(self, object_name, mission, *,
cache=True, get_query_payload=False,
**kwargs):
"""Query around a specific object within a given mission catalog
Parameters
----------
object_name : str
Object to query around. To set search radius use the 'radius'
parameter.
mission : str
Mission catalog to search from
cache : bool
Defaults to True. If set overrides global caching behavior.
See :ref:`caching documentation <astroquery_cache>`.
**kwargs :
see `~astroquery.heasarc.HeasarcClass._args_to_payload` for list
of additional parameters that can be used to refine search query.
"""
pos = coordinates.SkyCoord.from_name(object_name)
return self.query_region(pos, catalog=mission, spatial='cone',
get_query_payload=get_query_payload)
def locate_data(self, query_result=None, catalog_name=None):
"""Get links to data products
Use vo/datalinks to query the data products for some query_results.
Parameters
----------
query_result : `astropy.table.Table`, optional
A table that contains the search results, typically as
returned by query_region. If None, use the table from the
most recent query_region call.
catalog_name : str, optional
The catalog name to which the query_result belongs.
If None, use the one from the most recent query_region call.
Returns
-------
table : A `~astropy.table.Table` object.
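Examples
--------
A minimal sketch (requires network access), continuing from a
`query_region` call; the coordinates and catalog name are
illustrative:

>>> from astroquery.heasarc import Heasarc
>>> tab = Heasarc.query_region('12h29m06.7s +02d03m09s', catalog='xray')  # doctest: +SKIP
>>> links = Heasarc.locate_data(tab, catalog_name='xray')  # doctest: +SKIP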
"""
if query_result is None:
if (
not hasattr(self, '_last_result')
or self._last_result is None
):
raise ValueError('query_result is None, and none '
'found from a previous search')
else:
query_result = self._last_result
if not isinstance(query_result, Table):
raise TypeError('query_result needs to be an astropy.table.Table')
# make sure we have a column __row
if '__row' not in query_result.colnames:
raise ValueError('No __row column found in query_result. '
'query_result needs to be the output of '
'query_region or a subset.')
if catalog_name is None:
catalog_name = self._last_catalog_name
if not (
isinstance(catalog_name, str)
and catalog_name in self.tap.tables.keys()
):
raise ValueError(f'Unknown catalog name: {catalog_name}')
# datalink url
dlink_url = f'{self.VO_URL}/datalink/{catalog_name}'
query = pyvo.dal.adhoc.DatalinkQuery(
baseurl=dlink_url,
id=query_result['__row'],
session=self._session
)
dl_result = query.execute().to_table()
dl_result = dl_result[dl_result['content_type'] == 'directory']
dl_result = dl_result[['ID', 'access_url', 'content_length']]
# add sciserver and s3 columns
newcol = [
f"/FTP/{row.split('FTP/')[1]}".replace('//', '/')
if 'FTP' in row else ''
for row in dl_result['access_url']
]
dl_result.add_column(newcol, name='sciserver', index=2)
newcol = [f"s3://{self.S3_BUCKET}/{row[5:]}" if row != '' else ''
for row in dl_result['sciserver']]
dl_result.add_column(newcol, name='aws', index=3)
return dl_result
def enable_cloud(self, provider='aws', profile=None):
"""
Enable downloading public files from the cloud.
Requires the boto3 library to function.
Parameters
----------
provider : str
Which cloud data provider to use. Currently, only 'aws' is
supported.
profile : str
Profile to use to identify yourself to the cloud provider
(usually in ~/.aws/config).
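Examples
--------
A minimal sketch; anonymous access (no profile) is sufficient
since the HEASARC data in AWS is public:

>>> from astroquery.heasarc import Heasarc
>>> Heasarc.enable_cloud(provider='aws', profile=None)  # doctest: +SKIP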
"""
try:
import boto3
import botocore
except ImportError:
raise ImportError(
'The cloud feature requires the boto3 package. '
'Install it first.'
)
if profile is None:
log.info('Enabling anonymous cloud data access ...')
config = botocore.client.Config(
signature_version=botocore.UNSIGNED)
self.s3_resource = boto3.resource('s3', config=config)
elif isinstance(profile, bool) and not profile:
# profile is False, use system env credentials
self.s3_resource = boto3.resource('s3')
else:
log.info(f'Enabling cloud data access with profile: {profile} ...')
session = boto3.session.Session(profile_name=profile)
self.s3_resource = session.resource(service_name='s3')
self.s3_client = self.s3_resource.meta.client
def download_data(self, links, host='heasarc', location='.'):
"""Download data products in links with a choice of getting the
data from either the heasarc server, sciserver, or the cloud in AWS.
Parameters
----------
links : `astropy.table.Table`
The result from locate_data
host : str
The data host. The options are: heasarc (default), sciserver, aws.
If host == 'sciserver', data is copied from the local mounted
data drive.
If host == 'aws', data is downloaded from Amazon S3 Open
Data Repository.
location : str
local folder where the downloaded files will be saved.
Default is the current working directory.
Note that if you are downloading large datasets (more than ~10 GB)
from the main heasarc server, it is recommended that you split
the download up, so that if it is interrupted, you do not need
to start again.
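Examples
--------
A minimal sketch (requires network access), downloading the
products located by `locate_data`; the coordinates and catalog
name are illustrative:

>>> from astroquery.heasarc import Heasarc
>>> tab = Heasarc.query_region('12h29m06.7s +02d03m09s', catalog='xray')  # doctest: +SKIP
>>> links = Heasarc.locate_data(tab, catalog_name='xray')  # doctest: +SKIP
>>> Heasarc.download_data(links, host='heasarc', location='.')  # doctest: +SKIP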
"""
if len(links) == 0:
raise ValueError('Input links table is empty')
if host not in ['heasarc', 'sciserver', 'aws']:
raise ValueError('host has to be one of heasarc, sciserver, aws')
host_column = 'access_url' if host == 'heasarc' else host
if host_column not in links.colnames:
raise ValueError(
f'No {host_column} column found in the table. Call '
'`~locate_data` first'
)
if host == 'heasarc':
log.info('Downloading data from the HEASARC ...')
self._download_heasarc(links, location)
elif host == 'sciserver':
log.info('Copying data on Sciserver ...')
self._copy_sciserver(links, location)
elif host == 'aws':
log.info('Downloading data from AWS S3 ...')
self._download_s3(links, location)
def _download_heasarc(self, links, location='.'):
"""Download data from the heasarc main server using xamin's tar servlet
Do not call directly.
Users should be using `~self.download_data` instead
Parameters
----------
links : `astropy.table.Table`
The result from locate_data
location : str
local folder where the downloaded file will be saved.
Default is current working directory
"""
# The limit comes from the size of the string in the POST request
if 'content_length' in links.columns:
size = links['content_length'].sum() / 2**30
if size > 10:
warnings.warn(
f"The size of the requested file is large {size:.3f} GB. "
"If the download is interrupted, you may need to start "
"again. Consider downloading the data in chunks."
)
file_list = [f"/FTP/{link.split('FTP/')[1]}"
for link in links['access_url']]
params = {
'files': f'>{"&&>".join(file_list)}&&',
'filter': ''
}
# get local_filepath name
local_filepath = f'{location}/heasarc-data.tar'
iname = 1
while os.path.exists(local_filepath):
local_filepath = f'{location}/heasarc-data.{iname}.tar'
iname += 1
log.info(f'Downloading to {local_filepath} ...')
self._download_file(self.TAR_URL, local_filepath,
timeout=self.timeout,
continuation=False, cache=False, method="POST",
head_safe=False, data=params, verbose=False)
# if all good and we have a tar file untar it
if tarfile.is_tarfile(local_filepath):
log.info(f'Untar {local_filepath} to {location} ...')
tfile = tarfile.TarFile(local_filepath)
tfile.extractall(path=location, filter="fully_trusted")
tfile.close()
os.remove(local_filepath)
else:
raise ValueError(
'An error occurred when downloading the data. Please try again.'
)
def _copy_sciserver(self, links, location='.'):
"""Copy data from the local archive on sciserver
Do not call directly.
Users should be using `~self.download_data` instead
"""
if not os.path.exists('/FTP/'):
raise FileNotFoundError(
'No data archive found. This should be run on Sciserver '
'with the data drive mounted.'
)
# make sure the output folder exists
os.makedirs(location, exist_ok=True)
for link in links['sciserver']:
link = str(link)
log.info(f'Copying {link} from the data drive ...')
if not os.path.exists(link):
raise ValueError(
f'No data found in {link}. '
'Make sure you are running this on Sciserver. '
'If you think data is missing, please contact the '
'Heasarc Help desk'
)
if os.path.isdir(link):
download_dir = os.path.basename(link.strip('/'))
shutil.copytree(link, f'{location}/{download_dir}')
else:
shutil.copy(link, location)
def _download_s3(self, links, location='.'):
"""Download data from AWS S3
Assuming open access.
Do not call directly.
Users should be using `~self.download_data` instead
"""
keys_list = [link for link in links['aws']]
if not hasattr(self, 's3_resource'):
# all the data is public for now; no profile is needed
self.enable_cloud(provider='aws', profile=None)
def _s3_tree_download(client, bucket_name, path, local):
"""Download nested keys from s3"""
response = client.list_objects_v2(Bucket=bucket_name, Prefix=path)
content = response.get('Contents', [])
for obj in content:
key = obj['Key']
path2 = '/'.join(path.strip('/').split('/')[:-1])
dest = os.path.join(local, key[len(path2)+1:])
os.makedirs(os.path.dirname(dest), exist_ok=True)
client.download_file(bucket_name, key, dest)
# loop through the requested links
for key in keys_list:
log.info(f'downloading {key}')
path = key.replace(f's3://{self.S3_BUCKET}/', '')
_s3_tree_download(self.s3_client, self.S3_BUCKET, path, location)
Heasarc = HeasarcClass()