# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Validate VO Services."""
# STDLIB
import multiprocessing
import os
import warnings
from collections import OrderedDict
from pathlib import Path
# ASTROPY
from astropy.io import votable
from astropy.io.votable.exceptions import E19
from astropy.io.votable.validator import html, result
from astroquery import log
from astropy.utils import data
from astropy.utils.exceptions import AstropyUserWarning
from astropy.utils.xml.unescaper import unescape_all
# LOCAL
from .tstquery import parse_cs
from .exceptions import (ValidationMultiprocessingError,
InvalidValidationAttribute)
from ..exceptions import VOSError
from ..vos_catalog import VOSDatabase, vo_tab_parse
from ...utils.timer import timefunc
# Import configurable items declared in __init__.py
from . import conf
__all__ = ['check_conesearch_sites']
@timefunc(num_tries=1)
def check_conesearch_sites(*, destdir=os.curdir, verbose=True, parallel=True,
                           url_list='default'):
    """
    Validate Cone Search Services.

    .. note::

        URLs are unescaped prior to validation.

        Only check queries with ``<testQuery>`` parameters.
        Does not perform meta-data and erroneous queries.

    Parameters
    ----------
    destdir : str, optional
        Directory to store output files. Will be created if does
        not exist. Existing files with these names will be deleted
        or replaced:

            * conesearch_good.json
            * conesearch_warn.json
            * conesearch_exception.json
            * conesearch_error.json

    verbose : bool, optional
        Print extra info to log.

    parallel : bool, optional
        Enable multiprocessing.

    url_list : list of string, optional
        Only check these access URLs against
        ``astroquery.vo_conesearch.validator.conf.conesearch_master_list``
        and ignore the others, which will not appear in output files.
        By default, check those in
        ``astroquery.vo_conesearch.validator.conf.conesearch_urls``.
        If `None`, check everything.

    Raises
    ------
    OSError
        Invalid destination directory.

    timeout
        URL request timed out.

    ValidationMultiprocessingError
        Multiprocessing failed.

    """
    if url_list == 'default':
        url_list = conf.conesearch_urls

    if (not isinstance(destdir, (str, Path))
            or os.path.exists(destdir) and not os.path.isdir(destdir)):
        raise OSError('Invalid destination directory')  # pragma: no cover

    if not os.path.exists(destdir):
        os.mkdir(destdir)

    # Output dir created by votable.validator
    out_dir = os.path.join(destdir, 'results')

    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    # Output files
    db_file = OrderedDict()
    db_file['good'] = os.path.join(destdir, 'conesearch_good.json')
    db_file['warn'] = os.path.join(destdir, 'conesearch_warn.json')
    db_file['excp'] = os.path.join(destdir, 'conesearch_exception.json')
    db_file['nerr'] = os.path.join(destdir, 'conesearch_error.json')

    # JSON dictionaries for output files
    js_tree = {}
    for key in db_file:
        js_tree[key] = VOSDatabase.create_empty()

        # Delete existing files, if any, to be on the safe side.
        # Else can cause confusion if program exited prior to
        # new files being written but old files are still there.
        if os.path.exists(db_file[key]):  # pragma: no cover
            os.remove(db_file[key])
            if verbose:
                log.info('Existing file {0} deleted'.format(db_file[key]))

    # Master VO database from registry. Silence all the warnings.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        js_mstr = VOSDatabase.from_registry(
            conf.conesearch_master_list, encoding='binary',
            show_progress=verbose)

    # Validate only a subset of the services.
    if url_list is not None:
        # Make sure URL is unique and fixed.
        # BUGFIX: bytes URLs must be decoded before unescaping; the old
        # conditional had identical branches and passed bytes through.
        url_list = set(map(
            unescape_all,
            [cur_url if isinstance(cur_url, str) else cur_url.decode('utf-8')
             for cur_url in url_list]))
        uniq_rows = len(url_list)
        url_list_processed = []  # To track if given URL is valid in registry
        if verbose:
            log.info('Only {0}/{1} site(s) are validated'.format(
                uniq_rows, len(js_mstr)))
    # Validate all services.
    else:
        uniq_rows = len(js_mstr)

    key_lookup_by_url = {}

    # Process each catalog in the registry.
    for cur_key, cur_cat in js_mstr.get_catalogs():
        cur_url = cur_cat['url']

        # Skip if:
        #   a. not a Cone Search service
        #   b. not in given subset, if any
        if ((cur_cat['cap_type'] != 'conesearch')
                or (url_list is not None and cur_url not in url_list)):
            continue

        # Use testQuery to return non-empty VO table with max verbosity.
        testquery_pars = parse_cs(cur_cat['ivoid'],
                                  cap_index=cur_cat['cap_index'])
        cs_pars_arr = ['{}={}'.format(key, testquery_pars[key])
                       for key in testquery_pars]
        cs_pars_arr += ['VERB=3']

        # Track the service.
        key_lookup_by_url[cur_url + '&'.join(cs_pars_arr)] = cur_key
        if url_list is not None:
            url_list_processed.append(cur_url)

    # Give warning if any of the user given subset is not in the registry.
    if url_list is not None:
        url_list_skipped = url_list - set(url_list_processed)
        n_skipped = len(url_list_skipped)
        if n_skipped > 0:
            warn_str = '{0} not found in registry! Skipped:\n'.format(
                n_skipped)
            for cur_url in url_list_skipped:
                warn_str += '\t{0}\n'.format(cur_url)
            warnings.warn(warn_str, AstropyUserWarning)

    all_urls = list(key_lookup_by_url)
    timeout = data.conf.remote_timeout
    map_args = [(out_dir, url.encode('utf-8'), timeout) for url in all_urls]

    # Validate URLs
    pool = None
    if parallel:
        pool = multiprocessing.Pool()
        try:
            mp_list = pool.map(_do_validation, map_args)
        except Exception as exc:  # pragma: no cover
            pool.terminate()  # BUGFIX: do not leak worker processes.
            raise ValidationMultiprocessingError(
                'An exception occurred during parallel processing '
                'of validation results: {0}'.format(exc)) from exc
    else:
        # BUGFIX: materialize the results. A bare map() iterator would be
        # exhausted by the categorization loop below, leaving nothing for
        # result.get_result_subsets().
        mp_list = list(map(_do_validation, map_args))

    # Categorize validation results
    for r in mp_list:
        db_key = r['out_db_name']
        cat_key = key_lookup_by_url[r.url.decode('utf-8')]
        cur_cat = js_mstr.get_catalog(cat_key)
        _copy_r_to_cat(r, cur_cat)
        js_tree[db_key].add_catalog(cat_key, cur_cat)

    # Write to HTML
    html_subsets = result.get_result_subsets(mp_list, out_dir)
    html.write_index(html_subsets, all_urls, out_dir)

    if parallel:
        html_subindex_args = [(out_dir, html_subset, uniq_rows)
                              for html_subset in html_subsets]
        try:
            pool.map(_html_subindex, html_subindex_args)
        finally:
            # BUGFIX: the pool was never shut down; release the workers.
            pool.close()
            pool.join()
    else:
        for html_subset in html_subsets:
            _html_subindex((out_dir, html_subset, uniq_rows))

    # Write to JSON
    n = {}
    n_tot = 0
    for key in db_file:
        n[key] = len(js_tree[key])
        n_tot += n[key]
        js_tree[key].to_json(db_file[key], overwrite=True)
        if verbose:
            log.info('{0}: {1} catalog(s)'.format(key, n[key]))

    # Checksum
    if verbose:
        log.info('total: {0} out of {1} catalog(s)'.format(n_tot, uniq_rows))

    if n['good'] == 0:  # pragma: no cover
        warnings.warn(
            'No good sites available for Cone Search.', AstropyUserWarning)
def _do_validation(args):
    """Validate a single Cone Search URL (multiprocessing worker).

    Parameters
    ----------
    args : tuple
        ``(root, url, timeout)`` as built by `check_conesearch_sites`.

    Returns
    -------
    res : `astropy.io.votable.validator.result.Result`
        Populated and categorized validation result.

    """
    root, url, timeout = args

    # Start from a clean slate of VO warnings before validating.
    votable.table.reset_vo_warnings()

    res = result.Result(url, root=root, timeout=timeout)
    res.validate_vo()
    _categorize_result(res)

    # This was already checked above.
    # Calling this again to get VOTableFile object to catch
    # well-formed error responses in downloaded XML.
    #
    # 'incorrect' is also added in case user wants to use
    # 'conesearch_warn.json' anyway.
    #
    # If using cached data, it will not detect network error
    # like the first run, but will raise exception.
    #
    # When SR is not 0, VOSError is raised for empty table.
    #
    if res['expected'] in ('good', 'incorrect') and res['nexceptions'] == 0:
        n_exc = 0
        n_warn = 0
        messages = []

        with warnings.catch_warnings(record=True) as captured:
            try:
                vo_tab_parse(votable.table.parse(
                    res.get_vo_xml_path(), verify='warn'), res.url, {})
            except (E19, IndexError, VOSError) as e:  # pragma: no cover
                messages.append(str(e))
                n_exc += 1

        # Warnings recorded during parsing come first, then exceptions.
        messages = [str(w.message) for w in captured] + messages

        seen_types = set()
        for msg in messages:  # pragma: no cover
            parsed = votable.exceptions.parse_vowarning(msg)
            if parsed['is_warning']:
                n_warn += 1
            if parsed['is_exception']:
                n_exc += 1
            seen_types.add(parsed['warning'])

        res['nwarnings'] += n_warn
        res['nexceptions'] += n_exc
        res['warnings'] += messages
        res['warning_types'] = res['warning_types'].union(seen_types)

        # Re-categorize with the updated warning/exception counts.
        _categorize_result(res)

    html.write_result(res)
    return res
def _categorize_result(r):
    """
    Set success codes.

    Parameters
    ----------
    r : `astropy.io.votable.validator.result.Result`

    Raises
    ------
    InvalidValidationAttribute
        Unhandled validation result attributes.

    """
    from . import conf

    # Decide (out_db_name, expected) in priority order:
    # network error > clean/non-critical > exceptions > warnings.
    if ('network_error' in r
            and r['network_error'] is not None):  # pragma: no cover
        verdict = ('nerr', 'broken')
    elif ((r['nexceptions'] == 0 and r['nwarnings'] == 0)
            or r['warning_types'].issubset(conf.noncritical_warnings)):
        verdict = ('good', 'good')
    elif r['nexceptions'] > 0:  # pragma: no cover
        verdict = ('excp', 'incorrect')
    elif r['nwarnings'] > 0:  # pragma: no cover
        verdict = ('warn', 'incorrect')
    else:  # pragma: no cover
        raise InvalidValidationAttribute('Unhandled validation result attrib'
                                         'utes: {0}'.format(r._attributes))

    r['out_db_name'], r['expected'] = verdict
def _html_subindex(args):
    """Write one HTML subindex page (multiprocessing-friendly wrapper).

    ``args`` is a ``(out_dir, subset, total)`` tuple packed by the caller
    so this can be dispatched through ``pool.map``.
    """
    target_dir, subset, n_total = args
    html.write_index_table(target_dir, *subset, total=n_total)
def _copy_r_to_cat(r, cat):
"""
Copy validation result attributes to given VO catalog.
Parameters
----------
r : `astropy.io.votable.validate.result.Result`
cat : `astroquery.vo_conesearch.vos_catalog.VOSCatalog`
"""
for key in r._attributes:
new_key = 'validate_' + key
cat[new_key] = r._attributes[key]