# Source code for astroquery.esa.jwst.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
=======================
eJWST Astroquery Module
=======================

European Space Astronomy Centre (ESAC)
European Space Agency (ESA)

"""

import binascii
import gzip
import os
import shutil
import tarfile as esatar
import zipfile
from datetime import datetime, timezone
from urllib.parse import urlencode

import astroquery.esa.utils.utils as esautils

from astropy import log
from astropy import units
from astropy.coordinates import Angle, SkyCoord
from astropy.table import vstack
from astropy.units import Quantity
from requests.exceptions import ConnectionError

from astroquery.exceptions import RemoteServiceError
from astroquery.ipac.ned import Ned
from astroquery.query import BaseQuery
from astroquery.simbad import Simbad
from astroquery.utils import commons
from astroquery.utils.tap import TapPlus
from astroquery.vizier import Vizier
from . import conf
from .data_access import JwstDataHandler


# Public names exported by this module: the ready-to-use ``Jwst`` instance
# created at the bottom of the file, and the class it is built from.
__all__ = ['Jwst', 'JwstClass']


# The tar archives served by ESA are trusted, so extraction is explicitly
# configured as "fully trusted". This silences the deprecation warning that
# Python 3.12 emits when no extraction filter is set:
# https://docs.python.org/3.12/library/tarfile.html#tarfile-extraction-filter
try:
    esatar.TarFile.extraction_filter = staticmethod(esatar.fully_trusted_filter)
except AttributeError:
    # Older Python versions do not provide extraction filters: nothing to do.
    pass


class JwstClass(BaseQuery):
    """
    Proxy class to default TapPlus object (pointing to JWST Archive)
    """

    JWST_DEFAULT_COLUMNS = ['observationid', 'calibrationlevel', 'public',
                            'dataproducttype', 'instrument_name',
                            'energy_bandpassname', 'target_name', 'target_ra',
                            'target_dec', 'position_bounds_center',
                            'position_bounds_spoly']

    PLANE_DATAPRODUCT_TYPES = ['image', 'cube', 'measurements', 'spectrum']
    ARTIFACT_PRODUCT_TYPES = ['info', 'thumbnail', 'auxiliary', 'science',
                              'preview']
    INSTRUMENT_NAMES = ['NIRISS', 'NIRSPEC', 'NIRCAM', 'MIRI', 'FGS']
    TARGET_RESOLVERS = ['ALL', 'SIMBAD', 'NED', 'VIZIER']
    CAL_LEVELS = ['ALL', 1, 2, 3, -1]
    REQUESTED_OBSERVATION_ID = "Missing required argument: 'observation_id'"

    def __init__(self, *, tap_plus_handler=None, data_handler=None,
                 show_messages=True):
        """Create the archive proxy.

        Parameters
        ----------
        tap_plus_handler : object, optional, default None
            TAP handler to use. If None, a TapPlus object pointing to
            ``conf.JWST_TAP_SERVER`` is created.
        data_handler : object, optional, default None
            data handler to use. If None, a JwstDataHandler pointing to
            ``conf.JWST_DATA_SERVER`` is created.
        show_messages : bool, optional, default 'True'
            if True, the archive status messages are retrieved and printed
            on construction
        """
        if tap_plus_handler is None:
            self.__jwsttap = TapPlus(url=conf.JWST_TAP_SERVER,
                                     data_context='data',
                                     client_id='ASTROQUERY')
        else:
            self.__jwsttap = tap_plus_handler

        if data_handler is None:
            self.__jwstdata = JwstDataHandler(
                base_url=conf.JWST_DATA_SERVER)
        else:
            self.__jwstdata = data_handler

        if show_messages:
            self.get_status_messages()

    def load_tables(self, *, only_names=False, include_shared_tables=False,
                    verbose=False):
        """Loads all public tables
        TAP & TAP+

        Parameters
        ----------
        only_names : bool, TAP+ only, optional, default 'False'
            True to load table names only
        include_shared_tables : bool, TAP+, optional, default 'False'
            True to include shared tables
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        A list of table objects
        """
        return self.__jwsttap.load_tables(only_names=only_names,
                                          include_shared_tables=include_shared_tables,
                                          verbose=verbose)

    def load_table(self, table, *, verbose=False):
        """Loads the specified table
        TAP+ only

        Parameters
        ----------
        table : str, mandatory
            full qualified table name (i.e. schema name + table name)
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        A table object
        """
        return self.__jwsttap.load_table(table, verbose=verbose)

    def launch_job(self, query, *, name=None, output_file=None,
                   output_format="votable", verbose=False, dump_to_file=False,
                   background=False, upload_resource=None,
                   upload_table_name=None, async_job=False):
        """Launches a synchronous or asynchronous job
        TAP & TAP+

        Parameters
        ----------
        query : str, mandatory
            query to be executed
        name : str, optional, default None
            name of the job to be executed
        output_file : str, optional, default None
            file name where the results are saved if dump_to_file is True.
            If this parameter is not provided, the jobid is used instead
        output_format : str, optional, default 'votable'
            results format. Options are:
            'votable': str, binary VOTable format
            'csv': str, comma-separated values format
            'fits': str, FITS format
        verbose : bool, optional, default 'False'
            flag to display information about the process
        dump_to_file : bool, optional, default 'False'
            if True, the results are saved in a file instead of using memory
        background : bool, optional, default 'False'
            when the job is executed in asynchronous mode, this flag
            specifies whether the execution will wait until results are
            available. It is only forwarded for asynchronous jobs.
        upload_resource : str, optional, default None
            resource to be uploaded to UPLOAD_SCHEMA
        upload_table_name : str, required if upload_resource is provided
            Default None
            resource temporary table name associated to the uploaded resource
        async_job : bool, optional, default 'False'
            tag to execute the job in sync or async mode

        Returns
        -------
        A Job object
        """
        if async_job:
            return self.__jwsttap.launch_job_async(query=query,
                                                   name=name,
                                                   output_file=output_file,
                                                   output_format=output_format,
                                                   verbose=verbose,
                                                   dump_to_file=dump_to_file,
                                                   background=background,
                                                   upload_resource=upload_resource,
                                                   upload_table_name=upload_table_name)
        return self.__jwsttap.launch_job(query=query,
                                         name=name,
                                         output_file=output_file,
                                         output_format=output_format,
                                         verbose=verbose,
                                         dump_to_file=dump_to_file,
                                         upload_resource=upload_resource,
                                         upload_table_name=upload_table_name)

    def load_async_job(self, *, jobid=None, name=None, verbose=False):
        """Loads an asynchronous job
        TAP & TAP+

        Parameters
        ----------
        jobid : str, mandatory if no name is provided, default None
            job identifier
        name : str, mandatory if no jobid is provided, default None
            job name
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        A Job object
        """
        return self.__jwsttap.load_async_job(jobid=jobid, name=name,
                                             verbose=verbose)

    def search_async_jobs(self, *, jobfilter=None, verbose=False):
        """Searches for jobs applying the specified filter
        TAP+ only

        Parameters
        ----------
        jobfilter : JobFilter, optional, default None
            job filter
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        A list of Job objects
        """
        return self.__jwsttap.search_async_jobs(jobfilter=jobfilter,
                                                verbose=verbose)

    def list_async_jobs(self, *, verbose=False):
        """Returns all the asynchronous jobs
        TAP & TAP+

        Parameters
        ----------
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        A list of Job objects
        """
        return self.__jwsttap.list_async_jobs(verbose=verbose)

    def query_region(self, coordinate, *,
                     radius=None,
                     width=None,
                     height=None,
                     observation_id=None,
                     cal_level="Top",
                     prod_type=None,
                     instrument_name=None,
                     filter_name=None,
                     proposal_id=None,
                     only_public=False,
                     show_all_columns=False,
                     async_job=False, verbose=False):
        """Launches a query region job in sync/async mode
        TAP & TAP+

        If ``radius`` is given a cone search is delegated to
        ``cone_search``; otherwise a box search of ``width`` x ``height``
        centred on ``coordinate`` is executed.

        Parameters
        ----------
        coordinate : astropy.coordinate, mandatory
            coordinates center point
        radius : astropy.units, required if no 'width' nor 'height' are
            provided
            radius (deg)
        width : astropy.units, required if no 'radius' is provided
            box width
        height : astropy.units, required if no 'radius' is provided
            box height
        observation_id : str, optional, default None
            get the observation given by its ID.
        cal_level : object, optional, default 'Top'
            get the planes with the given calibration level. Options are:
            'Top': str, only the planes with the highest calibration level
            1,2,3: int, the given calibration level
        prod_type : str, optional, default None
            get the observations providing the given product type. Options
            are:
            'image','cube','measurements','spectrum': str, only results of
            the given product type
        instrument_name : str, optional, default None
            get the observations corresponding to the given instrument name.
            Options are:
            'NIRISS', 'NIRSPEC', 'NIRCAM', 'MIRI', 'FGS': str, only results
            of the given instrument
        filter_name : str, optional, default None
            get the observations made with the given filter.
        proposal_id : str, optional, default None
            get the observations from the given proposal ID.
        only_public : bool, optional, default 'False'
            flag to show only metadata corresponding to public observations
        show_all_columns : bool, optional, default 'False'
            flag to show all available columns in the output.
            Default behaviour is to show the most representative columns only
        async_job : bool, optional, default 'False'
            executes the query (job) in asynchronous/synchronous mode (default
            synchronous)
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        The job results (astropy.table).

        Raises
        ------
        ValueError
            if ``coordinate`` has an unsupported type, if neither ``radius``
            nor both ``width`` and ``height`` are provided, or if any filter
            argument is invalid.
        """
        coord = self.__get_coord_input(value=coordinate, msg="coordinate")
        job = None
        if radius is not None:
            # NOTE(review): 'cone_search' is not defined in the visible part
            # of this class - confirm it is provided elsewhere.
            job = self.cone_search(coordinate=coord,
                                   radius=radius,
                                   only_public=only_public,
                                   observation_id=observation_id,
                                   cal_level=cal_level,
                                   prod_type=prod_type,
                                   instrument_name=instrument_name,
                                   filter_name=filter_name,
                                   proposal_id=proposal_id,
                                   show_all_columns=show_all_columns,
                                   async_job=async_job, verbose=verbose)
        else:
            raHours, dec = commons.coord_to_radec(coord)
            ra = raHours * 15.0  # Converts to degrees
            widthQuantity = self.__get_quantity_input(value=width, msg="width")
            heightQuantity = self.__get_quantity_input(value=height, msg="height")
            widthDeg = widthQuantity.to(units.deg)
            heightDeg = heightQuantity.to(units.deg)

            obsid_cond = self.__get_observationid_condition(value=observation_id)
            cal_level_condition = self.__get_callevel_condition(cal_level=cal_level)
            public_condition = self.__get_public_condition(only_public=only_public)
            prod_cond = self.__get_plane_dataproducttype_condition(prod_type=prod_type)
            instr_cond = self.__get_instrument_name_condition(value=instrument_name)
            filter_name_cond = self.__get_filter_name_condition(value=filter_name)
            props_id_cond = self.__get_proposal_id_condition(value=proposal_id)

            columns = str(', '.join(self.JWST_DEFAULT_COLUMNS))
            if show_all_columns:
                columns = '*'

            query = (f"SELECT DISTANCE(POINT('ICRS',"
                     f"{str(conf.JWST_MAIN_TABLE_RA)},"
                     f"{str(conf.JWST_MAIN_TABLE_DEC)} ), "
                     f"POINT('ICRS',{str(ra)},{str(dec)} )) "
                     f"AS dist, {columns} "
                     f"FROM {str(conf.JWST_MAIN_TABLE)} "
                     f"WHERE CONTAINS("
                     f"POINT('ICRS',"
                     f"{str(conf.JWST_MAIN_TABLE_RA)},"
                     f"{str(conf.JWST_MAIN_TABLE_DEC)}),"
                     f"BOX('ICRS',{str(ra)},{str(dec)}, "
                     f"{str(widthDeg.value)}, "
                     f"{str(heightDeg.value)}))=1 "
                     f"{obsid_cond}"
                     f"{cal_level_condition}"
                     f"{public_condition}"
                     f"{prod_cond}"
                     f"{instr_cond}"
                     f"{filter_name_cond}"
                     f"{props_id_cond}"
                     f"ORDER BY dist ASC")
            if verbose:
                print(query)
            if async_job:
                job = self.__jwsttap.launch_job_async(query=query, verbose=verbose)
            else:
                job = self.__jwsttap.launch_job(query=query, verbose=verbose)
        return job.get_results()

    def query_target(self, target_name, *, target_resolver="ALL",
                     radius=None,
                     width=None,
                     height=None,
                     observation_id=None,
                     cal_level="Top",
                     prod_type=None,
                     instrument_name=None,
                     filter_name=None,
                     proposal_id=None,
                     only_public=False,
                     show_all_columns=False,
                     async_job=False,
                     verbose=False):
        """Searches for a specific target defined by its name and other
        parameters
        TAP & TAP+

        Parameters
        ----------
        target_name : str, mandatory
            name of the target that will be used as center point
        target_resolver : str, optional, default ALL
            resolver used to associate the target name with its coordinates.
            The ALL option evaluates a "SIMBAD then NED then VIZIER"
            approach. Options are: ALL, SIMBAD, NED, VIZIER.
        radius : astropy.units, required if no 'width' nor 'height' are
            provided.
            radius (deg)
        width : astropy.units, required if no 'radius' is provided
            box width
        height : astropy.units, required if no 'radius' is provided
            box height
        observation_id : str, optional, default None
            get the observation given by its ID.
        cal_level : object, optional, default 'Top'
            get the planes with the given calibration level. Options are:
            'Top': str, only the planes with the highest calibration level
            1,2,3: int, the given calibration level
        prod_type : str, optional, default None
            get the observations providing the given product type. Options
            are:
            'image','cube','measurements','spectrum': str, only results of the
            given product type
        instrument_name : str, optional, default None
            get the observations corresponding to the given instrument name.
            Options are:
            'NIRISS', 'NIRSPEC', 'NIRCAM', 'MIRI', 'FGS': str, only results
            of the given instrument
        filter_name : str, optional, default None
            get the observations made with the given filter.
        proposal_id : str, optional, default None
            get the observations from the given proposal ID.
        only_public : bool, optional, default 'False'
            flag to show only metadata corresponding to public observations
        show_all_columns : bool, optional, default 'False'
            flag to show all available columns in the output.
            Default behaviour is to show the most representative columns only
        async_job : bool, optional, default 'False'
            executes the query (job) in asynchronous/synchronous mode (default
            synchronous)
        verbose : bool, optional, default 'False'
            flag to display information about the process

        Returns
        -------
        The job results (astropy.table).
        """
        coordinates = self.resolve_target_coordinates(target_name=target_name,
                                                      target_resolver=target_resolver)
        return self.query_region(coordinate=coordinates,
                                 radius=radius,
                                 width=width,
                                 height=height,
                                 observation_id=observation_id,
                                 cal_level=cal_level,
                                 prod_type=prod_type,
                                 instrument_name=instrument_name,
                                 filter_name=filter_name,
                                 proposal_id=proposal_id,
                                 only_public=only_public,
                                 async_job=async_job,
                                 show_all_columns=show_all_columns,
                                 verbose=verbose)

    def resolve_target_coordinates(self, target_name, target_resolver):
        """Resolve a target name into sky coordinates.

        Parameters
        ----------
        target_name : str, mandatory
            name of the target to be resolved
        target_resolver : str, mandatory
            one of TARGET_RESOLVERS. 'ALL' tries SIMBAD, then NED, then
            VIZIER, and returns the first successful resolution.

        Returns
        -------
        The coordinates of the target (astropy.coordinates.SkyCoord).

        Raises
        ------
        ValueError
            if the resolver is not allowed or no resolver could determine the
            coordinates of the target.
        """
        if target_resolver not in self.TARGET_RESOLVERS:
            raise ValueError("This target resolver is not allowed")

        if target_resolver in ("ALL", "SIMBAD"):
            try:
                result_table = Simbad.query_object(target_name)
                return SkyCoord((f'{result_table["RA"][0]} '
                                 f'{result_table["DEC"][0]}'),
                                unit=(units.hourangle, units.deg),
                                frame="icrs")
            except (KeyError, TypeError, ConnectionError):
                log.info("SIMBAD could not resolve this target")

        if target_resolver in ("ALL", "NED"):
            try:
                result_table = Ned.query_object(target_name)
                return SkyCoord(result_table["RA"][0],
                                result_table["DEC"][0],
                                unit="deg", frame="fk5")
            except (RemoteServiceError, KeyError, ConnectionError):
                log.info("NED could not resolve this target")

        if target_resolver in ("ALL", "VIZIER"):
            try:
                result_table = Vizier.query_object(target_name,
                                                   catalog="II/336/apass9")[0]
                # Sorted to use the record with the least uncertainty
                result_table.sort(["e_RAJ2000", "e_DEJ2000"])
                return SkyCoord(result_table["RAJ2000"][0],
                                result_table["DEJ2000"][0],
                                unit="deg", frame="fk5")
            except (IndexError, AttributeError, ConnectionError):
                log.info("VIZIER could not resolve this target")

        # Every successful resolution returns above. Reaching this point
        # means that no resolver produced coordinates, even if one of them
        # returned a table that could not be read, so always report it.
        raise ValueError(f"This target name cannot be determined with"
                         f" this resolver: {target_resolver}")

    def remove_jobs(self, jobs_list, *, verbose=False):
        """Removes the specified jobs
        TAP+

        Parameters
        ----------
        jobs_list : str, mandatory
            jobs identifiers to be removed
        verbose : bool, optional, default 'False'
            flag to display information about the process
        """
        return self.__jwsttap.remove_jobs(jobs_list=jobs_list, verbose=verbose)

    def save_results(self, job, *, verbose=False):
        """Saves job results
        TAP & TAP+

        Parameters
        ----------
        job : Job, mandatory
            job
        verbose : bool, optional, default 'False'
            flag to display information about the process
        """
        return self.__jwsttap.save_results(job=job, verbose=verbose)

    def login(self, *, user=None, password=None, credentials_file=None,
              token=None, verbose=False):
        """Performs a login.
        TAP+ only
        User and password can be used or a file that contains user name and
        password (2 lines: one for user name and the following one for the
        password)

        Parameters
        ----------
        user : str, mandatory if 'file' is not provided, default None
            login name
        password : str, mandatory if 'file' is not provided, default None
            user password
        credentials_file : str, mandatory if no 'user' & 'password' are
            provided
            file containing user and password in two lines
        token : str, optional
            MAST token to have access to proprietary data
        verbose : bool, optional, default 'False'
            flag to display information about the process
        """
        self.__jwsttap.login(user=user,
                             password=password,
                             credentials_file=credentials_file,
                             verbose=verbose)
        if token:
            self.set_token(token=token)

    def logout(self, *, verbose=False):
        """Performs a logout
        TAP+ only

        Parameters
        ----------
        verbose : bool, optional, default 'False'
            flag to display information about the process
        """
        return self.__jwsttap.logout(verbose=verbose)

    def set_token(self, token):
        """Links a MAST token to the logged user

        The outcome is reported by printing a message.

        Parameters
        ----------
        token : str, mandatory
            MAST token to have access to proprietary data
        """
        subContext = conf.JWST_TOKEN
        data = urlencode({"token": token})
        connHandler = self.__jwsttap._TapPlus__getconnhandler()
        response = connHandler.execute_secure(subcontext=subContext, data=data, verbose=True)
        if response.status == 403:
            print("ERROR: MAST tokens cannot be assigned or requested by anonymous users")
        elif response.status == 500:
            print("ERROR: Server error when setting the token")
        else:
            print("MAST token has been set successfully")

    def get_status_messages(self):
        """Retrieve the messages to inform users about
        the status of JWST TAP

        Every line of a successful (HTTP 200) response is printed, stripped
        of everything up to and including its first '='.
        """
        try:
            subContext = conf.JWST_MESSAGES
            connHandler = self.__jwsttap._TapPlus__getconnhandler()
            response = connHandler.execute_tapget(subContext, verbose=False)
            if response.status == 200:
                for line in response:
                    string_message = line.decode("utf-8")
                    # A line without '=' is printed as it is instead of
                    # aborting (this method runs from the constructor).
                    _, separator, message = string_message.partition('=')
                    print(message if separator else string_message)
        except OSError:
            print("Status messages could not be retrieved")

    def get_product_list(self, *, observation_id=None,
                         cal_level="ALL",
                         product_type=None):
        """Get the list of products of a given JWST observation_id.

        Parameters
        ----------
        observation_id : str, mandatory
            Observation identifier.
        cal_level : str or int, optional
            Calibration level. Default value is 'ALL', to download all the
            products associated to this observation_id and lower processing
            levels. Requesting more accurate levels than the one associated
            to the observation_id is not allowed (as level 3 observations are
            composite products based on level 2 products). To request upper
            levels, please use get_related_observations functions first.
            Possible values: 'ALL', 3, 2, 1, -1
        product_type : str, optional, default None
            List only products of the given type. If None, all products are
            listed. Possible values: 'thumbnail', 'preview', 'info',
            'auxiliary', 'science'.

        Returns
        -------
        The list of products (astropy.table).

        Raises
        ------
        ValueError
            if an argument is missing or invalid, or if the observation
            cannot be found.
        """
        self.__validate_cal_level(cal_level=cal_level)

        if observation_id is None:
            raise ValueError(self.REQUESTED_OBSERVATION_ID)
        plane_ids, max_cal_level = self._get_plane_id(observation_id=observation_id)
        if cal_level == 3 and cal_level > max_cal_level:
            raise ValueError("Requesting upper levels is not allowed")
        planes = self._get_associated_planes(plane_ids=plane_ids,
                                             cal_level=cal_level,
                                             max_cal_level=max_cal_level,
                                             is_url=False)
        producttype_cond = self.__get_artifact_producttype_condition(product_type=product_type)

        query = (f"select distinct a.uri, a.artifactid, a.filename, "
                 f"a.contenttype, a.producttype, p.calibrationlevel, "
                 f"p.public FROM {conf.JWST_PLANE_TABLE} p JOIN "
                 f"{conf.JWST_ARTIFACT_TABLE} a ON (p.planeid=a.planeid) "
                 f"WHERE a.planeid IN {planes}"
                 f"{producttype_cond};")
        job = self.__jwsttap.launch_job(query=query)
        return job.get_results()

    def __validate_cal_level(self, cal_level):
        # Raises ValueError unless cal_level is one of CAL_LEVELS.
        if cal_level not in self.CAL_LEVELS:
            raise ValueError("This calibration level is not valid")

    def _get_associated_planes(self, plane_ids, cal_level,
                               max_cal_level, is_url):
        """Build the list of plane identifiers associated to 'plane_ids'.

        When 'cal_level' equals 'max_cal_level' the given planes are used as
        they are; otherwise their sibling and member planes are queried.

        Returns
        -------
        str
            "('id1', 'id2')" (ADQL IN-list) if 'is_url' is False, or
            "id1,id2" otherwise.
        """
        if cal_level == max_cal_level:
            plane_list = plane_ids
        else:
            plane_list = []
            for plane_id in plane_ids:
                siblings = self.__get_sibling_planes(planeid=plane_id, cal_level=cal_level)
                members = self.__get_member_planes(planeid=plane_id, cal_level=cal_level)
                plane_id_table = vstack([siblings, members])
                plane_list.extend(plane_id_table['product_planeid'])

        if not is_url:
            return "('{}')".format("', '".join(plane_list))
        return ",".join(plane_list)

    def _get_plane_id(self, observation_id):
        """Get the planes with the highest calibration level of an observation.

        Returns
        -------
        tuple
            (list of plane identifiers, maximum calibration level)

        Raises
        ------
        ValueError
            if the planes of the observation cannot be obtained.
        """
        try:
            planeids = []
            query_plane = (f"select distinct m.planeid, m.calibrationlevel "
                           f"from {conf.JWST_MAIN_TABLE} m where "
                           f"m.observationid = '{observation_id}'")
            job = self.__jwsttap.launch_job(query=query_plane)
            results = job.get_results()
            # Highest calibration level first
            results.sort(["calibrationlevel"])
            results.reverse()
            max_cal_level = results["calibrationlevel"][0]
            for row in results:
                if row["calibrationlevel"] == max_cal_level:
                    planeids.append(
                        JwstClass.get_decoded_string(row["planeid"]))
            return planeids, max_cal_level
        except Exception as exc:
            raise ValueError("This observation_id does not exist in "
                             "JWST database") from exc

    def __get_sibling_planes(self, planeid, *, cal_level='ALL'):
        # Planes that belong to the same observation as 'planeid', restricted
        # to 'cal_level' ('ALL': any level not above the one of 'planeid').
        if cal_level == "ALL":
            where_clause = ("WHERE sp.calibrationlevel<=p.calibrationlevel "
                            "AND p.planeid =")
        else:
            where_clause = (f"WHERE sp.calibrationlevel={cal_level} AND "
                            f"p.planeid =")
        try:
            query_siblings = (f"SELECT o.observationuri, p.planeid, "
                              f"p.calibrationlevel, sp.planeid as "
                              f"product_planeid, sp.calibrationlevel as "
                              f"product_level FROM "
                              f"{conf.JWST_OBSERVATION_TABLE} o JOIN "
                              f"{conf.JWST_PLANE_TABLE} p ON "
                              f"p.obsid=o.obsid JOIN "
                              f"{conf.JWST_PLANE_TABLE} sp ON "
                              f"sp.obsid=o.obsid {where_clause}'{planeid}'")
            job = self.__jwsttap.launch_job(query=query_siblings)
            return job.get_results()
        except Exception as e:
            raise ValueError(e) from e

    def __get_member_planes(self, planeid, *, cal_level='ALL'):
        # Planes of the member observations of the observation that owns
        # 'planeid', optionally restricted to 'cal_level'.
        if cal_level == "ALL":
            where_clause = "WHERE p.planeid ="
        else:
            where_clause = (f"WHERE mp.calibrationlevel={cal_level} AND "
                            f"p.planeid =")
        try:
            query_members = (f"SELECT o.observationuri, p.planeid, "
                             f"p.calibrationlevel, mp.planeid as "
                             f"product_planeid, mp.calibrationlevel as "
                             f"product_level FROM "
                             f"{conf.JWST_OBSERVATION_TABLE} o JOIN "
                             f"{conf.JWST_PLANE_TABLE} p on "
                             f"o.obsid=p.obsid JOIN "
                             f"{conf.JWST_OBS_MEMBER_TABLE} m on "
                             f"o.obsid=m.parentid JOIN "
                             f"{conf.JWST_OBSERVATION_TABLE} "
                             f"mo on m.memberid=mo.observationuri JOIN "
                             f"{conf.JWST_PLANE_TABLE} mp on "
                             f"mo.obsid=mp.obsid "
                             f"{where_clause}'{planeid}'")
            job = self.__jwsttap.launch_job(query=query_members)
            return job.get_results()
        except Exception as e:
            raise ValueError(e) from e

    def get_product(self, *, artifact_id=None, file_name=None):
        """Get a JWST product given its Artifact ID or File name.

        Parameters
        ----------
        artifact_id : str, mandatory (if no file_name is provided)
            Artifact ID of the product.
        file_name : str, mandatory (if no artifact_id is provided)
            File name of the product.

        Returns
        -------
        local_path : str
            Returns the local path that the file was downloaded to.

        Raises
        ------
        ValueError
            if neither 'artifact_id' nor 'file_name' is provided, or if the
            product cannot be identified or retrieved.
        """
        params_dict = {}
        params_dict['RETRIEVAL_TYPE'] = 'PRODUCT'
        params_dict['TAPCLIENT'] = 'ASTROQUERY'
        self.__check_product_input(artifact_id=artifact_id,
                                   file_name=file_name)

        # Output file name: the given one, or the one stored in the archive
        if file_name is None:
            try:
                output_file_name = self._query_get_product(artifact_id=artifact_id)
                err_msg = str(artifact_id)
            except Exception as exx:
                raise ValueError(f"Cannot retrieve product for artifact_id "
                                 f"{artifact_id}: {exx}") from exx
        else:
            output_file_name = str(file_name)
            err_msg = str(file_name)

        # Artifact identifier: the given one, or the one of the file name
        if artifact_id is not None:
            params_dict['ARTIFACTID'] = str(artifact_id)
        else:
            try:
                params_dict['ARTIFACTID'] = (self._query_get_product(
                    file_name=file_name))
            except Exception as exx:
                raise ValueError(f"Cannot retrieve product for file_name "
                                 f"{file_name}: {exx}") from exx

        try:
            self.__jwsttap.load_data(params_dict=params_dict,
                                     output_file=output_file_name)
        except Exception as exx:
            log.info("error")
            raise ValueError(f"Error retrieving product for "
                             f"{err_msg}: {exx}") from exx
        return esautils.check_rename_to_gz(filename=output_file_name)

    def _query_get_product(self, *, artifact_id=None, file_name=None):
        """Translate between file name and artifact identifier.

        Returns the artifact identifier of 'file_name' when it is provided,
        otherwise the file name of 'artifact_id'.
        """
        if file_name:
            query_artifactid = (f"select * from {conf.JWST_ARTIFACT_TABLE} "
                                f"a where a.filename = "
                                f"'{file_name}'")
            job = self.__jwsttap.launch_job(query=query_artifactid)
            return JwstClass.get_decoded_string(
                job.get_results()['artifactid'][0])

        query_filename = (f"select * from {conf.JWST_ARTIFACT_TABLE} a "
                          f"where a.artifactid = "
                          f"'{artifact_id}'")
        job = self.__jwsttap.launch_job(query=query_filename)
        return JwstClass.get_decoded_string(
            job.get_results()['filename'][0])

    def __check_product_input(self, artifact_id, file_name):
        # At least one of the two identifiers is required.
        if artifact_id is None and file_name is None:
            raise ValueError("Missing required argument: "
                             "'artifact_id' or 'file_name'")

    def get_obs_products(self, *, observation_id=None, cal_level="ALL",
                         product_type=None, output_file=None):
        """Get a JWST product given its observation ID.

        Parameters
        ----------
        observation_id : str, mandatory
            Observation identifier.
        cal_level : str or int, optional
            Calibration level. Default value is 'ALL', to download all the
            products associated to this observation_id and lower levels.
            Requesting more accurate levels than the one associated to the
            observation_id is not allowed (as level 3 observations are
            composite products based on level 2 products). To request upper
            levels, please use get_related_observations functions first.
            Possible values: 'ALL', 3, 2, 1, -1
        product_type : str, optional, default None
            List only products of the given type. If None, all products are
            listed. Possible values: 'thumbnail', 'preview', 'auxiliary',
            'science'.
        output_file : str, optional
            Output file. If no value is provided, a temporary one is created.

        Returns
        -------
        local_path : str
            Returns the local path where the product(s) are saved.

        Raises
        ------
        ValueError
            if an argument is missing or invalid, or if the products cannot
            be retrieved.
        OSError
            if the output directory cannot be created.
        """
        if observation_id is None:
            raise ValueError(self.REQUESTED_OBSERVATION_ID)
        plane_ids, max_cal_level = self._get_plane_id(observation_id=observation_id)

        if cal_level == 3 and cal_level > max_cal_level:
            raise ValueError("Requesting upper levels is not allowed")
        # Fail before any further archive query is sent for an invalid level
        if cal_level is not None:
            self.__validate_cal_level(cal_level=cal_level)

        params_dict = {}
        params_dict['RETRIEVAL_TYPE'] = 'OBSERVATION'
        params_dict['TAPCLIENT'] = 'ASTROQUERY'

        plane_ids = self._get_associated_planes(plane_ids=plane_ids,
                                                cal_level=cal_level,
                                                max_cal_level=max_cal_level,
                                                is_url=True)
        params_dict['planeid'] = plane_ids
        self.__set_additional_parameters(param_dict=params_dict,
                                         cal_level=cal_level,
                                         max_cal_level=max_cal_level,
                                         product_type=product_type)
        output_file_full_path, output_dir = self.__set_dirs(output_file=output_file,
                                                            observation_id=observation_id)
        # Get file name only
        output_file_name = os.path.basename(output_file_full_path)

        try:
            self.__jwsttap.load_data(params_dict=params_dict,
                                     output_file=output_file_full_path)
        except Exception as exx:
            raise ValueError(f"Cannot retrieve products for observation "
                             f"{observation_id}: {exx}") from exx

        files = []
        self.__extract_file(output_file_full_path=output_file_full_path,
                            output_dir=output_dir,
                            files=files)
        if files:
            return files

        self.__check_file_number(output_dir=output_dir,
                                 output_file_name=output_file_name,
                                 output_file_full_path=output_file_full_path,
                                 files=files)
        return files

    def __check_file_number(self, output_dir, output_file_name,
                            output_file_full_path, files):
        # Fills 'files' with the downloaded products found in 'output_dir'.
        suffix = "_all_products"
        num_files_in_dir = len(os.listdir(output_dir))
        if num_files_in_dir == 1:
            # Only the downloaded file: drop the temporary suffix, if any
            if output_file_name.endswith(suffix):
                output_f = output_file_name[:-len(suffix)]
            else:
                output_f = output_file_name

            output_full_path = os.path.join(output_dir, output_f)

            os.rename(output_file_full_path, output_full_path)
            files.append(output_full_path)
        else:
            # Everything but the downloaded container file itself
            for root, _dirs, file_names in os.walk(output_dir):
                for file_name in file_names:
                    if file_name != output_file_name:
                        files.append(os.path.join(root, file_name))

    def __extract_file(self, output_file_full_path, output_dir, files):
        # Extracts tar/zip containers into 'output_dir'. Any other file that
        # is not gzip-compressed is a single product and is added to 'files'.
        if esatar.is_tarfile(output_file_full_path):
            with esatar.open(output_file_full_path) as tar_ref:
                tar_ref.extractall(path=output_dir)
        elif zipfile.is_zipfile(output_file_full_path):
            with zipfile.ZipFile(output_file_full_path, 'r') as zip_ref:
                zip_ref.extractall(output_dir)
        elif not JwstClass.is_gz_file(output_file_full_path):
            # single file: return it
            files.append(output_file_full_path)
        return files

    def __set_dirs(self, output_file, observation_id):
        # Returns (output file path, output directory), creating the
        # directory if needed.
        if output_file is None:
            now = datetime.now(timezone.utc)
            formatted_now = now.strftime("%Y%m%d_%H%M%S")
            output_dir = os.path.join(os.getcwd(), "temp_" + formatted_now)
            output_file_full_path = os.path.join(output_dir,
                                                 observation_id + "_all_products")
        else:
            output_file_full_path = output_file
            # A bare file name has no directory part: use the current
            # directory instead of failing on an empty path.
            output_dir = os.path.dirname(output_file_full_path) or os.curdir
        try:
            os.makedirs(output_dir, exist_ok=True)
        except OSError as err:
            raise OSError(f"Creation of the directory {output_dir} "
                          f"failed: {err.strerror}") from err
        return output_file_full_path, output_dir

    def __set_additional_parameters(self, param_dict, cal_level,
                                    max_cal_level, product_type):
        # Adds the calibration level and product type request parameters.
        if cal_level is not None:
            self.__validate_cal_level(cal_level=cal_level)
            if cal_level == max_cal_level or cal_level == 2:
                param_dict['calibrationlevel'] = 'SELECTED'
            elif cal_level == 1:
                param_dict['calibrationlevel'] = 'LEVEL1ONLY'
            else:
                param_dict['calibrationlevel'] = cal_level

        if product_type is not None:
            param_dict['product_type'] = str(product_type)

    def __get_quantity_input(self, value, msg):
        # Returns 'value' as a Quantity, parsing it if it is a string.
        if value is None:
            raise ValueError(f"Missing required argument: '{msg}'")
        if not isinstance(value, (str, units.Quantity)):
            raise ValueError(f"{msg} must be either a string or units.Quantity")
        if isinstance(value, str):
            return Quantity(value)
        return value

    def __get_coord_input(self, value, msg):
        # Returns 'value' as coordinates, parsing it if it is a string.
        if not (isinstance(value, str)
                or isinstance(value, commons.CoordClasses)):
            raise ValueError(f"{msg} must be either a string or astropy.coordinates")
        if isinstance(value, str):
            return commons.parse_coordinates(value)
        return value

    def __get_observationid_condition(self, *, value=None):
        condition = ""
        if value is not None:
            if not isinstance(value, str):
                raise ValueError("observation_id must be string")
            condition = f" AND observationid LIKE '{value.lower()}' "
        return condition

    def __get_callevel_condition(self, cal_level):
        condition = ""
        if cal_level is not None:
            if isinstance(cal_level, str) and cal_level == 'Top':
                condition = " AND max_cal_level=calibrationlevel "
            elif isinstance(cal_level, int):
                condition = f" AND calibrationlevel={str(cal_level)} "
            else:
                raise ValueError("cal_level must be either "
                                 "'Top' or an integer")
        return condition

    def __get_public_condition(self, only_public):
        condition = ""
        if not isinstance(only_public, bool):
            raise ValueError("only_public must be boolean")
        elif only_public is True:
            condition = " AND public='true' "
        return condition

    def __get_plane_dataproducttype_condition(self, *, prod_type=None):
        condition = ""
        if prod_type is not None:
            if not isinstance(prod_type, str):
                raise ValueError("prod_type must be string")
            elif str(prod_type).lower() not in self.PLANE_DATAPRODUCT_TYPES:
                # The message has to be an f-string so that the allowed
                # values are shown instead of the raw template.
                raise ValueError(f"prod_type must be one of: "
                                 f"{str(', '.join(self.PLANE_DATAPRODUCT_TYPES))}")
            else:
                condition = f" AND dataproducttype ILIKE '%{prod_type.lower()}%' "
        return condition

    def __get_instrument_name_condition(self, *, value=None):
        condition = ""
        if value is not None:
            if not isinstance(value, str):
                raise ValueError("instrument_name must be string")
            elif str(value).upper() not in self.INSTRUMENT_NAMES:
                raise ValueError(f"instrument_name must be one of: "
                                 f"{str(', '.join(self.INSTRUMENT_NAMES))}")
            else:
                condition = f" AND instrument_name ILIKE '%{value.upper()}%' "
        return condition

    def __get_filter_name_condition(self, *, value=None):
        condition = ""
        if value is not None:
            if not isinstance(value, str):
                raise ValueError("filter_name must be string")
            condition = f" AND energy_bandpassname ILIKE '%{value}%' "
        return condition

    def __get_proposal_id_condition(self, *, value=None):
        condition = ""
        if value is not None:
            if not isinstance(value, str):
                raise ValueError("proposal_id must be string")
            condition = f" AND proposal_id ILIKE '%{value}%' "
        return condition

    def __get_artifact_producttype_condition(self, *, product_type=None):
        condition = ""
        if product_type is not None:
            if not isinstance(product_type, str):
                raise ValueError("product_type must be string")
            elif product_type not in self.ARTIFACT_PRODUCT_TYPES:
                raise ValueError(f"product_type must be one of: "
                                 f"{str(', '.join(self.ARTIFACT_PRODUCT_TYPES))}")
            else:
                condition = f" AND producttype ILIKE '%{product_type}%'"
        return condition

    @staticmethod
    def is_gz_file(filepath):
        """Check whether a file starts with the gzip magic number (1f 8b)."""
        with open(filepath, 'rb') as test_f:
            return binascii.hexlify(test_f.read(2)) == b'1f8b'

    @staticmethod
    def gzip_uncompress(input_file, output_file):
        """Decompress the gzip file 'input_file' into 'output_file'."""
        with open(output_file, 'wb') as f_out, gzip.open(input_file, 'rb') as f_in:
            shutil.copyfileobj(f_in, f_out)

    @staticmethod
    def gzip_uncompress_and_rename_single_file(input_file):
        """Replace a gzip file with its decompressed content.

        The result keeps the name of 'input_file' without its '.gz'
        extension (if it has one).

        Returns
        -------
        str
            path of the decompressed file
        """
        output_dir = os.path.dirname(input_file)
        file = os.path.basename(input_file)
        # os.path.join keeps a bare file name relative instead of sending it
        # to the file system root
        output_decompressed_file = os.path.join(output_dir,
                                                file + "_decompressed")
        JwstClass.gzip_uncompress(input_file=input_file,
                                  output_file=output_decompressed_file)
        # Remove the compressed file and rename the decompressed file to the
        # original one
        os.remove(input_file)
        if file.lower().endswith(".gz"):
            # remove .gz
            output = os.path.join(output_dir, file[:-3])
        else:
            output = input_file
        os.rename(output_decompressed_file, output)
        return output

    @staticmethod
    def get_decoded_string(str):
        """Decode 'str' as UTF-8, returning it unchanged if it cannot be
        decoded (e.g. it is already a string)."""
        try:
            return str.decode('utf-8')
        except (UnicodeDecodeError, AttributeError):
            return str
# Ready-to-use module-level instance. It is built with the default arguments,
# so a TapPlus handler is created and, as show_messages defaults to True, the
# archive status messages are requested and printed when the module is imported.
Jwst = JwstClass()