def tarball_to_outcome_path(tarball: str) -> str:
    """Map a submission tarball path to the path of its outcome archive.

    The outcome lives in an ``outcomes`` subdirectory next to the tarball and
    is named ``outcome-<stem>.tar.gz``, where ``<stem>`` is the tarball's file
    name without its ``.tar.gz`` suffix.

    Args:
        tarball: Path of the submission tarball.

    Returns:
        Path of the corresponding outcome file.
    """
    parent_dir, filename = os.path.split(tarball)
    # Strip the suffix by name instead of slicing off a fixed seven
    # characters, so a file name that does not end in ".tar.gz" is kept
    # intact rather than silently truncated.
    stem = filename.removesuffix(".tar.gz")
    return os.path.join(parent_dir, "outcomes", "outcome-" + stem + ".tar.gz")
"""Compile submissions in a directory.""" - def submit_tarball(tarball: str) -> None: + def local_submit_tarball(tarball: str) -> None: outcome_file = tarball_to_outcome_path(tarball) - if os.path.exists(outcome_file): - return - os.makedirs(os.path.dirname(outcome_file), exist_ok=True) - logging.info("File: %s", os.path.basename(tarball)) - meta = {} - status_code = None - - with open(tarball, "rb") as data_fd: - uploading = {'incoming': (os.path.basename(tarball), data_fd, 'application/gzip')} - while True: - try: - res = requests.post(service + f"?timeout={tex2pdf_timeout}", files=uploading, timeout=post_timeout, allow_redirects=False) - status_code = res.status_code - if status_code == 504: - logging.warning("Got 504 for %s", service) - time.sleep(1) - continue - - if status_code == 200: - if res.content: - with open(outcome_file, "wb") as out: - out.write(res.content) - meta, lines, clsfiles, styfiles, pdfchecksum = get_outcome_meta(outcome_file) - except TimeoutError: - logging.warning("%s: Connection timed out", tarball) - - except Exception as exc: - logging.warning("%s: %s", tarball, str(exc)) - break - - success = meta.get("status") == "success" - logging.log(logging.INFO if success else logging.WARNING, - "submit: %s (%s) %s", os.path.basename(tarball), str(status_code), success) + try: + submit_tarball(service, tarball, outcome_file, tex2pdf_timeout, post_timeout) + except FileExistsError: + logging.info(f"Not recreating already existing {outcome_file}.") + pass source_dir = os.path.expanduser(submissions) tarballs = [os.path.join(source_dir, tarball) for tarball in os.listdir(source_dir) if tarball.endswith(".tar.gz") and not tarball.startswith("outcome-")] logging.info("Got %d tarballs", len(tarballs)) with ThreadPool(processes=int(threads)) as pool: - pool.map(submit_tarball, tarballs) + pool.map(local_submit_tarball, tarballs) logging.info("Finished") @@ -164,7 +133,7 @@ def submit_tarball(tarball: str) -> None: @click.option('--update', 
default=False, help="Update scores") @click.option('--purge-failed', default=False, help="Purge failed outcomes") def register_outcomes(submissions: str, score: str, update: bool, purge_failed: bool) -> None: - """Register the outcomes to a score card db""" + """Register the outcomes to a score card db.""" submissions = os.path.expanduser(submissions) sdb = score_db(score) outcomes_dir = os.path.join(submissions, "outcomes") @@ -194,7 +163,7 @@ def register_outcomes(submissions: str, score: str, update: bool, purge_failed: outcome_file = tarball_to_outcome_path(tarball_path) if os.path.exists(outcome_file): try: - meta, files, clsfiles, styfiles, pdfchecksum = get_outcome_meta(outcome_file) + meta, files, clsfiles, styfiles, pdfchecksum = get_outcome_meta_and_files_info(outcome_file) pdf_file = meta.get("pdf_file") except Exception as exc: logging.warning("%s: %s - deleting outcome", outcome_file, str(exc)) diff --git a/tex2pdf_service/poetry.lock b/tex2pdf_service/poetry.lock index 94ad1a0..356fd68 100644 --- a/tex2pdf_service/poetry.lock +++ b/tex2pdf_service/poetry.lock @@ -772,8 +772,8 @@ pydantic = "==1.10.*" [package.source] type = "git" url = "https://github.com/arXiv/submission-tools.git" -reference = "ARXIVCE-2542-use-new-preflight" -resolved_reference = "eeb2327ef88cb77dcdd4e67099b58efb0effd6f2" +reference = "HEAD" +resolved_reference = "e665aed7daa77794115fee23e306c5e148e2dcf2" subdirectory = "preflight_parser" [[package]] @@ -1153,8 +1153,8 @@ toml = "^0.10.2" [package.source] type = "git" url = "https://github.com/arXiv/submission-tools.git" -reference = "ARXIVCE-2542-use-new-preflight" -resolved_reference = "eeb2327ef88cb77dcdd4e67099b58efb0effd6f2" +reference = "HEAD" +resolved_reference = "e665aed7daa77794115fee23e306c5e148e2dcf2" subdirectory = "tex_inspection" [[package]] @@ -1359,7 +1359,7 @@ files = [] develop = false [package.dependencies] -preflight_parser = {git = "https://github.com/arXiv/submission-tools.git", branch = 
"ARXIVCE-2542-use-new-preflight", subdirectory = "preflight_parser"} +preflight_parser = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "preflight_parser"} ruamel-yaml = "^0.18.5" toml = "^0.10.2" tomli_w = "^1.0" @@ -1367,11 +1367,11 @@ tomli_w = "^1.0" [package.source] type = "git" url = "https://github.com/arXiv/submission-tools.git" -reference = "ARXIVCE-2542-use-new-preflight" -resolved_reference = "eeb2327ef88cb77dcdd4e67099b58efb0effd6f2" +reference = "HEAD" +resolved_reference = "e665aed7daa77794115fee23e306c5e148e2dcf2" subdirectory = "zerozeroreadme" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "0c1af3c635b1a4126ef297582f6ccc758c7c291c42687cf562148b571d7fde19" +content-hash = "8654f6602bc896c175fbf7d94828b2210804256c32baac483ec17a1266b0cf08" diff --git a/tex2pdf_service/pyproject.toml b/tex2pdf_service/pyproject.toml index de1da0d..666fd61 100644 --- a/tex2pdf_service/pyproject.toml +++ b/tex2pdf_service/pyproject.toml @@ -17,11 +17,12 @@ ruamel-yaml = "^0.18.5" pillow = "^10.4.0" python-multipart = "^0.0.6" psutil = "^5.9.8" -tex_inspection = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "tex_inspection", branch = "ARXIVCE-2542-use-new-preflight" } -preflight_parser = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "preflight_parser", branch = "ARXIVCE-2542-use-new-preflight" } -zerozeroreadme = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "zerozeroreadme", branch = "ARXIVCE-2542-use-new-preflight" } +tex_inspection = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "tex_inspection" } +preflight_parser = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "preflight_parser" } +zerozeroreadme = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "zerozeroreadme" } hypercorn = {extras = ["h2"], version = "^0.16.0"} pymupdf = "^1.24.10" +requests = "^2.32.0" 
@pytest.mark.integration
def test_remote2023(docker_container) -> None:
    """Convert the ``test1`` fixture through the remote driver.

    The tarball is handed to ``RemoteConverterDriver`` together with the
    container's ``/convert/`` URL, and the name of the PDF it reports is
    checked.
    """
    cwd = os.getcwd()
    convert_url = docker_container + "/convert/"
    source_tarball = os.path.join(cwd, "tests", "fixture", "tarballs", "test1", "test1.tar.gz")
    work_dir = os.path.join(cwd, "tests", "test-output", "test1-remote")

    logging.debug("Before instantiating the RemoteConverterDriver")

    # Start from an empty work directory; a missing directory is not an error.
    shutil.rmtree(work_dir, ignore_errors=True)

    driver_options = {"use_addon_tree": False, "auto_detect": True}
    converter = RemoteConverterDriver(convert_url, 600, work_dir, source_tarball, **driver_options)

    logging.debug("Calling generate_pdf")
    assert converter.generate_pdf() == "test1.pdf"
class RemoteConverterDriver(ConverterDriver):
    """Converter driver that delegates compilation to a remote service.

    The source tarball is posted to ``service`` and the returned outcome
    archive is stored as ``outcome.tar.gz`` in the work directory.
    """

    # URL of the conversion endpoint the tarball is posted to.
    service: str
    # Timeout handed to the HTTP POST that uploads the tarball.
    post_timeout: int

    def __init__(self, service: str, post_timeout: int, work_dir: str, source: str, **kwargs: typing.Any):
        """Set up the driver.

        Args:
            service: URL of the conversion endpoint.
            post_timeout: Timeout for the complete HTTP POST.
            work_dir: Working directory, forwarded to ``ConverterDriver``.
            source: Source tarball, forwarded to ``ConverterDriver``.
            **kwargs: Remaining options, forwarded to ``ConverterDriver``.
        """
        super().__init__(work_dir, source, **kwargs)
        self.service = service
        self.post_timeout = post_timeout

    def generate_pdf(self) -> str | None:
        """Submit the source tarball to the service and return the PDF name.

        Returns:
            The ``pdf_file`` entry of the outcome metadata, or ``None`` when
            the service delivered no outcome archive or the metadata has no
            such entry.

        Raises:
            FileExistsError: If ``outcome.tar.gz`` already exists in the work
                directory (raised by ``submit_tarball``).
        """
        logger = get_logger()
        self.t0 = time.perf_counter()

        # os.path.join returns self.source unchanged when it is absolute.
        local_tarball = os.path.join(self.work_dir, self.source)
        outcome_file = os.path.join(self.work_dir, "outcome.tar.gz")

        logger.debug("Submitting %s to %s with output to %s", local_tarball, self.service, outcome_file)
        submit_tarball(
            self.service,
            local_tarball,
            outcome_file,
            int(self.max_time_budget),
            self.post_timeout,
            self.auto_detect,
        )
        logger.debug("Returned from posting")

        # submit_tarball writes the outcome archive only after a 200 response
        # that has a body; after any other result there is nothing to open,
        # so report the failure instead of crashing on a missing file.
        if not os.path.exists(outcome_file):
            logger.warning("No outcome received from %s for %s", self.service, local_tarball)
            return None
        self.outcome = get_outcome_meta(outcome_file)

        # TODO: decide whether the files created in the work directory need
        # to be discarded.
        return self.outcome.get("pdf_file")
def get_outcome_meta(outcome_file: str) -> dict:
    """Open a compressed outcome tar archive and get the metadata.

    Every member whose name starts with ``outcome-`` and ends with ``.json``
    is parsed as JSON and merged into one dict, in archive order.

    Args:
        outcome_file: Path of the gzip-compressed outcome tar archive.

    Returns:
        The merged metadata; empty when the archive has no such member.
    """
    meta: dict = {}
    with tarfile.open(outcome_file, "r:gz") as outcome:
        for name in outcome.getnames():
            if name.startswith("outcome-") and name.endswith(".json"):
                meta_contents = outcome.extractfile(name)
                # extractfile returns None for members that are not regular files.
                if meta_contents:
                    meta.update(json.load(meta_contents))
    return meta


def submit_tarball(service: str, tarball: str, outcome_file: str, tex2pdf_timeout: int, post_timeout: int, auto_detect: bool = False) -> bool:
    """Submit tarball to compilation service.

    The tarball is posted as multipart field ``incoming``. A 200 response with
    a body is written to ``outcome_file`` and its metadata is read back. A 504
    response is retried after one second, with no retry limit. Any other
    response or any exception during the request ends the attempt.

    Args:
        service: URL of the conversion endpoint.
        tarball: Path of the tarball to upload.
        outcome_file: Path the returned outcome archive is written to.
        tex2pdf_timeout: Value of the ``timeout`` query parameter.
        post_timeout: Timeout for the HTTP POST itself.
        auto_detect: Value of the ``auto_detect`` query parameter.

    Returns:
        True when the outcome metadata reports ``status == "success"``.

    Raises:
        FileExistsError: If ``outcome_file`` already exists.
        OSError: If ``tarball`` cannot be opened.
    """
    if os.path.exists(outcome_file):
        raise FileExistsError(f"Outcome file {outcome_file} already exists!")
    outcome_dir = os.path.dirname(outcome_file)
    # A bare file name has an empty dirname, and os.makedirs("") raises;
    # the current directory needs no creation.
    if outcome_dir:
        os.makedirs(outcome_dir, exist_ok=True)
    logging.info("File: %s", os.path.basename(tarball))
    meta: dict = {}
    status_code = None
    post_url = service + f"?timeout={tex2pdf_timeout}&auto_detect={auto_detect}"

    with open(tarball, "rb") as data_fd:
        uploading = {"incoming": (os.path.basename(tarball), data_fd, "application/gzip")}
        while True:
            # The previous attempt consumed the file object; rewind so a retry
            # after 504 uploads the whole tarball again instead of nothing.
            data_fd.seek(0)
            try:
                logging.debug("POST URL: %s", post_url)
                logging.debug("uploading = %s", uploading)
                res = requests.post(
                    post_url,
                    files=uploading,
                    timeout=post_timeout,
                    allow_redirects=False,
                )
                status_code = res.status_code
                if status_code == 504:
                    logging.warning("Got 504 for %s", service)
                    time.sleep(1)
                    continue

                if status_code == 200 and res.content:
                    with open(outcome_file, "wb") as out:
                        out.write(res.content)
                    meta = get_outcome_meta(outcome_file)
            except (TimeoutError, requests.exceptions.Timeout):
                # requests reports timeouts with its own exception class,
                # which is not a subclass of the built-in TimeoutError.
                logging.warning("%s: Connection timed out", tarball)
            except Exception as exc:
                # Best effort: a failed submission is logged and reported
                # through the False return value, not raised.
                logging.warning("Exception submitting tarball: %s", exc)
                logging.warning("%s: %s", tarball, str(exc))
            break

    success = meta.get("status") == "success"
    logging.log(
        logging.INFO if success else logging.WARNING,
        "submit: %s (%s) %s",
        os.path.basename(tarball),
        str(status_code),
        success,
    )
    return success