Skip to content

Commit

Permalink
sync_published_to_gcp.py: Adds retrys to PDF get and upload to GS
Browse files Browse the repository at this point in the history
  • Loading branch information
bdc34 committed Oct 10, 2023
1 parent 31cba01 commit 6972fb7
Showing 1 changed file with 61 additions and 39 deletions.
100 changes: 61 additions & 39 deletions script/sync_prod_to_gcp/sync_published_to_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@

from pathlib import Path

from google.api_core import retry
from google.cloud.storage.retry import DEFAULT_RETRY as STORAGE_RETRY

from identifier import Identifier

from digester import get_file_mtime

overall_start = perf_counter()
Expand All @@ -59,6 +63,25 @@

import logging_json


class Overloaded503Exception(Exception):
"""Raised when the response to /pdf is a 503, indicating a need to slow down calls to server."""
pass


class WaitTimeout(Exception):
"""The wait for the PDF to show up in /cache/ps_cache greater than PDF_WAIT_SEC."""
pass


class NoPdfFile(Exception):
"""Raised when the PDF is not in /cache/ps_cache"""
pass


PDF_RETRY_EXCEPTIONS = [Overloaded503Exception, WaitTimeout, NoPdfFile,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout]
CATEGORY = "category"

GS_BUCKET = 'arxiv-production-data'
Expand Down Expand Up @@ -267,6 +290,7 @@ def path_to_bucket_key(pdf) -> str:
raise ValueError(f"Cannot convert PDF path {pdf} to a GS key")


@retry.Retry(predicate=retry.if_exception_type(PDF_RETRY_EXCEPTIONS))
def ensure_pdf(session, host, arxiv_id):
"""Ensures PDF exits for arxiv_id.
Expand All @@ -282,59 +306,57 @@ def ensure_pdf(session, host, arxiv_id):
This does not check if the arxiv_id is PDF source.
"""
archive = ('arxiv' if not arxiv_id.is_old_id else arxiv_id.archive)
pdf_file = Path(f"{PS_CACHE_PREFIX}/{archive}/pdf/{arxiv_id.yymm}/{arxiv_id.filename}v{arxiv_id.version}.pdf")
url = f"https://{host}/pdf/{arxiv_id.filename}v{arxiv_id.version}.pdf"

def pdf_cache_path(arxiv_id) -> Path:
"""Gets the PDF file in the ps_cache. Returns Path object."""
archive = ('arxiv' if not arxiv_id.is_old_id else arxiv_id.archive)
return Path(f"{PS_CACHE_PREFIX}/{archive}/pdf/{arxiv_id.yymm}/{arxiv_id.filename}v{arxiv_id.version}.pdf")

def arxiv_pdf_url(host, arxiv_id) -> str:
"""Gets the URL that would be used to request the pdf for the arxiv_id"""
return f"https://{host}/pdf/{arxiv_id.filename}v{arxiv_id.version}.pdf"
start = perf_counter()

pdf_file, url = pdf_cache_path(arxiv_id), arxiv_pdf_url(host, arxiv_id)
if pdf_file.exists():
logger.debug(f"ensure_file_url_exists: {str(pdf_file)} already exists")
return pdf_file, url, "already exists", ms_since(start)

start = perf_counter()

if not pdf_file.exists():
start = perf_counter()
headers = {'User-Agent': ENSURE_UA}
logger.debug("Getting %s", url)
resp = session.get(url, headers=headers, stream=True, verify=ENSURE_CERT_VERIFY)
# noinspection PyStatementEffect
[line for line in resp.iter_lines()] # Consume resp in hopes of keeping alive session
if resp.status_code != 200:
msg = f"ensure_pdf: GET status {resp.status_code} {url}"
headers = {'User-Agent': ENSURE_UA}
logger.debug("Getting %s", url)
resp = session.get(url, headers=headers, stream=True, verify=ENSURE_CERT_VERIFY)
# noinspection PyStatementEffect
[line for line in resp.iter_lines()] # Consume resp in hopes of keeping alive session
if resp.status_code == 503:
msg = f"ensure_pdf: GET status 503, server overloaded {url}"
logger.warning(msg,
extra={CATEGORY: "download",
"url": url, "status_code": resp.status_code, "pdf_file": str(pdf_file)})
raise Overloaded503Exception(msg)
if resp.status_code != 200:
msg = f"ensure_pdf: GET status {resp.status_code} {url}"
logger.warning(msg,
extra={CATEGORY: "download",
"url": url, "status_code": resp.status_code, "pdf_file": str(pdf_file)})
raise (Exception(msg))
start_wait = perf_counter()
while not pdf_file.exists():
if perf_counter() - start_wait > PDF_WAIT_SEC:
msg = f"No PDF, waited longer than {PDF_WAIT_SEC} sec {url}"
logger.warning(msg,
extra={CATEGORY: "download",
"url": url, "status_code": resp.status_code, "pdf_file": str(pdf_file)})
raise (Exception(msg))
start_wait = perf_counter()
while not pdf_file.exists():
if perf_counter() - start_wait > PDF_WAIT_SEC:
msg = f"No PDF, waited longer than {PDF_WAIT_SEC} sec {url}"
logger.warning(msg,
extra={CATEGORY: "download",
"url": url, "pdf_file": str(pdf_file)})
raise (Exception(msg))
else:
sleep(0.2)
if pdf_file.exists():
logger.debug(
f"ensure_file_url_exists: {str(pdf_file)} requested {url} status_code {resp.status_code} {ms_since(start)} ms")
return (pdf_file, url, None, ms_since(start))
"url": url, "pdf_file": str(pdf_file)})
raise (WaitTimeout(msg))
else:
raise (Exception(f"ensure_pdf: Could not create {pdf_file}. {url} {ms_since(start)} ms"))
sleep(0.2)
if pdf_file.exists():
logger.debug(
f"ensure_file_url_exists: {str(pdf_file)} requested {url} status_code {resp.status_code} {ms_since(start)} ms")
return pdf_file, url, None, ms_since(start)
else:
logger.debug(f"ensure_file_url_exists: {str(pdf_file)} already exists")
return (pdf_file, url, "already exists", ms_since(start))
raise (NoPdfFile(f"ensure_pdf: Could not create {pdf_file}. {url} {ms_since(start)} ms"))


def upload_pdf(gs_client, ensure_tuple):
"""Uploads a PDF from ps_cache to GS_BUCKET"""
return upload(gs_client, ensure_tuple[0], path_to_bucket_key(ensure_tuple[0])) + ensure_tuple


@STORAGE_RETRY
def upload(gs_client, localpath, key):
"""Upload a file to GS_BUCKET"""

Expand Down

0 comments on commit 6972fb7

Please sign in to comment.