Source code for astroquery.lamda.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os
import json
import platform
import re
import warnings
from urllib import parse as urlparse

from astropy import table
from astroquery import log
from astropy.utils.console import ProgressBar
from bs4 import BeautifulSoup

from ..exceptions import InvalidQueryError
from ..query import BaseQuery

__all__ = ['Lamda']

# should skip only if remote_data = False
__doctest_skip__ = ['LamdaClass.query']

# query_types and collider_ids are potentially useful but not used.
query_types = {
    'erg_levels': '!NUMBER OF ENERGY LEVELS',
    'rad_trans': '!NUMBER OF RADIATIVE TRANSITIONS',
    'coll_rates': '!COLLISIONS BETWEEN'
}

collider_ids = {'H2': 1,
                'PH2': 2,
                'OH2': 3,
                'E': 4,
                'H': 5,
                'HE': 6,
                'H+': 7}
collider_ids.update({v: k for k, v in list(collider_ids.items())})
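# The update above makes ``collider_ids`` bidirectional, so lookups work by
# collider name or by numeric LAMDA collider ID, e.g.:
#
#     collider_ids['H2'] -> 1
#     collider_ids[1] -> 'H2'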


class LamdaClass(BaseQuery):

    url = "http://home.strw.leidenuniv.nl/~moldata/datafiles/{0}.dat"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.moldict_path = os.path.join(self.cache_location,
                                         "molecules.json")

    def _get_molfile(self, mol, *, cache=True, timeout=None):
        """
        """
        if mol not in self.molecule_dict:
            raise InvalidQueryError("Molecule {0} is not in the valid "
                                    "molecule list.  See Lamda.molecule_dict")
        response = self._request('GET', self.molecule_dict[mol],
                                 timeout=timeout, cache=cache)
        response.raise_for_status()
        return response

    def download_molfile(self, mol, outfilename):
        """
        Download the data file for molecule ``mol`` and write it to the
        output file ``outfilename``.
        """
        molreq = self._get_molfile(mol)
        with open(outfilename, 'w') as f:
            f.write(molreq.text)
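
    # Hypothetical usage sketch (requires network access; 'co' is one of the
    # valid keys in ``molecule_dict``):
    #
    #     Lamda.download_molfile('co', 'co.dat')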

    def query(self, mol, *, return_datafile=False, cache=True, timeout=None):
        """
        Query the LAMDA database.

        Parameters
        ----------
        mol : string
            Molecule or atom designation. For a list of valid designations
            see `molecule_dict`.

        return_datafile : bool
            Defaults to False. If True, return the raw data file as a list
            of strings instead of parsed tables.

        cache : bool
            Defaults to True. If set overrides global caching behavior.
            See :ref:`caching documentation <astroquery_cache>`.

        timeout : int, optional
            Timeout in seconds for the HTTP request.

        Returns
        -------
        Tuple of tables: ({rateid: Table, },
                          Table,
                          Table)

        Examples
        --------
        >>> from astroquery.lamda import Lamda
        >>> collrates, radtransitions, enlevels = Lamda.query(mol='co')
        >>> enlevels.pprint()
        LEVEL ENERGIES(cm^-1) WEIGHT  J
        ----- --------------- ------ ---
            2     3.845033413    3.0   1
            3    11.534919938    5.0   2
          ...             ...    ... ...
        >>> collrates['H2'].pprint(max_width=60)
        Transition Upper Lower ... C_ij(T=325) C_ij(T=375)
        ---------- ----- ----- ... ----------- -----------
                 1     2     1 ...     2.8e-11       3e-11
                 2     3     1 ...     1.8e-11     1.9e-11
        """
        # Send HTTP request to open URL
        datafile = [s.strip() for s in
                    self._get_molfile(mol, timeout=timeout,
                                      cache=cache).text.splitlines()]
        if return_datafile:
            return datafile
        # Parse datafile string list and return a table
        tables = parse_lamda_lines(datafile)
        return tables

    def get_molecules(self, *, cache=True):
        """
        Scrape the list of valid molecules

        Parameters
        ----------
        cache : bool
            Defaults to True. If set overrides global caching behavior.
            See :ref:`caching documentation <astroquery_cache>`.
        """

        if cache and hasattr(self, '_molecule_dict'):
            return self._molecule_dict
        elif cache and os.path.isfile(self.moldict_path):
            with open(self.moldict_path, 'r') as f:
                md = json.load(f)
            return md

        main_url = 'http://home.strw.leidenuniv.nl/~moldata/'
        response = self._request('GET', main_url, cache=cache)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, features="html5lib")

        links = soup.find_all('a', href=True)
        datfile_urls = [url
                        for link in ProgressBar(links)
                        for url in self._find_datfiles(link['href'],
                                                       base_url=main_url)]

        molecule_re = re.compile(r'http://[a-zA-Z0-9.]*/~moldata/datafiles/([A-Z0-9a-z_+@-]*).dat')
        molecule_dict = {molecule_re.search(url).groups()[0]: url
                         for url in datfile_urls}

        with open(self.moldict_path, 'w') as f:
            s = json.dumps(molecule_dict)
            f.write(s)

        return molecule_dict
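
    # Hypothetical shape of the dictionary returned by ``get_molecules`` and
    # cached as ``molecule_dict`` (only one entry shown):
    #
    #     {'co': 'http://home.strw.leidenuniv.nl/~moldata/datafiles/co.dat',
    #      ...}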

    @property
    def molecule_dict(self):
        if not hasattr(self, '_molecule_dict'):
            warnings.warn("The first time a LAMDA function is called, it must "
                          "assemble a list of valid molecules and URLs.  This "
                          "list will be cached so future operations will be "
                          "faster.")
            self._molecule_dict = self.get_molecules()

        return self._molecule_dict

    def _find_datfiles(self, url, base_url, *, raise_for_status=False):

        myurl = _absurl_from_url(url, base_url)
        if 'http' not in myurl:
            # assume this is a bad URL, like a mailto:blah href
            return []

        response = self._request('GET', myurl)
        if raise_for_status:
            response.raise_for_status()
        elif not response.ok:
            # assume this URL does not contain data b/c it does not exist
            return []

        soup = BeautifulSoup(response.content, features="html5lib")

        links = soup.find_all('a', href=True)

        urls = [_absurl_from_url(link['href'], base_url)
                for link in links if '.dat' in link['href']]

        return urls


def _absurl_from_url(url, base_url):
    if url[:4] != 'http':
        return urlparse.urljoin(base_url, url)
    return url
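
# Illustration of the helper above (URL values chosen for illustration only):
#
#     _absurl_from_url('datafiles/co.dat',
#                      'http://home.strw.leidenuniv.nl/~moldata/')
#     -> 'http://home.strw.leidenuniv.nl/~moldata/datafiles/co.dat'
#
# URLs that already start with 'http' are returned unchanged.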


def parse_lamda_datafile(filename):
    """
    Read a datafile that follows the format adopted for the atomic and
    molecular data in the LAMDA database.

    Parameters
    ----------
    filename : str
        Fully qualified path of the file to read.

    Returns
    -------
    Tuple of tables: ({rateid: Table, },
                      Table,
                      Table)
    """
    with open(filename) as f:
        lines = f.readlines()

    return parse_lamda_lines(lines)


def write_lamda_datafile(filename, tables):
    """
    Write tuple of tables with LAMDA data into a datafile that follows the
    format adopted for the LAMDA database.

    Parameters
    ----------
    filename : str
        Fully qualified path of the file to write.

    tables : tuple
        Tuple of Tables ({rateid: coll_table}, rad_table, mol_table)
    """
    collrates, radtransitions, enlevels = tables

    levels_hdr = ("""! MOLECULE
                  {0}
                  ! MOLECULAR WEIGHT
                  {1}
                  ! NUMBER OF ENERGY LEVELS
                  {2}
                  ! LEVEL + ENERGIES(cm^-1) + WEIGHT + J
                  """)
    levels_hdr = re.sub('^ +', '', levels_hdr, flags=re.MULTILINE)

    radtrans_hdr = ("""! NUMBER OF RADIATIVE TRANSITIONS
                    {0}
                    ! TRANS + UP + LOW + EINSTEINA(s^-1) + FREQ(GHz) + E_u(K)
                    """)
    radtrans_hdr = re.sub('^ +', '', radtrans_hdr, flags=re.MULTILINE)

    coll_hdr = ("""! NUMBER OF COLL PARTNERS
                {0}
                """)
    coll_hdr = re.sub('^ +', '', coll_hdr, flags=re.MULTILINE)

    coll_part_hdr = ("""! COLLISION PARTNER
                     {0} {1}
                     ! NUMBER OF COLLISIONAL TRANSITIONS
                     {2}
                     ! NUMBER OF COLLISION TEMPERATURES
                     {3}
                     ! COLLISION TEMPERATURES
                     {4}
                     ! TRANS + UP + LOW + RATE COEFFS(cm^3 s^-1)
                     """)
    coll_part_hdr = re.sub('^ +', '', coll_part_hdr, flags=re.MULTILINE)

    if platform.system() == 'Windows':
        stream = open(filename, 'w', newline='')
    else:
        stream = open(filename, 'w')

    with stream as f:
        f.write(levels_hdr.format(enlevels.meta['molecule'],
                                  enlevels.meta['molwt'],
                                  enlevels.meta['nenergylevels']))
        enlevels.write(f, format='ascii.no_header')
        f.write(radtrans_hdr.format(radtransitions.meta['radtrans']))
        radtransitions.write(f, format='ascii.no_header')
        f.write(coll_hdr.format(len(collrates)))
        for k, v in collrates.items():
            temperatures = ' '.join([str(i) for i in v.meta['temperatures']])
            f.write(coll_part_hdr.format(v.meta['collider_id'],
                                         v.meta['collider'],
                                         v.meta['ntrans'],
                                         v.meta['ntemp'],
                                         temperatures))
            v.write(f, format='ascii.no_header')
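

# Hypothetical round-trip sketch using the two functions above (file paths
# are placeholders):
#
#     tables = parse_lamda_datafile('co.dat')
#     write_lamda_datafile('co_copy.dat', tables)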


def parse_lamda_lines(data):
    """
    Extract a LAMDA datafile into a dictionary of tables

    (non-pythonic! more like, fortranic)
    """

    meta_rad = {}
    meta_mol = {}
    meta_coll = {}
    levels = []
    radtrans = []
    collider = None
    ncolltrans = None
    for ii, line in enumerate(data):
        if line[0] == '!':
            continue
        if 'molecule' not in meta_mol:
            meta_mol['molecule'] = _cln(line)
            continue
        if 'molwt' not in meta_mol:
            meta_mol['molwt'] = float(_cln(line))
            continue
        if 'nenergylevels' not in meta_mol:
            meta_mol['nenergylevels'] = int(_cln(line))
            continue
        if len(levels) < meta_mol['nenergylevels']:
            lev, en, wt = _cln(line).split()[:3]
            jul = " ".join(_cln(line).split()[3:])
            levels.append([int(lev), float(en), int(float(wt)), jul])
            continue
        if 'radtrans' not in meta_rad:
            meta_rad['radtrans'] = int(_cln(line))
            continue
        if len(radtrans) < meta_rad['radtrans']:
            # Can have wavenumber at the end.  Ignore that.
            trans, up, low, aval, freq, eu = _cln(line).split()[:6]
            radtrans.append([int(trans), int(up), int(low), float(aval),
                             float(freq), float(eu)])
            continue
        if 'ncoll' not in meta_coll:
            meta_coll['ncoll'] = int(_cln(line))
            collrates = {}
            continue
        if collider is None:
            collider = int(line[0])
            collname = collider_ids[collider]
            collrates[collider] = []
            meta_coll[collname] = {'collider': collname,
                                   'collider_id': collider}
            continue
        if ncolltrans is None:
            ncolltrans = int(_cln(line))
            meta_coll[collname]['ntrans'] = ncolltrans
            continue
        if 'ntemp' not in meta_coll[collname]:
            meta_coll[collname]['ntemp'] = int(_cln(line))
            continue
        if 'temperatures' not in meta_coll[collname]:
            meta_coll[collname]['temperatures'] = [int(float(x)) for x in
                                                   _cln(line).split()]
            continue
        if len(collrates[collider]) < meta_coll[collname]['ntrans']:
            trans, up, low = [int(x) for x in _cln(line).split()[:3]]
            temperatures = [float(x) for x in _cln(line).split()[3:]]
            collrates[collider].append([trans, up, low] + temperatures)
        if len(collrates[collider]) == meta_coll[collname]['ntrans']:
            # meta_coll[collider_ids[collider]+'_collrates'] = collrates
            log.debug("{ii} Finished loading collider {0:d}: "
                      "{1}".format(collider, collider_ids[collider], ii=ii))
            collider = None
            ncolltrans = None
            if len(collrates) == meta_coll['ncoll']:
                # All done!
                break

    if len(levels[0]) == 4:
        mol_table_names = ['Level', 'Energy', 'Weight', 'J']
    elif len(levels[0]) == 5:
        mol_table_names = ['Level', 'Energy', 'Weight', 'J', 'F']
    else:
        raise ValueError("Unrecognized levels structure.")
    mol_table_columns = [table.Column(name=name, data=data)
                         for name, data in zip(mol_table_names,
                                               zip(*levels))]
    mol_table = table.Table(data=mol_table_columns, meta=meta_mol)

    rad_table_names = ['Transition', 'Upper', 'Lower', 'EinsteinA',
                       'Frequency', 'E_u(K)']
    rad_table_columns = [table.Column(name=name, data=data)
                         for name, data in zip(rad_table_names,
                                               zip(*radtrans))]
    rad_table = table.Table(data=rad_table_columns, meta=meta_rad)

    coll_tables = {collider_ids[collider]: None for collider in collrates}
    for collider in collrates:
        collname = collider_ids[collider]
        coll_table_names = (['Transition', 'Upper', 'Lower'] +
                            ['C_ij(T={0:d})'.format(tem) for tem in
                             meta_coll[collname]["temperatures"]])
        coll_table_columns = [table.Column(name=name, data=data)
                              for name, data in zip(coll_table_names,
                                                    zip(*collrates[collider]))]
        coll_table = table.Table(data=coll_table_columns,
                                 meta=meta_coll[collname])
        coll_tables[collname] = coll_table

    return coll_tables, rad_table, mol_table


def _cln(s):
    """
    Clean a string of comments, newlines
    """
    return s.split("!")[0].strip()


Lamda = LamdaClass()
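

# Hypothetical sketch: ``parse_lamda_lines`` is what ``LamdaClass.query`` uses
# internally, so the raw lines it expects can be obtained with
# ``return_datafile=True``:
#
#     lines = Lamda.query(mol='co', return_datafile=True)
#     collrates, radtransitions, enlevels = parse_lamda_lines(lines)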