Source code for astroquery.vo_conesearch.validator.validate

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Validate VO Services."""

# STDLIB
import multiprocessing
import os
import warnings
from collections import OrderedDict
from pathlib import Path

# ASTROPY
from astropy.io import votable
from astropy.io.votable.exceptions import E19
from astropy.io.votable.validator import html, result
from astroquery import log
from astropy.utils import data
from astropy.utils.exceptions import AstropyUserWarning
from astropy.utils.xml.unescaper import unescape_all

# LOCAL
from .tstquery import parse_cs
from .exceptions import (ValidationMultiprocessingError,
                         InvalidValidationAttribute)
from ..exceptions import VOSError
from ..vos_catalog import VOSDatabase, vo_tab_parse
from ...utils.timer import timefunc

# Import configurable items declared in __init__.py
from . import conf

__all__ = ['check_conesearch_sites']


@timefunc(num_tries=1)
def check_conesearch_sites(*, destdir=os.curdir, verbose=True, parallel=True,
                           url_list='default'):
    """
    Validate Cone Search Services.

    .. note::

        URLs are unescaped prior to validation.

        Only check queries with ``<testQuery>`` parameters.
        Does not perform meta-data and erroneous queries.

    Parameters
    ----------
    destdir : str, optional
        Directory to store output files. Will be created if it does
        not exist. Existing files with these names will be deleted
        or replaced:

            * conesearch_good.json
            * conesearch_warn.json
            * conesearch_exception.json
            * conesearch_error.json

    verbose : bool, optional
        Print extra info to log.

    parallel : bool, optional
        Enable multiprocessing.

    url_list : list of string, optional
        Only check these access URLs against
        ``astroquery.vo_conesearch.validator.conf.conesearch_master_list``
        and ignore the others, which will not appear in output files.
        By default, check those in
        ``astroquery.vo_conesearch.validator.conf.conesearch_urls``.
        If `None`, check everything.

    Raises
    ------
    OSError
        Invalid destination directory.

    timeout
        URL request timed out.

    ValidationMultiprocessingError
        Multiprocessing failed.

    """
    if url_list == 'default':
        url_list = conf.conesearch_urls

    if (not isinstance(destdir, (str, Path)) or
            os.path.exists(destdir) and not os.path.isdir(destdir)):
        raise OSError('Invalid destination directory')  # pragma: no cover

    if not os.path.exists(destdir):
        os.mkdir(destdir)

    # Output dir created by votable.validator
    out_dir = os.path.join(destdir, 'results')

    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    # Output files
    db_file = OrderedDict()
    db_file['good'] = os.path.join(destdir, 'conesearch_good.json')
    db_file['warn'] = os.path.join(destdir, 'conesearch_warn.json')
    db_file['excp'] = os.path.join(destdir, 'conesearch_exception.json')
    db_file['nerr'] = os.path.join(destdir, 'conesearch_error.json')

    # JSON dictionaries for output files
    js_tree = {}
    for key in db_file:
        js_tree[key] = VOSDatabase.create_empty()

        # Delete existing files, if any, to be on the safe side.
        # Else can cause confusion if program exited prior to
        # new files being written but old files are still there.
        if os.path.exists(db_file[key]):  # pragma: no cover
            os.remove(db_file[key])
            if verbose:
                log.info('Existing file {0} deleted'.format(db_file[key]))

    # Master VO database from registry. Silence all the warnings.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        js_mstr = VOSDatabase.from_registry(
            conf.conesearch_master_list, encoding='binary',
            show_progress=verbose)

    # Validate only a subset of the services.
    if url_list is not None:
        # Make sure URL is unique and fixed.
        url_list = set(map(
            unescape_all,
            [cur_url if isinstance(cur_url, str) else cur_url
             for cur_url in url_list]))
        uniq_rows = len(url_list)
        url_list_processed = []  # To track if given URL is valid in registry
        if verbose:
            log.info('Only {0}/{1} site(s) are validated'.format(
                uniq_rows, len(js_mstr)))
    # Validate all services.
    else:
        uniq_rows = len(js_mstr)

    key_lookup_by_url = {}

    # Process each catalog in the registry.
    for cur_key, cur_cat in js_mstr.get_catalogs():
        cur_url = cur_cat['url']

        # Skip if:
        #   a. not a Cone Search service
        #   b. not in given subset, if any
        if ((cur_cat['cap_type'] != 'conesearch') or
                (url_list is not None and cur_url not in url_list)):
            continue

        # Use testQuery to return non-empty VO table with max verbosity.
        testquery_pars = parse_cs(cur_cat['ivoid'],
                                  cap_index=cur_cat['cap_index'])
        cs_pars_arr = ['{}={}'.format(key, testquery_pars[key])
                       for key in testquery_pars]
        cs_pars_arr += ['VERB=3']

        # Track the service.
        key_lookup_by_url[cur_url + '&'.join(cs_pars_arr)] = cur_key
        if url_list is not None:
            url_list_processed.append(cur_url)

    # Give warning if any of the user given subset is not in the registry.
    if url_list is not None:
        url_list_skipped = url_list - set(url_list_processed)
        n_skipped = len(url_list_skipped)
        if n_skipped > 0:
            warn_str = '{0} not found in registry! Skipped:\n'.format(
                n_skipped)
            for cur_url in url_list_skipped:
                warn_str += '\t{0}\n'.format(cur_url)
            warnings.warn(warn_str, AstropyUserWarning)

    all_urls = list(key_lookup_by_url)
    timeout = data.conf.remote_timeout
    map_args = [(out_dir, url.encode('utf-8'), timeout) for url in all_urls]

    # Validate URLs
    if parallel:
        pool = multiprocessing.Pool()
        try:
            mp_list = pool.map(_do_validation, map_args)
        except Exception as exc:  # pragma: no cover
            raise ValidationMultiprocessingError(
                'An exception occurred during parallel processing '
                'of validation results: {0}'.format(exc))
    else:
        # Materialize the results so they can be iterated more than once below.
        mp_list = list(map(_do_validation, map_args))

    # Categorize validation results
    for r in mp_list:
        db_key = r['out_db_name']
        cat_key = key_lookup_by_url[r.url.decode('utf-8')]
        cur_cat = js_mstr.get_catalog(cat_key)
        _copy_r_to_cat(r, cur_cat)
        js_tree[db_key].add_catalog(cat_key, cur_cat)

    # Write to HTML
    html_subsets = result.get_result_subsets(mp_list, out_dir)
    html.write_index(html_subsets, all_urls, out_dir)
    if parallel:
        html_subindex_args = [(out_dir, html_subset, uniq_rows)
                              for html_subset in html_subsets]
        pool.map(_html_subindex, html_subindex_args)
    else:
        for html_subset in html_subsets:
            _html_subindex((out_dir, html_subset, uniq_rows))

    # Write to JSON
    n = {}
    n_tot = 0
    for key in db_file:
        n[key] = len(js_tree[key])
        n_tot += n[key]
        js_tree[key].to_json(db_file[key], overwrite=True)
        if verbose:
            log.info('{0}: {1} catalog(s)'.format(key, n[key]))

    # Checksum
    if verbose:
        log.info('total: {0} out of {1} catalog(s)'.format(n_tot, uniq_rows))

    if n['good'] == 0:  # pragma: no cover
        warnings.warn('No good sites available for Cone Search.',
                      AstropyUserWarning)
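

# Illustrative usage (a sketch, not part of the module): one way to invoke the
# validator above. The destination directory name is an arbitrary example, and
# parallel=False simply trades speed for easier debugging of single services.
#
#     from astroquery.vo_conesearch.validator import validate
#
#     # Validate the default curated URL list (conf.conesearch_urls) and write
#     # the conesearch_*.json files plus the HTML report under './vo_results'.
#     validate.check_conesearch_sites(
#         destdir='./vo_results', verbose=True, parallel=False,
#         url_list='default')

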
def _do_validation(args):
    """Validation for multiprocessing support."""
    root, url, timeout = args

    votable.table.reset_vo_warnings()

    r = result.Result(url, root=root, timeout=timeout)
    r.validate_vo()

    _categorize_result(r)

    # This was already checked above.
    # Calling this again to get VOTableFile object to catch
    # well-formed error responses in downloaded XML.
    #
    # 'incorrect' is also added in case user wants to use
    # 'conesearch_warn.json' anyway.
    #
    # If using cached data, it will not detect network error
    # like the first run, but will raise exception.
    #
    # When SR is not 0, VOSError is raised for empty table.
    #
    if r['expected'] in ('good', 'incorrect') and r['nexceptions'] == 0:
        nexceptions = 0
        nwarnings = 0
        lines = []

        with warnings.catch_warnings(record=True) as warning_lines:
            try:
                vo_tab_parse(votable.table.parse(
                    r.get_vo_xml_path(), verify='warn'), r.url, {})
            except (E19, IndexError, VOSError) as e:  # pragma: no cover
                lines.append(str(e))
                nexceptions += 1

        lines = [str(x.message) for x in warning_lines] + lines

        warning_types = set()
        for line in lines:  # pragma: no cover
            w = votable.exceptions.parse_vowarning(line)
            if w['is_warning']:
                nwarnings += 1
            if w['is_exception']:
                nexceptions += 1
            warning_types.add(w['warning'])

        r['nwarnings'] += nwarnings
        r['nexceptions'] += nexceptions
        r['warnings'] += lines
        r['warning_types'] = r['warning_types'].union(warning_types)

        _categorize_result(r)

    html.write_result(r)
    return r


def _categorize_result(r):
    """
    Set success codes.

    Parameters
    ----------
    r : `astropy.io.votable.validator.result.Result`

    Raises
    ------
    InvalidValidationAttribute
        Unhandled validation result attributes.

    """
    from . import conf

    if ('network_error' in r and
            r['network_error'] is not None):  # pragma: no cover
        r['out_db_name'] = 'nerr'
        r['expected'] = 'broken'
    elif ((r['nexceptions'] == 0 and r['nwarnings'] == 0) or
            r['warning_types'].issubset(conf.noncritical_warnings)):
        r['out_db_name'] = 'good'
        r['expected'] = 'good'
    elif r['nexceptions'] > 0:  # pragma: no cover
        r['out_db_name'] = 'excp'
        r['expected'] = 'incorrect'
    elif r['nwarnings'] > 0:  # pragma: no cover
        r['out_db_name'] = 'warn'
        r['expected'] = 'incorrect'
    else:  # pragma: no cover
        raise InvalidValidationAttribute(
            'Unhandled validation result attributes: {0}'.format(r._attributes))


def _html_subindex(args):
    """HTML writer for multiprocessing support."""
    out_dir, subset, total = args
    html.write_index_table(out_dir, *subset, total=total)


def _copy_r_to_cat(r, cat):
    """
    Copy validation result attributes to given VO catalog.

    Parameters
    ----------
    r : `astropy.io.votable.validator.result.Result`

    cat : `astroquery.vo_conesearch.vos_catalog.VOSCatalog`

    """
    for key in r._attributes:
        new_key = 'validate_' + key
        cat[new_key] = r._attributes[key]
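

# Illustrative follow-up (a sketch, not part of the module), assuming a prior
# run of check_conesearch_sites() wrote its JSON files into './vo_results':
# each per-category database can be reloaded with VOSDatabase.from_json, e.g.
# to list the services that ended up in the 'good' bucket.
#
#     from astroquery.vo_conesearch.vos_catalog import VOSDatabase
#
#     good_db = VOSDatabase.from_json('./vo_results/conesearch_good.json')
#     for key, cat in good_db.get_catalogs():
#         print(key, cat['url'])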