Source code for astroquery.utils.progressbar

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import gzip
import sys
from io import StringIO
from urllib.request import build_opener
from astropy.io import fits


__all__ = ['chunk_report', 'chunk_read']


[docs] def chunk_report(bytes_so_far, chunk_size, total_size): if total_size > 0: percent = float(bytes_so_far) / total_size percent = round(percent * 100, 2) sys.stdout.write("Downloaded %12.2g of %12.2g Mb (%6.2f%%)\r" % (bytes_so_far / 1024. ** 2, total_size / 1024. ** 2, percent)) else: sys.stdout.write("Downloaded %10.2g Mb\r" % (bytes_so_far / 1024. ** 2))
[docs] def chunk_read(response, *, chunk_size=1024, report_hook=None): content_length = response.info().get('Content-Length') if content_length is None: total_size = 0 else: total_size = content_length.strip() total_size = int(total_size) bytes_so_far = 0 result_string = b"" # sys.stdout.write("Beginning download.\n") while True: chunk = response.read(chunk_size) result_string += chunk bytes_so_far += len(chunk) if not chunk: if report_hook: sys.stdout.write('\n') break if report_hook: report_hook(bytes_so_far, chunk_size, total_size) return result_string
def retrieve(url, outfile, *, opener=None, overwrite=False): """ "retrieve" (i.e., download to file) a URL. """ if opener is None: opener = build_opener() page = opener.open(url) results = chunk_read(page, report_hook=chunk_report) S = StringIO(results) try: fitsfile = fits.open(S, ignore_missing_end=True) except OSError: S.seek(0) G = gzip.GzipFile(fileobj=S) fitsfile = fits.open(G, ignore_missing_end=True) fitsfile.writeto(outfile, clobber=overwrite)