Source code for astroquery.lamda.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os
import json
from astropy import table
from astropy import log
from astropy.utils.console import ProgressBar
from bs4 import BeautifulSoup
from six.moves import urllib_parse as urlparse
import re
import warnings

from ..exceptions import InvalidQueryError
from ..query import BaseQuery

__all__ = ['Lamda']

# should skip only if remote_data = False
__doctest_skip__ = ['LamdaClass.query']

# query_types and collider_ids are potentially useful but not used.
query_types = {
    'erg_levels': '!NUMBER OF ENERGY LEVELS',
    'rad_trans': '!NUMBER OF RADIATIVE TRANSITIONS',
    'coll_rates': '!COLLISIONS BETWEEN'
}

collider_ids = {'H2': 1,
                'PH2': 2,
                'OH2': 3,
                'E': 4,
                'H': 5,
                'HE': 6,
                'H+': 7}
collider_ids.update({v: k for k, v in list(collider_ids.items())})


class LamdaClass(BaseQuery):

    url = "http://home.strw.leidenuniv.nl/~moldata/datafiles/{0}.dat"

    def __init__(self, **kwargs):
        super(LamdaClass, self).__init__(**kwargs)
        self.moldict_path = os.path.join(self.cache_location,
                                         "molecules.json")

    def _get_molfile(self, mol, cache=True, timeout=None):
        """
        """
        if mol not in self.molecule_dict:
            raise InvalidQueryError("Molecule {0} is not in the valid "
                                    "molecule list.  See Lamda.molecule_dict")
        response = self._request('GET', self.molecule_dict[mol],
                                 timeout=timeout, cache=cache)
        response.raise_for_status()
        return response

    def download_molfile(self, mol, outfilename):
        """
        Download a particular molecular data file `mol` to output file
        `outfilename`
        """
        molreq = self._get_molfile(mol)
        with open(outfilename, 'w') as f:
            f.write(molreq.text)

    def query(self, mol, return_datafile=False, cache=True, timeout=None):
        """
        Query the LAMDA database.

        Parameters
        ----------
        mol : string
            Molecule or atom designation. For a list of valid designations see
            the :meth:`print_mols` method.

        Returns
        -------
        Tuple of tables: ({rateid: Table, },
                          Table,
                          Table)

        Examples
        --------
        >>> from astroquery.lamda import Lamda
        >>> collrates,radtransitions,enlevels = Lamda.query(mol='co')
        >>> enlevels.pprint()
        LEVEL ENERGIES(cm^-1) WEIGHT  J
        ----- --------------- ------ ---
            2     3.845033413    3.0   1
            3    11.534919938    5.0   2
          ...             ...    ... ...
        >>> collrates['H2'].pprint(max_width=60)
        Transition Upper Lower ... C_ij(T=325) C_ij(T=375)
        ---------- ----- ----- ... ----------- -----------
                 1     2     1 ...     2.8e-11       3e-11
                 2     3     1 ...     1.8e-11     1.9e-11
        """
        # Send HTTP request to open URL
        datafile = [s.strip() for s in
                    self._get_molfile(mol, timeout=timeout,
                                      cache=cache).text.splitlines()]
        if return_datafile:
            return datafile
        # Parse datafile string list and return a table
        tables = parse_lamda_lines(datafile)
        return tables

    def get_molecules(self, cache=True):
        """
        Scrape the list of valid molecules
        """
        if cache and hasattr(self, '_molecule_dict'):
            return self._molecule_dict
        elif cache and os.path.isfile(self.moldict_path):
            with open(self.moldict_path, 'r') as f:
                md = json.load(f)
            return md

        main_url = 'http://home.strw.leidenuniv.nl/~moldata/'
        response = self._request('GET', main_url, cache=cache)
        response.raise_for_status()

        soup = BeautifulSoup(response.content)

        links = soup.find_all('a', href=True)
        datfile_urls = [url
                        for link in ProgressBar(links)
                        for url in self._find_datfiles(link['href'],
                                                       base_url=main_url)]

        molecule_re = re.compile(r'http://[a-zA-Z0-9.]*/~moldata/datafiles/([A-Z0-9a-z_+@-]*).dat')
        molecule_dict = {molecule_re.search(url).groups()[0]:
                         url
                         for url in datfile_urls}

        with open(self.moldict_path, 'w') as f:
            s = json.dumps(molecule_dict)
            f.write(s)

        return molecule_dict

    @property
    def molecule_dict(self):
        if not hasattr(self, '_molecule_dict'):
            warnings.warn("The first time a LAMDA function is called, it must "
                          "assemble a list of valid molecules and URLs.  This "
                          "list will be cached so future operations will be "
                          "faster.")
            self._molecule_dict = self.get_molecules()

        return self._molecule_dict

    def _find_datfiles(self, url, base_url, raise_for_status=False):

        myurl = _absurl_from_url(url, base_url)
        if 'http' not in myurl:
            # assume this is a bad URL, like a mailto:blah href
            return []

        response = self._request('GET', myurl)
        if raise_for_status:
            response.raise_for_status()
        elif not response.ok:
            # assume this URL does not contain data b/c it does not exist
            return []

        soup = BeautifulSoup(response.content)

        links = soup.find_all('a', href=True)

        urls = [_absurl_from_url(link['href'], base_url)
                for link in links if '.dat' in link['href']]

        return urls


def _absurl_from_url(url, base_url):
    if url[:4] != 'http':
        return urlparse.urljoin(base_url, url)
    return url


[docs]def parse_lamda_datafile(filename): """ Read a datafile that follows the format adopted for the atomic and molecular data in the LAMDA database. Parameters ---------- filename : str Fully qualified path of the file to read. Returns ------- Tuple of tables: ({rateid: Table, }, Table, Table) """ with open(filename) as f: lines = f.readlines() return parse_lamda_lines(lines)
[docs]def write_lamda_datafile(filename, tables): """ Write tuple of tables with LAMDA data into a datafile that follows the format adopted for the LAMDA database. Parameters ---------- filename : str Fully qualified path of the file to write. tables: tuple Tuple of Tables ({rateid: coll_table}, rad_table, mol_table) """ import platform import sys collrates, radtransitions, enlevels = tables levels_hdr = ("""! MOLECULE {0} ! MOLECULAR WEIGHT {1} ! NUMBER OF ENERGY LEVELS {2} ! LEVEL + ENERGIES(cm^-1) + WEIGHT + J """) levels_hdr = re.sub('^ +', '', levels_hdr, flags=re.MULTILINE) radtrans_hdr = ("""! NUMBER OF RADIATIVE TRANSITIONS {0} ! TRANS + UP + LOW + EINSTEINA(s^-1) + FREQ(GHz) + E_u(K) """) radtrans_hdr = re.sub('^ +', '', radtrans_hdr, flags=re.MULTILINE) coll_hdr = ("""! NUMBER OF COLL PARTNERS {0} """) coll_hdr = re.sub('^ +', '', coll_hdr, flags=re.MULTILINE) coll_part_hdr = ("""! COLLISION PARTNER {0} {1} ! NUMBER OF COLLISIONAL TRANSITIONS {2} ! NUMBER OF COLLISION TEMPERATURES {3} ! COLLISION TEMPERATURES {4} ! TRANS + UP + LOW + RATE COEFFS(cm^3 s^-1) """) coll_part_hdr = re.sub('^ +', '', coll_part_hdr, flags=re.MULTILINE) if platform.system() == 'Windows': if sys.version_info[0] >= 3: stream = open(filename, 'w', newline='') else: stream = open(filename, 'wb') else: stream = open(filename, 'w') with stream as f: f.write(levels_hdr.format(enlevels.meta['molecule'], enlevels.meta['molwt'], enlevels.meta['nenergylevels'])) enlevels.write(f, format='ascii.no_header') f.write(radtrans_hdr.format(radtransitions.meta['radtrans'])) radtransitions.write(f, format='ascii.no_header') f.write(coll_hdr.format(len(collrates))) for k, v in collrates.items(): temperatures = ' '.join([str(i) for i in v.meta['temperatures']]) f.write(coll_part_hdr.format(v.meta['collider_id'], v.meta['collider'], v.meta['ntrans'], v.meta['ntemp'], temperatures)) v.write(f, format='ascii.no_header')
def parse_lamda_lines(data): """ Extract a LAMDA datafile into a dictionary of tables (non-pythonic! more like, fortranic) """ meta_rad = {} meta_mol = {} meta_coll = {} levels = [] radtrans = [] collider = None ncolltrans = None for ii, line in enumerate(data): if line[0] == '!': continue if 'molecule' not in meta_mol: meta_mol['molecule'] = _cln(line) continue if 'molwt' not in meta_mol: meta_mol['molwt'] = float(_cln(line)) continue if 'nenergylevels' not in meta_mol: meta_mol['nenergylevels'] = int(_cln(line)) continue if len(levels) < meta_mol['nenergylevels']: lev, en, wt = _cln(line).split()[:3] jul = " ".join(_cln(line).split()[3:]) levels.append([int(lev), float(en), int(float(wt)), jul]) continue if 'radtrans' not in meta_rad: meta_rad['radtrans'] = int(_cln(line)) continue if len(radtrans) < meta_rad['radtrans']: # Can have wavenumber at the end. Ignore that. trans, up, low, aval, freq, eu = _cln(line).split()[:6] radtrans.append([int(trans), int(up), int(low), float(aval), float(freq), float(eu)]) continue if 'ncoll' not in meta_coll: meta_coll['ncoll'] = int(_cln(line)) collrates = {} continue if collider is None: collider = int(line[0]) collname = collider_ids[collider] collrates[collider] = [] meta_coll[collname] = {'collider': collname, 'collider_id': collider} continue if ncolltrans is None: ncolltrans = int(_cln(line)) meta_coll[collname]['ntrans'] = ncolltrans continue if 'ntemp' not in meta_coll[collname]: meta_coll[collname]['ntemp'] = int(_cln(line)) continue if 'temperatures' not in meta_coll[collname]: meta_coll[collname]['temperatures'] = [int(float(x)) for x in _cln(line).split()] continue if len(collrates[collider]) < meta_coll[collname]['ntrans']: trans, up, low = [int(x) for x in _cln(line).split()[:3]] temperatures = [float(x) for x in _cln(line).split()[3:]] collrates[collider].append([trans, up, low] + temperatures) if len(collrates[collider]) == meta_coll[collname]['ntrans']: # meta_coll[collider_ids[collider]+'_collrates'] = collrates log.debug("{ii} Finished loading collider {0:d}: " "{1}".format(collider, collider_ids[collider], ii=ii)) collider = None ncolltrans = None if len(collrates) == meta_coll['ncoll']: # All done! break if len(levels[0]) == 4: mol_table_names = ['Level', 'Energy', 'Weight', 'J'] elif len(levels[0]) == 5: mol_table_names = ['Level', 'Energy', 'Weight', 'J', 'F'] else: raise ValueError("Unrecognized levels structure.") mol_table_columns = [table.Column(name=name, data=data) for name, data in zip(mol_table_names, zip(*levels))] mol_table = table.Table(data=mol_table_columns, meta=meta_mol) rad_table_names = ['Transition', 'Upper', 'Lower', 'EinsteinA', 'Frequency', 'E_u(K)'] rad_table_columns = [table.Column(name=name, data=data) for name, data in zip(rad_table_names, zip(*radtrans))] rad_table = table.Table(data=rad_table_columns, meta=meta_rad) coll_tables = {collider_ids[collider]: None for collider in collrates} for collider in collrates: collname = collider_ids[collider] coll_table_names = (['Transition', 'Upper', 'Lower'] + ['C_ij(T={0:d})'.format(tem) for tem in meta_coll[collname]["temperatures"]]) coll_table_columns = [table.Column(name=name, data=data) for name, data in zip(coll_table_names, zip(*collrates[collider]))] coll_table = table.Table(data=coll_table_columns, meta=meta_coll[collname]) coll_tables[collname] = coll_table return coll_tables, rad_table, mol_table def _cln(s): """ Clean a string of comments, newlines """ return s.split("!")[0].strip() Lamda = LamdaClass()