Source code for astroquery.alma.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import print_function
import time
import os.path
import keyring
import numpy as np
import re
import tarfile
import string
import requests
from requests import HTTPError
import sys
from pkg_resources import resource_filename
from bs4 import BeautifulSoup

from six.moves.urllib_parse import urljoin, urlparse
from six import iteritems, StringIO
import six
from astropy.table import Table, Column
from astropy import log
from astropy.utils.console import ProgressBar
from astropy import units as u
import astropy.coordinates as coord
import as votable

from ..exceptions import (RemoteServiceError, TableParseError,
                          InvalidQueryError, LoginError)
from ..utils import commons, url_helpers
from ..utils.process_asyncs import async_to_sync
from ..query import QueryWithLogin
from . import conf

__doctest_skip__ = ['AlmaClass.*']

[docs]@async_to_sync class AlmaClass(QueryWithLogin): TIMEOUT = conf.timeout archive_url = conf.archive_url USERNAME = conf.username def __init__(self): super(AlmaClass, self).__init__()
[docs] def query_object_async(self, object_name, cache=True, public=True, science=True, payload=None, **kwargs): """ Query the archive with a source name Parameters ---------- object_name : str The object name. Will be parsed by SESAME on the ALMA servers. cache : bool Cache the query? public : bool Return only publicly available datasets? science : bool Return only data marked as "science" in the archive? payload : dict Dictionary of additional keywords. See `help`. kwargs : dict Passed to `query_async` """ if payload is None: payload = {} payload.update({'source_name_resolver': object_name, }) return self.query_async(payload, cache=cache, public=public, science=science, **kwargs)
[docs] def query_region_async(self, coordinate, radius, cache=True, public=True, science=True, payload=None, **kwargs): """ Query the ALMA archive with a source name and radius Parameters ---------- coordinates : str / `astropy.coordinates` the identifier or coordinates around which to query. radius : str / `~astropy.units.Quantity`, optional the radius of the region cache : bool Cache the query? public : bool Return only publicly available datasets? science : bool Return only data marked as "science" in the archive? payload : dict Dictionary of additional keywords. See `help`. kwargs : dict Passed to `query_async` """ coordinate = commons.parse_coordinates(coordinate) cstr = coordinate.fk5.to_string(style='hmsdms', sep=':') rdc = "{cstr}, {rad}".format(cstr=cstr, rad=coord.Angle(radius).deg) if payload is None: payload = {} payload.update({'ra_dec': rdc}) return self.query_async(payload, cache=cache, public=public, science=science, **kwargs)
[docs] def query_async(self, payload, cache=True, public=True, science=True, max_retries=5, get_html_version=False, get_query_payload=False, **kwargs): """ Perform a generic query with user-specified payload Parameters ---------- payload : dict A dictionary of payload keywords that are accepted by the ALMA archive system. You can look these up by examining the forms at or using the `help` method cache : bool Cache the query? (note: HTML queries *cannot* be cached using the standard caching mechanism because the URLs are different each time public : bool Return only publicly available datasets? science : bool Return only data marked as "science" in the archive? """ url = urljoin(self._get_dataarchive_url(), 'aq/') payload.update(kwargs) if get_html_version: payload.update({'result_view': 'observation', 'format': 'URL', 'download': 'true'}) else: payload.update({'result_view': 'raw', 'format': 'VOTABLE', 'download': 'true'}) if public: payload['public_data'] = 'public' if science: payload['science_observations'] = '=%TARGET%' self.validate_query(payload) if get_query_payload: return payload response = self._request('GET', url, params=payload, timeout=self.TIMEOUT, cache=cache and not get_html_version) self._last_response = response response.raise_for_status() if get_html_version: if 'run' not in response.text: if max_retries > 0:"Failed query. Retrying up to {0} more times" .format(max_retries)) return self.query_async(payload=payload, cache=False, public=public, science=science, max_retries=max_retries-1, get_html_version=get_html_version, get_query_payload=get_query_payload, **kwargs) raise RemoteServiceError("Incorrect return from HTML table query.") response2 = self._request('GET', "{0}/{1}/{2}".format( self._get_dataarchive_url(), 'aq', response.text), params={'query_url': response.url.split("?")[-1]}, timeout=self.TIMEOUT, cache=False, ) self._last_response = response2 response2.raise_for_status() if len(response2.text) == 0: if max_retries > 0:"Failed (empty) query. Retrying up to {0} more times" .format(max_retries)) return self.query_async(payload=payload, cache=cache, public=public, science=science, max_retries=max_retries-1, get_html_version=get_html_version, get_query_payload=get_query_payload, **kwargs) raise RemoteServiceError("Empty return.") return response2 else: return response
[docs] def validate_query(self, payload, cache=True): """ Use the ALMA query validator service to check whether the keywords are valid """ # Check that the keywords specified are allowed self._validate_payload(payload) vurl = self._get_dataarchive_url() + '/aq/validate' bad_kws = {} for kw in payload: vpayload = {'field': kw, kw: payload[kw]} response = self._request('GET', vurl, params=vpayload, cache=cache, timeout=self.TIMEOUT) if response.content: bad_kws[kw] = response.content if bad_kws: raise InvalidQueryError("Invalid query parameters: " "{0}".format(bad_kws))
def _get_dataarchive_url(self): """ If the generic ALMA URL is used, query it to determine which mirror to access for querying data """ if not hasattr(self, 'dataarchive_url'): if self.archive_url in ('', ''): response = self._request('GET', self.archive_url + "/aq", cache=False) response.raise_for_status() # Jan 2017: we have to force https because the archive doesn't # tell us it needs https. self.dataarchive_url = response.url.replace("/aq/", "").replace("http://", "https://") else: self.dataarchive_url = self.archive_url return self.dataarchive_url
[docs] def stage_data(self, uids): """ Stage ALMA data Parameters ---------- uids : list or str A list of valid UIDs or a single UID. UIDs should have the form: 'uid://A002/X391d0b/X7b' Returns ------- data_file_table : Table A table containing 3 columns: the UID, the file URL (for future downloading), and the file size """ """ With log.set_level(10) INFO: Staging files... [astroquery.alma.core] DEBUG: First request URL: [astroquery.alma.core] DEBUG: First request payload: {'dataset': [u'ALMA+uid___A002_X3b3400_X90f']} [astroquery.alma.core] DEBUG: First response URL: [astroquery.alma.core] DEBUG: Request ID: 3f98de33-197e-4692-9afa-496842032ea9 [astroquery.alma.core] DEBUG: Submission URL: [astroquery.alma.core] .DEBUG: Data list URL: [astroquery.alma.core] """ if isinstance(uids, six.string_types + (np.bytes_,)): uids = [uids] if not isinstance(uids, (list, tuple, np.ndarray)): raise TypeError("Datasets must be given as a list of strings.")"Staging files...") self._get_dataarchive_url() url = urljoin(self._get_dataarchive_url(), 'rh/submission') log.debug("First request URL: {0}".format(url)) # 'ALMA+uid___A002_X391d0b_X7b' payload = {'dataset': ['ALMA+' + clean_uid(uid) for uid in uids]} log.debug("First request payload: {0}".format(payload)) self._staging_log = {'first_post_url': url} # Request staging for the UIDs # This component cannot be cached, since the returned data can change # if new data are uploaded response = self._request('POST', url, data=payload, timeout=self.TIMEOUT, cache=False) self._staging_log['initial_response'] = response log.debug("First response URL: {0}".format(response.url)) if 'login' in response.url: raise ValueError("You must login before downloading this data set.") if response.status_code == 405: if hasattr(self, '_last_successful_staging_log'): log.warning("Error 405 received. If you have previously staged " "the same UIDs, the result returned is probably " "correct, otherwise you may need to create a fresh " "astroquery.Alma instance.") return self._last_successful_staging_log['result'] else: raise HTTPError("Received an error 405: this may indicate you " "have already staged the data. Try downloading " "the file URLs directly with download_files.") response.raise_for_status() if 'j_spring_cas_security_check' in response.url: time.sleep(1) # CANNOT cache this stage: it not a real data page! results in # infinite loops response = self._request('POST', url, data=payload, timeout=self.TIMEOUT, cache=False) self._staging_log['initial_response'] = response if 'j_spring_cas_security_check' in response.url: log.warning("Staging request was not successful. Try again?") response.raise_for_status() if 'j_spring_cas_security_check' in response.url: raise RemoteServiceError("Could not access data. This error " "can arise if the data are private and " "you do not have access rights or are " "not logged in.") request_id = response.url.split("/")[-2] self._staging_log['request_id'] = request_id log.debug("Request ID: {0}".format(request_id)) # Submit a request for the specific request ID identified above submission_url = urljoin(self._get_dataarchive_url(), url_helpers.join('rh/submission', request_id)) log.debug("Submission URL: {0}".format(submission_url)) self._staging_log['submission_url'] = submission_url staging_submission = self._request('GET', submission_url, cache=True) self._staging_log['staging_submission'] = staging_submission staging_submission.raise_for_status() data_page_url = staging_submission.url self._staging_log['data_page_url'] = data_page_url dpid = data_page_url.split("/")[-1] self._staging_log['staging_page_id'] = dpid # CANNOT cache this step: please_wait will happen infinitely data_page = self._request('GET', data_page_url, cache=False) self._staging_log['data_page'] = data_page data_page.raise_for_status() has_completed = False while not has_completed: time.sleep(1) summary = self._request('GET', url_helpers.join(data_page_url, 'summary'), cache=False) summary.raise_for_status() print(".", end='') sys.stdout.flush() has_completed = summary.json()['complete'] self._staging_log['summary'] = summary summary.raise_for_status() self._staging_log['json_data'] = json_data = summary.json() username = self.USERNAME if self.USERNAME else 'anonymous' # templates: # # 2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar/2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar # uid___A002_X9ee74a_X26f0/2013.1.00308.S_uid___A002_X9ee74a_X26f0.asdm.sdm.tar url_decomposed = urlparse(data_page_url) base_url = ('{uri.scheme}://{uri.netloc}/' 'dataPortal/requests/{username}/' '{staging_page_id}/ALMA'.format(uri=url_decomposed, staging_page_id=dpid, username=username, )) tbl = self._json_summary_to_table(json_data, base_url=base_url) self._staging_log['result'] = tbl self._staging_log['file_urls'] = tbl['URL'] self._last_successful_staging_log = self._staging_log return tbl
def _HEADER_data_size(self, files): """ Given a list of file URLs, return the data size. This is useful for assessing how much data you might be downloading! (This is discouraged by the ALMA archive, as it puts unnecessary load on their system) """ totalsize = 0 * u.B data_sizes = {} pb = ProgressBar(len(files)) for ii, fileLink in enumerate(files): response = self._request('HEAD', fileLink, stream=False, cache=False, timeout=self.TIMEOUT) filesize = (int(response.headers['content-length']) * u.B).to(u.GB) totalsize += filesize data_sizes[fileLink] = filesize log.debug("File {0}: size {1}".format(fileLink, filesize)) pb.update(ii + 1) response.raise_for_status() return data_sizes,
[docs] def download_files(self, files, savedir=None, cache=True, continuation=True): """ Given a list of file URLs, download them Note: Given a list with repeated URLs, each will only be downloaded once, so the return may have a different length than the input list """ downloaded_files = [] if savedir is None: savedir = self.cache_location for fileLink in unique(files): try: filename = self._request("GET", fileLink, save=True, savedir=savedir, timeout=self.TIMEOUT, cache=cache, continuation=continuation) downloaded_files.append(filename) except requests.HTTPError as ex: if ex.response.status_code == 401:"Access denied to {url}. Skipping to" " next file".format(url=fileLink)) continue else: raise ex return downloaded_files
[docs] def retrieve_data_from_uid(self, uids, cache=True): """ Stage & Download ALMA data. Will print out the expected file size before attempting the download. Parameters ---------- uids : list or str A list of valid UIDs or a single UID. UIDs should have the form: 'uid://A002/X391d0b/X7b' cache : bool Whether to cache the downloads. Returns ------- downloaded_files : list A list of the downloaded file paths """ if isinstance(uids, six.string_types + (np.bytes_,)): uids = [uids] if not isinstance(uids, (list, tuple, np.ndarray)): raise TypeError("Datasets must be given as a list of strings.") files = self.stage_data(uids) file_urls = files['URL'] totalsize = files['size'].sum() * files['size'].unit # each_size, totalsize = self.data_size(files)"Downloading files of size {0}...".format( # TODO: Add cache=cache keyword here. Currently would have no effect. downloaded_files = self.download_files(file_urls) return downloaded_files
def _parse_result(self, response, verbose=False): """ Parse a VOtable response """ if not verbose: commons.suppress_vo_warnings() if 'run?' in response.url: if response.text == "": raise RemoteServiceError("Empty return.") # this is a CSV-like table returned via a direct browser request table =, format='ascii.csv', fast_reader=False) else: fixed_content = self._hack_bad_arraysize_vofix(response.content) tf = six.BytesIO(fixed_content) vo_tree = votable.parse(tf, pedantic=False, invalid='mask') first_table = vo_tree.get_first_table() table = first_table.to_table(use_names_over_ids=True) return table def _hack_bad_arraysize_vofix(self, text): """ Hack to fix an error in the ALMA votables present in most 2016 and 2017 queries. The problem is that this entry: ' <FIELD name="Band" datatype="char" ID="32817" xtype="adql:VARCHAR" arraysize="0*">\r', has an invalid ``arraysize`` entry. Also, it returns a char, but it should be an int. As of February 2019, this issue appears to be half-fixed; the arraysize is no longer incorrect, but the data type remains incorrect. Since that problem was discovered and fixed, many other entries have the same error. Feb 2019, the other instances are gone. According to the IVOA, the tables are wrong, not A new issue, #1340, noted that 'Release date' and 'Mosaic' both lack data type metadata, necessitating the hack below """ lines = text.split(b"\n") newlines = [] for ln in lines: if b'FIELD name="Band"' in ln: ln = ln.replace(b'datatype="char"', b'datatype="int"') elif b'FIELD name="Release date"' in ln or b'FIELD name="Mosaic"' in ln: ln = ln.replace(b'/>', b' arraysize="*"/>') newlines.append(ln) return b"\n".join(newlines) def _login(self, username=None, store_password=False, reenter_password=False, auth_urls=['', '']): """ Login to the ALMA Science Portal. Parameters ---------- username : str, optional Username to the ALMA Science Portal. If not given, it should be specified in the config file. store_password : bool, optional Stores the password securely in your keyring. Default is False. reenter_password : bool, optional Asks for the password even if it is already stored in the keyring. This is the way to overwrite an already stored passwork on the keyring. Default is False. """ if username is None: if not self.USERNAME: raise LoginError("If you do not pass a username to login(), " "you should configure a default one!") else: username = self.USERNAME success = False for auth_url in auth_urls: # set session cookies (they do not get set otherwise) cookiesetpage = self._request("GET", urljoin(self._get_dataarchive_url(), 'rh/forceAuthentication'), cache=False) self._login_cookiepage = cookiesetpage cookiesetpage.raise_for_status() if (auth_url+'/cas/login' in cookiesetpage.request.url): # we've hit a target, we're good success = True break if not success: raise LoginError("Could not log in to any of the known ALMA " "authorization portals: {0}".format(auth_urls)) # Check if already logged in loginpage = self._request("GET", "https://{auth_url}/cas/login".format(auth_url=auth_url), cache=False) root = BeautifulSoup(loginpage.content, 'html5lib') if root.find('div', class_='success'):"Already logged in.") return True # Get password from keyring or prompt password, password_from_keyring = self._get_password( "astroquery:{0}".format(auth_url), username, reenter=reenter_password) # Authenticate"Authenticating {0} on {1} ...".format(username, auth_url)) # Do not cache pieces of the login process data = {kw: root.find('input', {'name': kw})['value'] for kw in ('lt', '_eventId', 'execution')} data['username'] = username data['password'] = password data['submit'] = 'LOGIN' login_response = self._request("POST", "https://{0}/cas/login".format(auth_url), params={'service': self._get_dataarchive_url()}, data=data, cache=False) # save the login response for debugging purposes self._login_response = login_response # do not expose password back to user del data['password'] # but save the parameters for debug purposes self._login_parameters = data authenticated = ('You have successfully logged in' in login_response.text) if authenticated:"Authentication successful!") self.USERNAME = username else: log.exception("Authentication failed!") # When authenticated, save password in keyring if needed if authenticated and password_from_keyring is None and store_password: keyring.set_password("astroquery:{0}".format(auth_url), username, password) return authenticated
[docs] def get_cycle0_uid_contents(self, uid): """ List the file contents of a UID from Cycle 0. Will raise an error if the UID is from cycle 1+, since those data have been released in a different and more consistent format. See for details. """ # First, check if UID is in the Cycle 0 listing if uid in self.cycle0_table['uid']: cycle0id = self.cycle0_table[ self.cycle0_table['uid'] == uid][0]['ID'] contents = [row['Files'] for row in self._cycle0_tarfile_content if cycle0id in row['ID']] return contents else: info_url = urljoin( self._get_dataarchive_url(), 'documents-and-tools/cycle-2/ALMAQA2Productsv1.01.pdf') raise ValueError("Not a Cycle 0 UID. See {0} for details about " "cycle 1+ data release formats.".format(info_url))
@property def _cycle0_tarfile_content(self): """ In principle, this is a static file, but we'll retrieve it just in case """ if not hasattr(self, '_cycle0_tarfile_content_table'): url = urljoin(self._get_dataarchive_url(), 'alma-data/archive/cycle-0-tarfile-content') response = self._request('GET', url, cache=True) # html.parser is needed because some <tr>'s have form: # <tr width="blah"> which the default parser does not pick up root = BeautifulSoup(response.content, 'html.parser') html_table = root.find('table', class_='grid listing') data = list(zip(*[(x.findAll('td')[0].text, x.findAll('td')[1].text) for x in html_table.findAll('tr')])) columns = [Column(data=data[0], name='ID'), Column(data=data[1], name='Files')] tbl = Table(columns) assert len(tbl) == 8497 self._cycle0_tarfile_content_table = tbl else: tbl = self._cycle0_tarfile_content_table return tbl @property def cycle0_table(self): """ Return a table of Cycle 0 Project IDs and associated UIDs. The table is distributed with astroquery and was provided by Felix Stoehr. """ if not hasattr(self, '_cycle0_table'): filename = resource_filename( 'astroquery.alma', 'data/cycle0_delivery_asdm_mapping.txt') self._cycle0_table =, format='ascii.no_header') self._cycle0_table.rename_column('col1', 'ID') self._cycle0_table.rename_column('col2', 'uid') return self._cycle0_table
[docs] def get_files_from_tarballs(self, downloaded_files, regex=r'.*\.fits$', path='cache_path', verbose=True): """ Given a list of successfully downloaded tarballs, extract files with names matching a specified regular expression. The default is to extract all FITS files Parameters ---------- downloaded_files : list A list of downloaded files. These should be paths on your local machine. regex : str A valid regular expression path : 'cache_path' or str If 'cache_path', will use the astroquery.Alma cache directory (``Alma.cache_location``), otherwise will use the specified path. Note that the subdirectory structure of the tarball will be maintained. Returns ------- filelist : list A list of the extracted file locations on disk """ if path == 'cache_path': path = self.cache_location elif not os.path.isdir(path): raise OSError("Specified an invalid path {0}.".format(path)) fitsre = re.compile(regex) filelist = [] for fn in downloaded_files: tf = for member in tf.getmembers(): if fitsre.match( if verbose:"Extracting {0} to {1}".format(, path)) tf.extract(member, path) filelist.append(os.path.join(path, return filelist
[docs] def download_and_extract_files(self, urls, delete=True, regex=r'.*\.fits$', include_asdm=False, path='cache_path', verbose=True): """ Given a list of tarball URLs: 1. Download the tarball 2. Extract all FITS files (or whatever matches the regex) 3. Delete the downloaded tarball See ``Alma.get_files_from_tarballs`` for details Parameters ---------- urls : str or list A single URL or a list of URLs include_asdm : bool Only affects cycle 1+ data. If set, the ASDM files will be downloaded in addition to the script and log files. By default, though, this file will be downloaded and deleted without extracting any information: you must change the regex if you want to extract data from an ASDM tarball """ if isinstance(urls, six.string_types): urls = [urls] if not isinstance(urls, (list, tuple, np.ndarray)): raise TypeError("Datasets must be given as a list of strings.") all_files = [] for url in urls: if url[-4:] != '.tar': raise ValueError("URLs should be links to tarballs.") tarfile_name = os.path.split(url)[-1] if tarfile_name in self._cycle0_tarfile_content['ID']: # It is a cycle 0 file: need to check if it contains FITS match = (self._cycle0_tarfile_content['ID'] == tarfile_name) if not any(re.match(regex, x) for x in self._cycle0_tarfile_content['Files'][match]):"No FITS files found in {0}".format(tarfile_name)) continue else: if 'asdm' in tarfile_name and not include_asdm:"ASDM tarballs do not contain FITS files; " "skipping.") continue try: tarball_name = self._request('GET', url, save=True, timeout=self.TIMEOUT) except requests.ConnectionError as ex: self.partial_file_list = all_files log.error("There was an error downloading the file. " "A partially completed download list is " "in Alma.partial_file_list") raise ex except requests.HTTPError as ex: if ex.response.status_code == 401:"Access denied to {url}. Skipping to" " next file".format(url=url)) continue else: raise ex fitsfilelist = self.get_files_from_tarballs([tarball_name], regex=regex, path=path, verbose=verbose) if delete:"Deleting {0}".format(tarball_name)) os.remove(tarball_name) all_files += fitsfilelist return all_files
[docs] def help(self, cache=True): """ Return the valid query parameters """ help_list = self._get_help_page(cache=cache) print("Valid ALMA keywords. Left column is the description, right " "column is the name of the keyword to pass to astroquery.alma" " queries:") for title, section in help_list: print() print(title) for row in section: if len(row) == 2: # text value name, payload_keyword = row print(" {0:33s}: {1:35s}".format(name, payload_keyword)) # elif len(row) == 3: # radio button # name,payload_keyword,value = row # print(" {0:33s}: {1:20s} = {2:15s}".format(name, # payload_keyword, # value)) elif len(row) == 4: # radio button or checkbox name, payload_keyword, checkbox, value = row if isinstance(checkbox, list): checkbox_str = ", ".join(["{0}={1}".format(x, y) for x, y in zip(checkbox, value)]) print(" {0:33s}: {1:20s} -> {2}" .format(name, payload_keyword, checkbox_str)) else: print(" {2} {0:29s}: {1:20s} = {3:15s}" .format(name, payload_keyword, checkbox, value)) else: raise ValueError("Wrong number of rows - ALMA query page" " did not parse properly.")
def _get_help_page(self, cache=True): if not hasattr(self, '_help_list') or not self._help_list: querypage = self._request( 'GET', self._get_dataarchive_url() + "/aq/", cache=cache, timeout=self.TIMEOUT) root = BeautifulSoup(querypage.content, "html5lib") sections = root.findAll('td', class_='category') whitespace = re.compile(r"\s+") help_list = [] for section in sections: title = section.find( 'div', class_='categorytitle').text.lstrip() help_section = (title, []) for inp in section.findAll('div', class_='inputdiv'): sp = inp.find('span') buttons = inp.findAll('input') for b in buttons: # old version:for=id=rawView; name=viewFormat # new version:for=id=rawView; name=result_view payload_keyword = b.attrs['name'] bid = b.attrs['id'] label = inp.find('label') if sp is not None: name = whitespace.sub(" ", sp.text) elif label.attrs['for'] == bid: name = whitespace.sub(" ", label.text) else: raise TableParseError("ALMA query page has" " an unrecognized entry") if b.attrs['type'] == 'text': help_section[1].append((name, payload_keyword)) elif b.attrs['type'] == 'radio': value = b.attrs['value'] if 'checked' in b.attrs: checked = b.attrs['checked'] == 'checked' checkbox = "(x)" if checked else "( )" else: checkbox = "( )" help_section[1].append((name, payload_keyword, checkbox, value)) elif b.attrs['type'] == 'checkbox': if 'checked' in b.attrs: checked = b.attrs['checked'] == 'checked' else: checked = False value = b.attrs['value'] checkbox = "[x]" if checked else "[ ]" help_section[1].append((name, payload_keyword, checkbox, value)) select = inp.find('select') if select is not None: options = [("".join(filter_printable(option.text)), option.attrs['value']) for option in select.findAll('option')] if sp is not None: name = whitespace.sub(" ", sp.text) else: name = select.attrs['name'] checkbox = [o[0] for o in options] value = [o[1] for o in options] option_str = select.attrs['name'] help_section[1].append((name, option_str, checkbox, value)) help_list.append(help_section) self._help_list = help_list return self._help_list def _validate_payload(self, payload): if not hasattr(self, '_valid_params'): help_list = self._get_help_page(cache=False) self._valid_params = [row[1] for title, section in help_list for row in section] if len(self._valid_params) == 0: raise ValueError("The query validation failed for unknown " "reasons. Try again?") # These parameters are entirely hidden, but Felix says they are # allowed self._valid_params.append('download') self._valid_params.append('format') self._valid_params.append('member_ous_id') invalid_params = [k for k in payload if k not in self._valid_params] if len(invalid_params) > 0: raise InvalidQueryError("The following parameters are not accepted" " by the ALMA query service:" " {0}".format(invalid_params)) def _parse_staging_request_page(self, data_list_page): """ Parse pages like this one: that include links to data sets that have been requested and staged Parameters ---------- data_list_page : requests.Response object """ root = BeautifulSoup(data_list_page.content, 'html5lib') data_table = root.findAll('table', class_='list', id='report')[0] columns = {'uid': [], 'URL': [], 'size': []} for tr in data_table.findAll('tr'): tds = tr.findAll('td') # Cannot check class if it is not defined cl = 'class' in tr.attrs if (len(tds) > 1 and 'uid' in tds[0].text and (cl and 'Level' in tr['class'][0])): # New Style text = tds[0].text.strip().split() if text[0] in ('Asdm', 'Member'): uid = text[-1] elif len(tds) > 1 and 'uid' in tds[1].text: # Old Style uid = tds[1].text.strip() elif cl and tr['class'] == 'Level_1': raise ValueError("Heading was found when parsing the download " "page but it was not parsed correctly") if len(tds) > 3 and (cl and tr['class'][0] == 'fileRow'): # New Style size, unit ='(-|[0-9\.]*)([A-Za-z]*)', tds[2].text).groups() href = tds[1].find('a') if size == '': # this is a header row continue authorized = ('access_authorized.png' in tds[3].findChild('img')['src']) if authorized: columns['uid'].append(uid) if href and 'href' in href.attrs: columns['URL'].append(href.attrs['href']) else: columns['URL'].append('None_Found') unit = (u.Unit(unit) if unit in ('GB', 'MB') else u.Unit('kB') if 'kb' in unit.lower() else 1) try: columns['size'].append(float(size) * u.Unit(unit)) except ValueError: # size is probably a string? columns['size'].append(-1 * u.byte) log.log(level=5, msg="Found a new-style entry. " "size={0} uid={1} url={2}" .format(size, uid, columns['URL'][-1])) else: log.warning("Access to {0} is not authorized.".format(uid)) elif len(tds) > 3 and tds[2].find('a'): # Old Style href = tds[2].find('a') size, unit ='([0-9\.]*)([A-Za-z]*)', tds[3].text).groups() columns['uid'].append(uid) columns['URL'].append(href.attrs['href']) unit = (u.Unit(unit) if unit in ('GB', 'MB') else u.Unit('kB') if 'kb' in unit.lower() else 1) columns['size'].append(float(size) * u.Unit(unit)) log.log(level=5, msg="Found an old-style entry. " "size={0} uid={1} url={2}".format(size, uid, columns['URL'][-1])) columns['size'] = u.Quantity(columns['size'], u.Gbyte) if len(columns['uid']) == 0: raise RemoteServiceError( "No valid UIDs were found in the staged data table. " "Please include {0} in a bug report." .format(self._staging_log['data_list_url'])) tbl = Table([Column(name=k, data=v) for k, v in iteritems(columns)]) return tbl def _json_summary_to_table(self, data, base_url): """ """ columns = {'uid': [], 'URL': [], 'size': []} for entry in data['node_data']: # de_type can be useful (e.g., MOUS), but it is not necessarily # specified # file_name and file_key *must* be specified. is_file = (entry['file_name'] != 'null' and entry['file_key'] != 'null') if is_file: # "de_name": "ALMA+uid://A001/X122/X35e", columns['uid'].append(entry['de_name'][5:]) if entry['file_size'] == 'null': columns['size'].append(np.nan * u.Gbyte) else: columns['size'].append( (int(entry['file_size']) * u.B).to(u.Gbyte)) # example template for constructing url: # # uid___A002_X9d6f4c_X154/2013.1.00546.S_uid___A002_X9d6f4c_X154.asdm.sdm.tar # above is WRONG... except for ASDMs, when it's right # should be: # 2013.1.00546.S_uid___A002_X9d6f4c_X154.asdm.sdm.tar/2013.1.00546.S_uid___A002_X9d6f4c_X154.asdm.sdm.tar # # apparently ASDMs are different from others: # templates: # # 2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar/2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar # uid___A002_X9ee74a_X26f0/2013.1.00308.S_uid___A002_X9ee74a_X26f0.asdm.sdm.tar url = url_helpers.join(base_url, entry['file_key'], entry['file_name']) if 'null' in url: raise ValueError("The URL {0} was created containing " "'null', which is invalid.".format(url)) columns['URL'].append(url) columns['size'] = u.Quantity(columns['size'], u.Gbyte) tbl = Table([Column(name=k, data=v) for k, v in iteritems(columns)]) return tbl
[docs] def get_project_metadata(self, projectid, cache=True): """ Get the metadata - specifically, the project abstract - for a given project ID. """ url = urljoin(self._get_dataarchive_url(), 'aq/') assert len(projectid) == 14, "Wrong length for project ID" assert projectid[4] == projectid[6] == projectid[12] == '.', "Wrong format for project ID" response = self._request('GET', "{0}meta/project/{1}".format(url, projectid), timeout=self.TIMEOUT, cache=cache) response.raise_for_status() return response.json()
Alma = AlmaClass() def clean_uid(uid): """ Return a uid with all unacceptable characters replaced with underscores """ if not hasattr(uid, 'replace'): return clean_uid(str(uid.astype('S'))) try: return uid.decode('utf-8').replace(u"/", u"_").replace(u":", u"_") except AttributeError: return uid.replace("/", "_").replace(":", "_") def reform_uid(uid): """ Convert a uid with underscores to the original format """ return uid[:3] + "://" + "/".join(uid[6:].split("_")) def unique(seq): """ Return unique elements of a list, preserving order """ seen = set() seen_add = seen.add return [x for x in seq if not (x in seen or seen_add(x))] def filter_printable(s): """ extract printable characters from a string """ return filter(lambda x: x in string.printable, s)