# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Simbad query class for accessing the Simbad Service
"""
from __future__ import print_function
import re
import json
import os
from collections import namedtuple
import tempfile
import warnings
from ..query import BaseQuery
from ..utils.class_or_instance import class_or_instance
from ..utils import commons
import astropy.units as u
from astropy.utils.data import get_pkg_data_filename
import astropy.coordinates as coord
from astropy.table import Table
try:
import astropy.io.vo.table as votable
except ImportError:
import astropy.io.votable as votable
from . import SIMBAD_SERVER, SIMBAD_TIMEOUT, ROW_LIMIT
__all__ = ['Simbad']
def validate_epoch(func):
"""
A method decorator that checks if the epoch value entered by the user
is acceptable.
"""
def wrapper(*args, **kwargs):
if kwargs.get('epoch'):
value = kwargs['epoch']
try:
p = re.compile('^[JB]\d+[.]?\d+$', re.IGNORECASE)
assert p.match(value) is not None
except (AssertionError, TypeError):
raise Exception("Epoch must be specified as [J|B]<epoch>.\n"
"Example: epoch='J2000'")
return func(*args, **kwargs)
return wrapper
def validate_equinox(func):
"""
A method decorator that checks if the equinox value entered by the user
is acceptable.
"""
def wrapper(*args, **kwargs):
if kwargs.get('equinox'):
value = kwargs['equinox']
try:
float(value)
except ValueError:
raise Exception("Equinox must be a number")
return func(*args, **kwargs)
return wrapper
[docs]class Simbad(BaseQuery):
"""
The class for querying the Simbad web service.
"""
SIMBAD_URL = 'http://' + SIMBAD_SERVER() + '/simbad/sim-script'
TIMEOUT = SIMBAD_TIMEOUT()
WILDCARDS = {
'*': 'Any string of characters (including an empty one)',
'?': 'Any character (exactly one character)',
'[abc]': ('Exactly one character taken in the list. '
'Can also be defined by a range of characters: [A-Z]'
),
'[^0-9]': 'Any (one) character not in the list.'
}
# query around not included since this is a subcase of query_region
_function_to_command = {
'query_object_async': 'query id',
'query_region_async': 'query coo',
'query_catalog_async': 'query cat',
'query_bibcode_async': 'query bibcode',
'query_bibobj_async': 'query bibobj'
}
# also find a way to fetch the votable fields table from <http://simbad.u-strasbg.fr/simbad/sim-help?Page=sim-fscript#VotableFields>
# tried something for this in this ipython nb
# <http://nbviewer.ipython.org/5851110>
VOTABLE_FIELDS = ['main_id', 'coordinates']
ROW_LIMIT = ROW_LIMIT()
@class_or_instance
[docs] def list_wildcards(self):
"""
Displays the available wildcards that may be used in Simbad queries and
their usage.
"""
for key in Simbad.WILDCARDS:
print("{key} : {value}\n".format(key=key, value=Simbad.WILDCARDS[key]))
return
@class_or_instance
[docs] def list_votable_fields(self):
"""
Lists all the fields that can be fetched for a VOTable.
"""
# display additional notes:
notes_file = get_pkg_data_filename(os.path.join('data', 'votable_fields_notes.json'))
with open(notes_file, "r") as f:
notes = json.load(f)
print ("--NOTES--\n")
for i, line in list(enumerate(notes)):
print ("{lineno}. {msg}\n".format(lineno=i+1, msg=line))
# load the table
votable_fields_table = Table.read(get_pkg_data_filename
(os.path.join('data',
'votable_fields_table.txt')),
format='ascii')
print (votable_fields_table)
print("\nFor more information on a field :\nSimbad.get_field_description "
"('field_name')")
@class_or_instance
[docs] def get_field_description(self, field_name):
"""
Displays a description of the VOTable field
Parameters
----------
field_name : str
the name of the field to describe. Must be one of those listed
by `astroquery.simbad.Simbad.list_votable_fields`.
"""
# first load the dictionary from json
dict_file = get_pkg_data_filename(os.path.join('data', 'votable_fields_dict.json'))
with open(dict_file, "r") as f:
fields_dict = json.load(f)
try:
print (fields_dict[field_name])
except KeyError:
raise Exception("No such field_name")
@class_or_instance
[docs] def set_votable_fields(self, *args):
"""
Sets fields to be fetched in the VOTable. Must be one of those listed
by `astroquery.simbad.Simbad.list_votable_fields`.
Parameters
----------
list of field_names
"""
dict_file = get_pkg_data_filename(os.path.join('data', 'votable_fields_dict.json'))
with open(dict_file, "r") as f:
fields_dict = json.load(f)
for field in args:
if field not in fields_dict:
warnings.warn("{field}: no such field".format(field=field))
elif field in Simbad.VOTABLE_FIELDS:
warnings.warn("{field}: field already present".format(field=field))
else:
Simbad.VOTABLE_FIELDS.append(field)
@class_or_instance
[docs] def rm_votable_fields(self, *args):
"""
Removes the specified field names from `astroquery.simbad.Simbad.VOTABLE_FIELDS`
Parameters
----------
list of field_names to be removed
"""
absent_fields = set(args) - set(Simbad.VOTABLE_FIELDS)
for field in absent_fields:
warnings.warn("{field}: this field is not set".format(field=field))
Simbad.VOTABLE_FIELDS = list(set(Simbad.VOTABLE_FIELDS) - set(args))
# check if all fields are removed
if not Simbad.VOTABLE_FIELDS:
self.reset_votable_fields()
@class_or_instance
[docs] def reset_votable_fields(self):
"""
resets VOTABLE_FIELDS to defaults
"""
Simbad.VOTABLE_FIELDS = ['main_id', 'coordinates']
@class_or_instance
[docs] def query_object(self, object_name, wildcard=False, verbose=False):
"""
Queries Simbad for the given object and returns the result as an
`astropy.table.Table`. Object names may also be specified with wildcard.
See examples below.
Parameters
----------
object_name : str
name of object to be queried
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
Returns
-------
`astropy.table.Table`
The results of the query as an `astropy.table.Table`.
"""
result = self.query_object_async(object_name, wildcard=wildcard)
return self._parse_result(result, verbose=verbose)
@class_or_instance
[docs] def query_object_async(self, object_name, wildcard=False):
"""
Serves the same function as `astoquery.simbad.Simbad.query_object`, but
only collects the reponse from the Simbad server and returns.
Parameters
----------
object_name : str
name of object to be queried
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
Returns
-------
response : `requests.response`
the response of the query from the server
"""
request_payload = self._args_to_payload(object_name, wildcard=wildcard,
caller='query_object_async')
response = commons.send_request(Simbad.SIMBAD_URL, request_payload,
Simbad.TIMEOUT)
return response
@class_or_instance
[docs] def query_region(self, coordinates, radius=None,
equinox=None, epoch=None, verbose=False):
"""
Queries around an object or coordinates as per the specified radius and
returns the results in an `astropy.table.Table`.
Parameters
----------
coordinates : str/`astropy.coordinates`
the identifier or coordinates around which to query.
radius : str/`astropy.units.Qunatity`, optional
the radius of the region. If missing, set to default
value of 20 arcmin.
equinox : float, optional
the equinox of the coordinates. If missing set to
default 2000.0.
epoch : str, optional
the epoch of the input coordiantes. Must be specified as
[J|B] <epoch>. If missing, set to default J2000.
Returns
-------
`astropy.table.Table`
The results of the query as an `astropy.table.Table`.
"""
# if the identifier is given rather than the coordinates, convert to
# coordinates
result = self.query_region_async(coordinates, radius=radius,
equinox=equinox, epoch=epoch)
return self._parse_result(result, verbose=verbose)
@class_or_instance
[docs] def query_region_async(self, coordinates, radius=None, equinox=None,
epoch=None):
"""
Serves the same function as `astoquery.simbad.Simbad.query_region`, but
only collects the reponse from the Simbad server and returns.
Parameters
----------
coordinates : str/`astropy.coordinates`
the identifier or coordinates around which to query.
radius : str/`astropy.units.Qunatity`, optional
the radius of the region. If missing, set to default
value of 20 arcmin.
equinox : float, optional
the equinox of the coordinates. If missing set to
default 2000.0.
epoch : str, optional
the epoch of the input coordiantes. Must be specified as
[J|B] <epoch>. If missing, set to default J2000.
Returns
-------
response : `requests.response` object
the response of the query from the server.
"""
request_payload = self._args_to_payload(coordinates, radius=radius,
equinox=equinox, epoch=epoch,
caller='query_region_async')
response = commons.send_request(Simbad.SIMBAD_URL, request_payload,
Simbad.TIMEOUT)
return response
@class_or_instance
[docs] def query_catalog(self, catalog, verbose=False):
"""
Queries a whole catalog. Results may be very large -number of rows
should be controlled by configuring `astroquery.simbad.ROW_LIMIT`.
Parameters
----------
catalog : str
the name of the catalog.
Returns
-------
`astropy.table.Table`
The results of the query as an `astropy.table.Table`.
"""
result = self.query_catalog_async(catalog)
return self._parse_result(result, verbose=verbose)
@class_or_instance
[docs] def query_catalog_async(self, catalog):
"""
Serves the same function as `astoquery.simbad.Simbad.query_catalog`, but
only collects the reponse from the Simbad server and returns.
Parameters
----------
catalog : str
the name of the catalog.
Returns
-------
response : `requests.response` object
the response of the query from the server.
"""
request_payload = self._args_to_payload(catalog,
caller='query_catalog_async')
response = commons.send_request(Simbad.SIMBAD_URL, request_payload,
Simbad.TIMEOUT)
return response
@class_or_instance
[docs] def query_bibobj(self, bibcode, verbose=False):
"""
Query all the objects that are contained in the article specified by
the bibcode, and return results as an `astropy.table.Table`.
Parameters
----------
bibcode : str
the bibcode of the article
Returns
-------
`astropy.table.Table`
The results of the query as an `astropy.table.Table`.
"""
result = self.query_bibobj_async(bibcode)
return self._parse_result(result, verbose=verbose)
@class_or_instance
[docs] def query_bibobj_async(self, bibcode):
"""
Serves the same function as `astoquery.simbad.Simbad.query_bibobj`, but
only collects the reponse from the Simbad server and returns.
Parameters
----------
bibcode : str
the bibcode of the article
Returns
-------
response : `requests.response` object
the response of the query from the server.
"""
request_payload = self._args_to_payload(
bibcode, caller='query_bibobj_async')
response = commons.send_request(Simbad.SIMBAD_URL, request_payload,
Simbad.TIMEOUT)
return response
@class_or_instance
[docs] def query_bibcode(self, bibcode, wildcard=False, verbose=False):
"""
Queries the references corresponding to a given bibcode, and returns
the results in an `astropy.table.Table`. Wildcards may be used to
specify bibcodes
Parameters
----------
bibcode : str
the bibcode of the article
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
Returns
-------
`astropy.table.Table`
The results of the query as an `astropy.table.Table`.
"""
result = self.query_bibcode_async(bibcode, wildcard=wildcard)
return self._parse_result(result, verbose=verbose)
@class_or_instance
[docs] def query_bibcode_async(self, bibcode, wildcard=False):
"""
Serves the same function as `astoquery.simbad.Simbad.query_bibcode`, but
only collects the reponse from the Simbad server and returns.
Parameters
----------
bibcode : str
the bibcode of the article
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
Returns
-------
response : `requests.response` object
the response of the query from the server.
"""
request_payload = self._args_to_payload(bibcode, wildcard=wildcard,
caller='query_bibcode_async', get_raw=True)
response = commons.send_request(Simbad.SIMBAD_URL, request_payload,
Simbad.TIMEOUT)
return response
@class_or_instance
@validate_epoch
@validate_equinox
def _args_to_payload(self, *args, **kwargs):
"""
Takes the arguments from any of the query functions
and returns a dictionary that can be used as the
data for an HTTP POST request.
"""
script = ""
caller = kwargs['caller']
del kwargs['caller']
get_raw = kwargs.get('get_raw', False)
if get_raw:
del kwargs['get_raw']
command = self._function_to_command[caller]
votable_fields = ','.join(Simbad.VOTABLE_FIELDS)
# if get_raw is set then don't fetch as votable
votable_def = ("votable {" + votable_fields + "}", "")[get_raw]
votable_open = ("votable open", "")[get_raw]
votable_close = ("votable close", "")[get_raw]
if Simbad.ROW_LIMIT > 0:
script = "set limit " + str(Simbad.ROW_LIMIT)
script = "\n".join([script, votable_def, votable_open, command])
if kwargs.get('wildcard'):
script += " wildcard" # necessary to have a space at the beginning
del kwargs['wildcard']
# now append args and kwds as per the caller
# if caller is query_region_async write coordinates as separate ra dec
if caller == 'query_region_async':
coordinates = args[0]
args = args[1:]
ra, dec, frame = _parse_coordinates(coordinates)
args = [ra, dec]
kwargs['frame'] = frame
if kwargs.get('radius'):
kwargs['radius'] = _parse_radius(kwargs['radius'])
args_str = ' '.join([str(val) for val in args])
# rename equinox to equi as required by SIMBAD script
if kwargs.get('equinox'):
kwargs['equi'] = kwargs['equinox']
del kwargs['equinox']
# remove default None from kwargs
# be compatible with python3
for key in list(kwargs):
if not kwargs[key]:
del kwargs[key]
# join in the order specified otherwise results in error
all_keys = ['radius', 'frame', 'equi', 'epoch']
present_keys =[key for key in all_keys if key in kwargs]
kwargs_str = ' '.join("{key}={value}".format(key=key, value=kwargs[key]) for
key in present_keys)
script += ' '.join([" ", args_str, kwargs_str, "\n"])
script += votable_close
return dict(script=script)
@class_or_instance
def _parse_result(self, result, verbose=False):
if not verbose:
commons.suppress_vo_warnings()
parsed_result = SimbadResult(result.content)
try:
return parsed_result.table
except Exception as E:
warnings.warn("Error in parsing Simbad result. "
"Returning raw result instead.")
raise E
return result.content
def _parse_coordinates(coordinates):
try:
c = commons.parse_coordinates(coordinates)
# now c has some subclass of astropy.coordinate
# get ra, dec and frame
return _get_frame_coords(c)
except (u.UnitsException, TypeError):
raise Exception("Coordinates not specified correctly")
def _get_frame_coords(c):
if isinstance(c, coord.ICRSCoordinates):
ra, dec = _to_simbad_format(c.ra, c.dec)
return (ra, dec, 'ICRS')
if isinstance(c, coord.GalacticCoordinates):
lon, lat = (str(c.lonangle.degree), str(c.latangle.degree))
if lat[0] not in ['+', '-']:
lat = '+' + lat
return (lon, lat, 'GAL')
if isinstance(c, coord.FK4Coordinates):
ra, dec = _to_simbad_format(c.ra, c.dec)
return (ra, dec,'FK4')
if isinstance(c, coord.FK5Coordinates):
ra, dec = _to_simbad_format(c.ra, c.dec)
return (ra, dec, 'FK5')
def _to_simbad_format(ra, dec):
ra = ra.format(u.hour, sep=':')
dec = dec.format(u.degree, sep=':', alwayssign='True')
return (ra, dec)
def _parse_radius(radius):
try:
angle = commons.parse_radius(radius)
# find the most appropriate unit - d, m or s
index = min([i for (i,val) in enumerate(angle.dms) if int(val) > 0])
unit = ('d', 'm', 's')[index]
if unit == 'd':
return str(int(angle.degree)) + unit
if unit == 'm':
sec_to_min = angle.dms[2] * u.arcsec.to(u.arcmin)
total_min = angle.dms[1] + sec_to_min
return str(total_min) + unit
if unit == 's':
return str(angle.dms[2]) + unit
except (u.UnitsException, coord.errors.UnitsError, AttributeError):
raise Exception("Radius specified incorrectly")
error_regex = re.compile(r'(?ms)\[(?P<line>\d+)\]\s?(?P<msg>.+?)(\[|\Z)')
bibcode_regex = re.compile(r'query\s+bibcode\s+(wildcard)?\s+([\w]*)')
SimbadError = namedtuple('SimbadError', ('line', 'msg'))
VersionInfo = namedtuple('VersionInfo', ('major', 'minor', 'micro', 'patch'))
class SimbadResult(object):
__sections = ('script', 'console', 'error', 'data')
def __init__(self, txt, pedantic=False):
self.__txt = txt
self.__pedantic = pedantic
self.__table = None
self.__stringio = None
self.__indexes = {}
self.exectime = None
self.sim_version = None
self.__split_sections()
self.__parse_console_section()
self.__warn()
self.__file = None
def __split_sections(self):
for section in self.__sections:
match = re.search(r'(?ims)^::%s:+?$(?P<content>.*?)(^::|\Z)' %
section, self.__txt)
if match:
self.__indexes[section] = (match.start('content'),
match.end('content'))
def __parse_console_section(self):
if self.console is None:
return
match = re.search(r'(?ims)total execution time: ([.\d]+?)\s*?secs',
self.console)
if match:
try:
self.exectime = float(match.group(1))
except:
# TODO: do something useful here.
pass
match = re.search(r'(?ms)SIMBAD(\d) rel (\d)[.](\d+)([^\d^\s])?',
self.console)
if match:
self.sim_version = VersionInfo(*match.groups(None))
def __warn(self):
for error in self.errors:
warnings.warn("Warning: The script line number %i raised "
"the error: %s." %
(error.line, error.msg))
def __get_section(self, section_name):
if section_name in self.__indexes:
return self.__txt[self.__indexes[section_name][0]:
self.__indexes[section_name][1]].strip()
@property
def script(self):
return self.__get_section('script')
@property
def console(self):
return self.__get_section('console')
@property
def error_raw(self):
return self.__get_section('error')
@property
def data(self):
return self.__get_section('data')
@property
def errors(self):
result = []
if self.error_raw is None:
return result
for err in error_regex.finditer(self.error_raw):
result.append(SimbadError(int(err.group('line')),
err.group('msg').replace('\n', ' ')))
return result
@property
def nb_errors(self):
if self.error_raw is None:
return 0
return len(self.errors)
@property
def table(self):
if self.__file is None:
self.__file = tempfile.NamedTemporaryFile()
self.__file.write(self.data.encode('utf-8'))
self.__file.flush()
# if bibcode query then first create table from raw data
bibcode_match = bibcode_regex.search(self.script)
if bibcode_match:
self.__table = _create_bibcode_table(self.data, bibcode_match.group(2))
else:
self.__table = votable.parse_single_table(self.__file, pedantic=False).to_table()
return self.__table
def _create_bibcode_table(data, splitter):
ref_list = [splitter + ref for ref in data.split(splitter)][1:]
table = Table(names=['References'], dtypes=['object'])
for ref in ref_list:
table.add_row([ref.decode('utf-8')])
return table