Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] ARXIVCE-2614 Create remote converter driver #66

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
71 changes: 20 additions & 51 deletions tex2pdf_service/bin/compile_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,18 @@
"""

import hashlib
import json
import logging
import os
import time
import typing
import sqlite3
import tarfile
import threading
from multiprocessing.pool import ThreadPool
from sqlite3 import Connection

import click
import requests
import logging
import sqlite3
from tex2pdf.remote_call import submit_tarball
from tqdm import tqdm
from multiprocessing.pool import ThreadPool
import tarfile
import json
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s: %(message)s')

Expand All @@ -54,16 +52,15 @@ def score_db(score_path: str) -> Connection:
def cli() -> None:
pass


def tarball_to_outcome_path(tarball: str) -> str:
"""Map tarball to outcome file path"""
"""Map tarball to outcome file path."""
parent_dir, filename = os.path.split(tarball)
stem = filename[:-7]
return os.path.join(parent_dir, "outcomes", "outcome-" + stem + ".tar.gz")


def get_outcome_meta(outcome_file: str) -> typing.Tuple[dict, typing.List[str], typing.List[str], typing.List[str], str]:
"""Open a compressed outcome tar archive and get the metadata"""
def get_outcome_meta_and_files_info(outcome_file: str) -> tuple[dict, list[str], list[str], list[str], str]:
"""Open a compressed outcome tar archive and get the metadata."""
meta = {}
files = set()
clsfiles = set()
Expand Down Expand Up @@ -112,49 +109,21 @@ def get_outcome_meta(outcome_file: str) -> typing.Tuple[dict, typing.List[str],
@click.option('--post-timeout', default=600, help='timeout for the complete post')
@click.option('--threads', default=64, help='Number of threads requested for threadpool')
def compile(submissions: str, service: str, score: str, tex2pdf_timeout: int, post_timeout: int, threads: int) -> None:
"""Compile submissions in a directory"""
"""Compile submissions in a directory."""

def submit_tarball(tarball: str) -> None:
def local_submit_tarball(tarball: str) -> None:
outcome_file = tarball_to_outcome_path(tarball)
if os.path.exists(outcome_file):
return
os.makedirs(os.path.dirname(outcome_file), exist_ok=True)
logging.info("File: %s", os.path.basename(tarball))
meta = {}
status_code = None

with open(tarball, "rb") as data_fd:
uploading = {'incoming': (os.path.basename(tarball), data_fd, 'application/gzip')}
while True:
try:
res = requests.post(service + f"?timeout={tex2pdf_timeout}", files=uploading, timeout=post_timeout, allow_redirects=False)
status_code = res.status_code
if status_code == 504:
logging.warning("Got 504 for %s", service)
time.sleep(1)
continue

if status_code == 200:
if res.content:
with open(outcome_file, "wb") as out:
out.write(res.content)
meta, lines, clsfiles, styfiles, pdfchecksum = get_outcome_meta(outcome_file)
except TimeoutError:
logging.warning("%s: Connection timed out", tarball)

except Exception as exc:
logging.warning("%s: %s", tarball, str(exc))
break

success = meta.get("status") == "success"
logging.log(logging.INFO if success else logging.WARNING,
"submit: %s (%s) %s", os.path.basename(tarball), str(status_code), success)
try:
submit_tarball(service, tarball, outcome_file, tex2pdf_timeout, post_timeout)
except FileExistsError:
logging.info(f"Not recreating already existing {outcome_file}.")
pass

source_dir = os.path.expanduser(submissions)
tarballs = [os.path.join(source_dir, tarball) for tarball in os.listdir(source_dir) if tarball.endswith(".tar.gz") and not tarball.startswith("outcome-")]
logging.info("Got %d tarballs", len(tarballs))
with ThreadPool(processes=int(threads)) as pool:
pool.map(submit_tarball, tarballs)
pool.map(local_submit_tarball, tarballs)
logging.info("Finished")


Expand All @@ -164,7 +133,7 @@ def submit_tarball(tarball: str) -> None:
@click.option('--update', default=False, help="Update scores")
@click.option('--purge-failed', default=False, help="Purge failed outcomes")
def register_outcomes(submissions: str, score: str, update: bool, purge_failed: bool) -> None:
"""Register the outcomes to a score card db"""
"""Register the outcomes to a score card db."""
submissions = os.path.expanduser(submissions)
sdb = score_db(score)
outcomes_dir = os.path.join(submissions, "outcomes")
Expand Down Expand Up @@ -194,7 +163,7 @@ def register_outcomes(submissions: str, score: str, update: bool, purge_failed:
outcome_file = tarball_to_outcome_path(tarball_path)
if os.path.exists(outcome_file):
try:
meta, files, clsfiles, styfiles, pdfchecksum = get_outcome_meta(outcome_file)
meta, files, clsfiles, styfiles, pdfchecksum = get_outcome_meta_and_files_info(outcome_file)
pdf_file = meta.get("pdf_file")
except Exception as exc:
logging.warning("%s: %s - deleting outcome", outcome_file, str(exc))
Expand Down
16 changes: 8 additions & 8 deletions tex2pdf_service/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions tex2pdf_service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ ruamel-yaml = "^0.18.5"
pillow = "^10.4.0"
python-multipart = "^0.0.6"
psutil = "^5.9.8"
tex_inspection = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "tex_inspection", branch = "ARXIVCE-2542-use-new-preflight" }
preflight_parser = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "preflight_parser", branch = "ARXIVCE-2542-use-new-preflight" }
zerozeroreadme = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "zerozeroreadme", branch = "ARXIVCE-2542-use-new-preflight" }
tex_inspection = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "tex_inspection" }
preflight_parser = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "preflight_parser" }
zerozeroreadme = {git = "https://github.com/arXiv/submission-tools.git", subdirectory = "zerozeroreadme" }
hypercorn = {extras = ["h2"], version = "^0.16.0"}
pymupdf = "^1.24.10"
requests = "^2.32.0"

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.3"
Expand All @@ -31,7 +32,6 @@ mypy = "*"
mypy-extensions = "*"
tqdm = "^4.66.2"
uvicorn = "^0.29.0"
requests = "^2.32.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
31 changes: 25 additions & 6 deletions tex2pdf_service/tests/test_docker.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import logging
import os
import shutil
import subprocess
import tempfile
import time
import requests
import subprocess
import urllib.parse

import pytest
from bin.compile_submissions import get_outcome_meta
import logging
import json
import requests
from bin.compile_submissions import get_outcome_meta_and_files_info
from tex2pdf.converter_driver import RemoteConverterDriver

PORT = 33031

Expand Down Expand Up @@ -36,7 +39,7 @@ def submit_tarball(service: str, tarball: str, outcome_file: str, tex2pdf_timeou
else:
with open(outcome_file, "wb") as out:
out.write(res.content)
meta, lines, clsfiles, styfiles, pdfchecksum = get_outcome_meta(outcome_file)
meta, lines, clsfiles, styfiles, pdfchecksum = get_outcome_meta_and_files_info(outcome_file)
else:
logging.warning(f"%s: status code %d", url, status_code)

Expand Down Expand Up @@ -176,3 +179,19 @@ def test_api_preflight(docker_container):
assert [f["filename"] for f in meta.get("detected_toplevel_files")] == ['fake-file-1.tex', 'fake-file-2.tex', 'fake-file-3.tex']


@pytest.mark.integration
def test_remote2023(docker_container) -> None:
tarball = os.path.join(os.getcwd(), "tests", "fixture", "tarballs", "test1", "test1.tar.gz")
out_dir = os.path.join(os.getcwd(), "tests", "test-output", "test1-remote")
url = docker_container + "/convert/"

logging.debug("Before instantiating the RemoteConverterDriver")

shutil.rmtree(out_dir, ignore_errors=True)

converter = RemoteConverterDriver(url, 600, out_dir, tarball,
use_addon_tree=False,
auto_detect=True)
logging.debug("Calling generate_pdf")
pdf = converter.generate_pdf()
assert pdf == "test1.pdf"
31 changes: 31 additions & 0 deletions tex2pdf_service/tex2pdf/converter_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from tex2pdf.doc_converter import combine_documents, strip_to_basename
from tex2pdf.pdf_watermark import Watermark, add_watermark_text_to_pdf
from tex2pdf.remote_call import submit_tarball, get_outcome_meta
from tex2pdf.service_logger import get_logger
from tex2pdf.tarball import ZZRMUnsupportedCompiler, ZZRMUnderspecified, chmod_775, unpack_tarball
from tex2pdf.tex_patching import fix_tex_sources
Expand Down Expand Up @@ -552,3 +553,33 @@ def unpack_outcome(self) -> dict[str, str | int | float | dict] | None:
except Exception as _exc:
pass
return meta

class RemoteConverterDriver(ConverterDriver):
"""Uses compilation service for conversion."""

service: str
post_timeout: int

def __init__(self, service: str, post_timeout: int, work_dir: str, source: str, **kwargs: typing.Any):
super().__init__(work_dir, source, **kwargs)
self.service = service
self.post_timeout = post_timeout

def generate_pdf(self) -> str|None:
"""We have the beef."""
logger = get_logger()
self.t0 = time.perf_counter()

local_tarball = os.path.join(self.work_dir, self.source)
outcome_file = os.path.join(self.work_dir, "outcome.tar.gz")

# hard coded post_timeout = 600 as of now
# not sure I want to make this another init option
logger.debug("Submitting %s to %s with output to %s", local_tarball, self.service, outcome_file)
submit_tarball(self.service, local_tarball, outcome_file, int(self.max_time_budget), self.post_timeout, self.auto_detect)
logger.debug("Returned from posting")
self.outcome = get_outcome_meta(outcome_file)

# TODO
# - discard (is this necessary?) the stuff we have created
return self.outcome.get("pdf_file")
73 changes: 73 additions & 0 deletions tex2pdf_service/tex2pdf/remote_call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Compile a tarball via the service URL to an outcome file."""

import json
import logging
import os
import tarfile
import time

import requests


def get_outcome_meta(outcome_file: str) -> dict:
"""Open a compressed outcome tar archive and get the metadata."""
meta = {}
with tarfile.open(outcome_file, "r:gz") as outcome:
for name in outcome.getnames():
if name.startswith("outcome-") and name.endswith(".json"):
meta_contents = outcome.extractfile(name)
if meta_contents:
meta.update(json.load(meta_contents))
return meta


def submit_tarball(service: str, tarball: str, outcome_file: str, tex2pdf_timeout: int, post_timeout: int, auto_detect: bool = False) -> bool:
"""Submit tarball to compilation service."""
if os.path.exists(outcome_file):
raise FileExistsError(f"Outcome file {outcome_file} already exists!")
os.makedirs(os.path.dirname(outcome_file), exist_ok=True)
logging.info("File: %s", os.path.basename(tarball))
meta = {}
status_code = None

with open(tarball, "rb") as data_fd:
uploading = {"incoming": (os.path.basename(tarball), data_fd, "application/gzip")}
while True:
try:
post_url = service + f"?timeout={tex2pdf_timeout}&auto_detect={auto_detect}"
logging.debug("POST URL: %s", post_url)
logging.debug("uploading = %s", uploading)
res = requests.post(
post_url,
files=uploading,
timeout=post_timeout,
allow_redirects=False,
)
status_code = res.status_code
if status_code == 504:
logging.warning("Got 504 for %s", service)
time.sleep(1)
continue

if status_code == 200:
if res.content:
with open(outcome_file, "wb") as out:
out.write(res.content)
meta = get_outcome_meta(outcome_file)
except TimeoutError:
logging.warning("%s: Connection timed out", tarball)

except Exception as exc:
logging.warning("Exception submitting tarball: %s", exc)
logging.warning("%s: %s", tarball, str(exc))
break

success = meta.get("status") == "success"
logging.log(
logging.INFO if success else logging.WARNING,
"submit: %s (%s) %s",
os.path.basename(tarball),
str(status_code),
success,
)
return success
Loading