From 6972fb76fc81577e58a6a1da272d46c6d5fb6a1b Mon Sep 17 00:00:00 2001 From: "Brian D. Caruso" Date: Tue, 10 Oct 2023 17:56:10 -0400 Subject: [PATCH] sync_published_to_gcp.py: Adds retrys to PDF get and upload to GS --- .../sync_prod_to_gcp/sync_published_to_gcp.py | 100 +++++++++++------- 1 file changed, 61 insertions(+), 39 deletions(-) diff --git a/script/sync_prod_to_gcp/sync_published_to_gcp.py b/script/sync_prod_to_gcp/sync_published_to_gcp.py index e1d2c5183..e1ed2e65d 100644 --- a/script/sync_prod_to_gcp/sync_published_to_gcp.py +++ b/script/sync_prod_to_gcp/sync_published_to_gcp.py @@ -43,7 +43,11 @@ from pathlib import Path +from google.api_core import retry +from google.cloud.storage.retry import DEFAULT_RETRY as STORAGE_RETRY + from identifier import Identifier + from digester import get_file_mtime overall_start = perf_counter() @@ -59,6 +63,25 @@ import logging_json + +class Overloaded503Exception(Exception): + """Raised when the response to /pdf is a 503, indicating a need to slow down calls to server.""" + pass + + +class WaitTimeout(Exception): + """The wait for the PDF to show up in /cache/ps_cache greater than PDF_WAIT_SEC.""" + pass + + +class NoPdfFile(Exception): + """Raised when the PDF is not in /cache/ps_cache""" + pass + + +PDF_RETRY_EXCEPTIONS = [Overloaded503Exception, WaitTimeout, NoPdfFile, + requests.exceptions.ConnectionError, + requests.exceptions.Timeout] CATEGORY = "category" GS_BUCKET = 'arxiv-production-data' @@ -267,6 +290,7 @@ def path_to_bucket_key(pdf) -> str: raise ValueError(f"Cannot convert PDF path {pdf} to a GS key") +@retry.Retry(predicate=retry.if_exception_type(PDF_RETRY_EXCEPTIONS)) def ensure_pdf(session, host, arxiv_id): """Ensures PDF exits for arxiv_id. @@ -282,59 +306,57 @@ def ensure_pdf(session, host, arxiv_id): This does not check if the arxiv_id is PDF source. """ + archive = ('arxiv' if not arxiv_id.is_old_id else arxiv_id.archive) + pdf_file = Path(f"{PS_CACHE_PREFIX}/{archive}/pdf/{arxiv_id.yymm}/{arxiv_id.filename}v{arxiv_id.version}.pdf") + url = f"https://{host}/pdf/{arxiv_id.filename}v{arxiv_id.version}.pdf" - def pdf_cache_path(arxiv_id) -> Path: - """Gets the PDF file in the ps_cache. Returns Path object.""" - archive = ('arxiv' if not arxiv_id.is_old_id else arxiv_id.archive) - return Path(f"{PS_CACHE_PREFIX}/{archive}/pdf/{arxiv_id.yymm}/{arxiv_id.filename}v{arxiv_id.version}.pdf") - - def arxiv_pdf_url(host, arxiv_id) -> str: - """Gets the URL that would be used to request the pdf for the arxiv_id""" - return f"https://{host}/pdf/{arxiv_id.filename}v{arxiv_id.version}.pdf" + start = perf_counter() - pdf_file, url = pdf_cache_path(arxiv_id), arxiv_pdf_url(host, arxiv_id) + if pdf_file.exists(): + logger.debug(f"ensure_file_url_exists: {str(pdf_file)} already exists") + return pdf_file, url, "already exists", ms_since(start) start = perf_counter() - - if not pdf_file.exists(): - start = perf_counter() - headers = {'User-Agent': ENSURE_UA} - logger.debug("Getting %s", url) - resp = session.get(url, headers=headers, stream=True, verify=ENSURE_CERT_VERIFY) - # noinspection PyStatementEffect - [line for line in resp.iter_lines()] # Consume resp in hopes of keeping alive session - if resp.status_code != 200: - msg = f"ensure_pdf: GET status {resp.status_code} {url}" + headers = {'User-Agent': ENSURE_UA} + logger.debug("Getting %s", url) + resp = session.get(url, headers=headers, stream=True, verify=ENSURE_CERT_VERIFY) + # noinspection PyStatementEffect + [line for line in resp.iter_lines()] # Consume resp in hopes of keeping alive session + if resp.status_code == 503: + msg = f"ensure_pdf: GET status 503, server overloaded {url}" + logger.warning(msg, + extra={CATEGORY: "download", + "url": url, "status_code": resp.status_code, "pdf_file": str(pdf_file)}) + raise Overloaded503Exception(msg) + if resp.status_code != 200: + msg = f"ensure_pdf: GET status {resp.status_code} {url}" + logger.warning(msg, + extra={CATEGORY: "download", + "url": url, "status_code": resp.status_code, "pdf_file": str(pdf_file)}) + raise (Exception(msg)) + start_wait = perf_counter() + while not pdf_file.exists(): + if perf_counter() - start_wait > PDF_WAIT_SEC: + msg = f"No PDF, waited longer than {PDF_WAIT_SEC} sec {url}" logger.warning(msg, extra={CATEGORY: "download", - "url": url, "status_code": resp.status_code, "pdf_file": str(pdf_file)}) - raise (Exception(msg)) - start_wait = perf_counter() - while not pdf_file.exists(): - if perf_counter() - start_wait > PDF_WAIT_SEC: - msg = f"No PDF, waited longer than {PDF_WAIT_SEC} sec {url}" - logger.warning(msg, - extra={CATEGORY: "download", - "url": url, "pdf_file": str(pdf_file)}) - raise (Exception(msg)) - else: - sleep(0.2) - if pdf_file.exists(): - logger.debug( - f"ensure_file_url_exists: {str(pdf_file)} requested {url} status_code {resp.status_code} {ms_since(start)} ms") - return (pdf_file, url, None, ms_since(start)) + "url": url, "pdf_file": str(pdf_file)}) + raise (WaitTimeout(msg)) else: - raise (Exception(f"ensure_pdf: Could not create {pdf_file}. {url} {ms_since(start)} ms")) + sleep(0.2) + if pdf_file.exists(): + logger.debug( + f"ensure_file_url_exists: {str(pdf_file)} requested {url} status_code {resp.status_code} {ms_since(start)} ms") + return pdf_file, url, None, ms_since(start) else: - logger.debug(f"ensure_file_url_exists: {str(pdf_file)} already exists") - return (pdf_file, url, "already exists", ms_since(start)) + raise (NoPdfFile(f"ensure_pdf: Could not create {pdf_file}. {url} {ms_since(start)} ms")) def upload_pdf(gs_client, ensure_tuple): """Uploads a PDF from ps_cache to GS_BUCKET""" return upload(gs_client, ensure_tuple[0], path_to_bucket_key(ensure_tuple[0])) + ensure_tuple - +@STORAGE_RETRY def upload(gs_client, localpath, key): """Upload a file to GS_BUCKET"""