diff --git a/examples/convert_markdown_to_jsonl.py b/examples/convert_markdown_to_jsonl.py
index 11415eb..30bd461 100644
--- a/examples/convert_markdown_to_jsonl.py
+++ b/examples/convert_markdown_to_jsonl.py
@@ -1,4 +1,5 @@
 """Converts a directory of markdown files to JSONL files."""
+
 from __future__ import annotations
 
 import json
diff --git a/examples/convert_parquet_to_jsonl.py b/examples/convert_parquet_to_jsonl.py
index 47e0313..08e9d10 100644
--- a/examples/convert_parquet_to_jsonl.py
+++ b/examples/convert_parquet_to_jsonl.py
@@ -1,4 +1,5 @@
 """Converts a directory of markdown files to JSONL files."""
+
 from __future__ import annotations
 
 import json
diff --git a/examples/oreo/polaris-scaling/ian-imports2.oreo.yaml b/examples/oreo/polaris-scaling/ian-imports2.oreo.yaml
index b1ef758..f06d76c 100644
--- a/examples/oreo/polaris-scaling/ian-imports2.oreo.yaml
+++ b/examples/oreo/polaris-scaling/ian-imports2.oreo.yaml
@@ -62,4 +62,3 @@ compute_settings:
   queue: R1800955
   # The amount of time to request for your job
   walltime: "24:00:00"
-
diff --git a/examples/oreo/polaris-scaling/oreo.polaris.nodes128.yaml b/examples/oreo/polaris-scaling/oreo.polaris.nodes128.yaml
index a39abaa..1b5a7fe 100644
--- a/examples/oreo/polaris-scaling/oreo.polaris.nodes128.yaml
+++ b/examples/oreo/polaris-scaling/oreo.polaris.nodes128.yaml
@@ -65,4 +65,3 @@ compute_settings:
   queue: run_next
   # The amount of time to request for your job
   walltime: "1:00:00"
-
diff --git a/examples/oreo/polaris-scaling/oreo.polaris.nodes2.yaml b/examples/oreo/polaris-scaling/oreo.polaris.nodes2.yaml
index 434a054..9814326 100644
--- a/examples/oreo/polaris-scaling/oreo.polaris.nodes2.yaml
+++ b/examples/oreo/polaris-scaling/oreo.polaris.nodes2.yaml
@@ -65,4 +65,3 @@ compute_settings:
   queue: demand
   # The amount of time to request for your job
   walltime: "1:00:00"
-
diff --git a/examples/oreo/polaris-scaling/oreo.polaris.nodes256.yaml b/examples/oreo/polaris-scaling/oreo.polaris.nodes256.yaml
index 4085f6e..8d509f4 100644
--- a/examples/oreo/polaris-scaling/oreo.polaris.nodes256.yaml
+++ b/examples/oreo/polaris-scaling/oreo.polaris.nodes256.yaml
@@ -65,4 +65,3 @@ compute_settings:
   queue: run_next
   # The amount of time to request for your job
   walltime: "1:00:00"
-
diff --git a/examples/oreo/polaris-scaling/oreo.polaris.nodes32.yaml b/examples/oreo/polaris-scaling/oreo.polaris.nodes32.yaml
index 5c3d561..fec1347 100644
--- a/examples/oreo/polaris-scaling/oreo.polaris.nodes32.yaml
+++ b/examples/oreo/polaris-scaling/oreo.polaris.nodes32.yaml
@@ -65,4 +65,3 @@ compute_settings:
   queue: demand
   # The amount of time to request for your job
   walltime: "1:00:00"
-
diff --git a/examples/oreo/polaris-scaling/oreo.polaris.nodes450.yaml b/examples/oreo/polaris-scaling/oreo.polaris.nodes450.yaml
index 080b198..21b3896 100644
--- a/examples/oreo/polaris-scaling/oreo.polaris.nodes450.yaml
+++ b/examples/oreo/polaris-scaling/oreo.polaris.nodes450.yaml
@@ -65,4 +65,3 @@ compute_settings:
   queue: run_next
   # The amount of time to request for your job
   walltime: "1:00:00"
-
diff --git a/examples/oreo/polaris-scaling/oreo.polaris.nodes64.yaml b/examples/oreo/polaris-scaling/oreo.polaris.nodes64.yaml
index 0b747f0..cac7d9d 100644
--- a/examples/oreo/polaris-scaling/oreo.polaris.nodes64.yaml
+++ b/examples/oreo/polaris-scaling/oreo.polaris.nodes64.yaml
@@ -65,4 +65,3 @@ compute_settings:
   queue: run_next
   # The amount of time to request for your job
   walltime: "1:00:00"
-
diff --git a/pdfwf/__init__.py b/pdfwf/__init__.py
index 34ead82..6899403 100644
--- a/pdfwf/__init__.py
+++ b/pdfwf/__init__.py
@@ -1,4 +1,5 @@
 """pdfwf package."""
+
 from __future__ import annotations
 
 import importlib.metadata as importlib_metadata
diff --git a/pdfwf/balance.py b/pdfwf/balance.py
index b294748..66e0253 100644
--- a/pdfwf/balance.py
+++ b/pdfwf/balance.py
@@ -1,4 +1,5 @@
 """Balance output jsonl files from a workflow run."""
+
 from __future__ import annotations
 
 import functools
diff --git a/pdfwf/cli.py b/pdfwf/cli.py
index f08e871..aa63f50 100644
--- a/pdfwf/cli.py
+++ b/pdfwf/cli.py
@@ -1,4 +1,5 @@
 """CLI for the PDF workflow package."""
+
 from __future__ import annotations
 
 from pathlib import Path
@@ -434,6 +435,74 @@ def parse_timers(
     pd.DataFrame(time_stats).to_csv(csv_path, index=False)
 
 
+@app.command()
+def zip_pdfs(
+    root_dir: Path = typer.Option(  # noqa: B008
+        ...,
+        '--input_dir',
+        '-i',
+        help='Path to the root directory containing PDFs.',
+    ),
+    output_dir: Path = typer.Option(  # noqa: B008
+        ...,
+        '--output_dir',
+        '-o',
+        help='Path to the output directory.',
+    ),
+    chunk_size: int = typer.Option(
+        10,
+        '--chunk_size',
+        '-c',
+        help='Number of PDF files per chunk.',
+    ),
+    glob_pattern: str = typer.Option(
+        '**/*.pdf',
+        '--glob_pattern',
+        '-g',
+        help='Glob pattern to search the root directory for.',
+    ),
+    num_cpus: int = typer.Option(
+        1, '--num_cpus', '-n', help='Number of CPUs to use for zipping.'
+    ),
+) -> None:
+    """Zip PDF files in chunks."""
+    import json
+    from concurrent.futures import ProcessPoolExecutor
+
+    from pdfwf.utils import batch_data
+    from pdfwf.utils import zip_worker
+
+    # Make the output directory if it does not already exist
+    output_dir.mkdir(exist_ok=True, parents=True)
+
+    # Get all PDF files in the directory
+    pdf_files = list(root_dir.glob(glob_pattern))
+    total_files = len(pdf_files)
+    print(f'Found {total_files} PDF files.')
+
+    # Get batched data
+    batched_data = batch_data(pdf_files, chunk_size=chunk_size)
+
+    # Get output files
+    output_files = [
+        output_dir / f'chunk_{i}.zip' for i in range(len(batched_data))
+    ]
+
+    # Set up the manifest mapping each zip file to the PDFs it contains
+    manifest = {
+        str(output_path.resolve()): [str(f.resolve()) for f in batch]
+        for output_path, batch in zip(output_files, batched_data)
+    }
+    # Save the manifest so each PDF can be traced back to its zip file
+    with open(output_dir / 'manifest.json', 'w') as f:
+        json.dump(manifest, f)
+
+    with ProcessPoolExecutor(max_workers=num_cpus) as pool:
+        pool.map(zip_worker, batched_data, output_files)
+
+    print(f'Zipped files to {output_dir}')
+
+
 def main() -> None:
     """Entry point for CLI."""
     app()
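A quick way to sanity-check the output of the new `zip_pdfs` command is to read the manifest back; a minimal sketch, assuming `zips/` is the output directory the command was pointed at:

```python
# Sketch: read back the manifest written by zip_pdfs to see which PDFs
# landed in which chunk. 'zips/' is a placeholder output directory.
import json
from pathlib import Path

manifest = json.loads((Path('zips') / 'manifest.json').read_text())
for zip_path, pdf_paths in manifest.items():
    print(f'{zip_path}: {len(pdf_paths)} PDFs')
```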
diff --git a/pdfwf/convert.py b/pdfwf/convert.py
index 670da1a..d312f23 100644
--- a/pdfwf/convert.py
+++ b/pdfwf/convert.py
@@ -1,4 +1,5 @@
 """PDF conversion workflow."""
+
 from __future__ import annotations
 
 import functools
@@ -69,6 +70,9 @@ def parse_pdfs(
     with open(output_dir / f'{parser.unique_id}.jsonl', 'a+') as f:
         f.write(lines)
 
+    # Sometimes Parsl won't flush stdout, so this is necessary for the logs
+    print('', end='', flush=True)
+
 
 def parse_zip(
     zip_file: str,
@@ -133,8 +137,49 @@ def parse_zip(
         return None
 
     finally:
-        # Stop the timer to log the worker time
-        timer.stop()
+        # Stop the timer to log the worker time and flush the buffer
+        timer.stop(flush=True)
+
+
+def parse_checkpoint(checkpoint_path: str) -> set[str]:
+    """Determine which input paths have been completed from a pdfwf output dir.
+
+    NOTE: This currently works only if the input is parsed from zip files.
+    The raw PDF parsing path does not log each individual PDF file parsed.
+    If we need this functionality, we must explicitly log each parsed PDF
+    instead of grepping the timing logs.
+
+    Parameters
+    ----------
+    checkpoint_path : str
+        Path to the root pdfwf directory. Should contain a `parsl` directory.
+
+    Returns
+    -------
+    set[str]
+        A set of paths that have already been parsed in previous runs
+    """
+    # Grab the time logger for its log-parsing functionality
+    from pdfwf.timer import TimeLogger
+
+    # Get all *.stdout files
+    stdout_files = Path(checkpoint_path).glob('**/*.stdout')
+
+    # Find out which files have been successfully parsed by the workflow in
+    # previous runs
+    parsed_files = set()
+    for stdout_file in stdout_files:
+        time_stats = TimeLogger().parse_logs(stdout_file)
+        for log_elem in time_stats:
+            tags = log_elem.tags
+            if 'finished-parsing' in tags:
+                # This adds everything after the tag type (first element) to
+                # the set. Currently there should be only one element after,
+                # but this will extend to more types of logs if they occur
+                for elem in tags[1:]:
+                    parsed_files.add(elem)
+
+    return parsed_files
 
 
 class WorkflowConfig(BaseModel):
@@ -190,11 +235,26 @@ class WorkflowConfig(BaseModel):
     # Save the configuration to the output directory
     config.write_yaml(config.out_dir / 'config.yaml')
 
+    # If we have run before, find all previously parsed files;
+    # otherwise use an empty set to check against.
+    # NOTE: this assumes the input file paths have not changed from run to
+    # run. If they have changed, the check will miss previously parsed files
+    # and they will be parsed again. The same applies if you switch from
+    # parsing zips to parsing PDFs.
+    if (config.out_dir / 'parsl').exists():
+        already_parsed_files = parse_checkpoint(str(config.out_dir / 'parsl'))
+    else:
+        already_parsed_files = set()
+
     # File extension for the input files
     file_ext = 'zip' if config.iszip else 'pdf'
 
-    # Collect files in batches for more efficient processing
-    files = [p.as_posix() for p in config.pdf_dir.glob(f'**/*.{file_ext}')]
+    # Collect files, skipping any that were already parsed in a previous run
+    files = [
+        p.as_posix()
+        for p in config.pdf_dir.glob(f'**/*.{file_ext}')
+        if p.as_posix() not in already_parsed_files
+    ]
 
     # Limit the number of conversions for debugging
     if len(files) >= config.num_conversions:
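Taken together, the resume pieces above compose as in this minimal sketch (paths are placeholders; as the NOTE warns, it assumes input paths are identical across runs):

```python
# Sketch of the resume flow: recover previously parsed inputs from the Parsl
# logs, then skip them when collecting work. All paths are placeholders.
from pathlib import Path

from pdfwf.convert import parse_checkpoint

out_dir = Path('runs/my-run')
parsl_dir = out_dir / 'parsl'
already_parsed = (
    parse_checkpoint(str(parsl_dir)) if parsl_dir.exists() else set()
)
files = [
    p.as_posix()
    for p in Path('data/zips').glob('**/*.zip')
    if p.as_posix() not in already_parsed
]
```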
diff --git a/pdfwf/parsers/__init__.py b/pdfwf/parsers/__init__.py
index 0c3a8da..cbd0057 100644
--- a/pdfwf/parsers/__init__.py
+++ b/pdfwf/parsers/__init__.py
@@ -1,4 +1,5 @@
 """The parsers module storing different PDF parsers."""
+
 from __future__ import annotations
 
 from typing import Any
diff --git a/pdfwf/parsers/marker.py b/pdfwf/parsers/marker.py
index e8442a7..44e8076 100644
--- a/pdfwf/parsers/marker.py
+++ b/pdfwf/parsers/marker.py
@@ -1,4 +1,5 @@
 """The marker PDF parser."""
+
 from __future__ import annotations
 
 from typing import Any
diff --git a/pdfwf/parsers/nougat_.py b/pdfwf/parsers/nougat_.py
index 6e5e60a..b9de1ac 100644
--- a/pdfwf/parsers/nougat_.py
+++ b/pdfwf/parsers/nougat_.py
@@ -1,4 +1,5 @@
 """The Nougat PDF parser."""
+
 from __future__ import annotations
 
 import re
diff --git a/pdfwf/parsers/oreo/oreo.py b/pdfwf/parsers/oreo/oreo.py
index 020b03c..5bae3e9 100644
--- a/pdfwf/parsers/oreo/oreo.py
+++ b/pdfwf/parsers/oreo/oreo.py
@@ -1,4 +1,5 @@
 """The Oreo parser for extracting text and visual content from PDFs."""
+
 from __future__ import annotations
 
 import sys
diff --git a/pdfwf/parsers/oreo/tensor_utils.py b/pdfwf/parsers/oreo/tensor_utils.py
index b51ff14..8b023a7 100644
--- a/pdfwf/parsers/oreo/tensor_utils.py
+++ b/pdfwf/parsers/oreo/tensor_utils.py
@@ -1,4 +1,5 @@
 """Tensor-based utilities for Oreo."""
+
 from __future__ import annotations
 
 import gc
diff --git a/pdfwf/parsl.py b/pdfwf/parsl.py
index 36aca1f..6f8987a 100644
--- a/pdfwf/parsl.py
+++ b/pdfwf/parsl.py
@@ -1,4 +1,5 @@
 """Utilities to build Parsl configurations."""
+
 from __future__ import annotations
 
 from abc import ABC
diff --git a/pdfwf/timer.py b/pdfwf/timer.py
index ecc1403..c7b33fe 100644
--- a/pdfwf/timer.py
+++ b/pdfwf/timer.py
@@ -109,8 +109,14 @@ def start(self) -> Self:
         self._start_unix = time.time()
         return self
 
-    def stop(self) -> None:
-        """Stop the timer."""
+    def stop(self, flush: bool = False) -> None:
+        """Stop the timer.
+
+        Parameters
+        ----------
+        flush : bool, optional
+            Flush the buffer (usually stdout), by default False
+        """
         self._end = time.perf_counter_ns()
         self._end_unix = time.time()
         self._running = False
@@ -120,7 +126,7 @@ def stop(self) -> None:
             start_unix=self._start_unix,
             end_unix=self._end_unix,
         )
-        TimeLogger().log(time_stats)
+        TimeLogger().log(time_stats, flush=flush)
 
 
 class TimeLogger:
@@ -153,10 +159,19 @@ def parse_logs(self, log_path: PathLike) -> list[TimeStats]:
 
         return time_stats
 
-    def log(self, ts: TimeStats) -> None:
-        """Log the timer information."""
+    def log(self, ts: TimeStats, flush: bool = False) -> None:
+        """Log the timer information.
+
+        Parameters
+        ----------
+        ts : TimeStats
+            The time statistics object to be logged
+        flush : bool, optional
+            Flush the buffer (usually stdout), by default False
+        """
         print(
             f'[timer] [{" ".join(map(str, ts.tags))}]'
             f' in [{ts.elapsed_s:.2f}] seconds.',
             f' start: [{ts.start_unix:.2f}], end: [{ts.end_unix:.2f}]',
+            flush=flush,
         )
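A worker-side sketch of how the new `flush` flag is meant to be used. The `Timer('finished-parsing', path)` construction is an assumption (only `stop(flush=True)` appears in this diff), so treat the class name and tag-passing as illustrative:

```python
# Hypothetical sketch: tag a timed region and force the log line through the
# worker's buffered stdout. The Timer(*tags) constructor is assumed, not
# shown in this diff.
from pdfwf.timer import Timer

timer = Timer('finished-parsing', '/data/chunk_0.zip').start()  # assumed API
try:
    pass  # parse the zip here
finally:
    # flush=True pushes the '[timer] [finished-parsing ...]' line to disk so
    # parse_checkpoint can later recover it from the *.stdout files.
    timer.stop(flush=True)
```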
"""Tensor-based utilities for Oreo.""" + from __future__ import annotations import gc diff --git a/pdfwf/parsl.py b/pdfwf/parsl.py index 36aca1f..6f8987a 100644 --- a/pdfwf/parsl.py +++ b/pdfwf/parsl.py @@ -1,4 +1,5 @@ """Utilities to build Parsl configurations.""" + from __future__ import annotations from abc import ABC diff --git a/pdfwf/timer.py b/pdfwf/timer.py index ecc1403..c7b33fe 100644 --- a/pdfwf/timer.py +++ b/pdfwf/timer.py @@ -109,8 +109,14 @@ def start(self) -> Self: self._start_unix = time.time() return self - def stop(self) -> None: - """Stop the timer.""" + def stop(self, flush: bool = False) -> None: + """Stop the timer. + + Parameters + ---------- + flush : bool, optional + Flush the buffer (usually stdout), by default False + """ self._end = time.perf_counter_ns() self._end_unix = time.time() self._running = False @@ -120,7 +126,7 @@ def stop(self) -> None: start_unix=self._start_unix, end_unix=self._end_unix, ) - TimeLogger().log(time_stats) + TimeLogger().log(time_stats, flush=flush) class TimeLogger: @@ -153,10 +159,19 @@ def parse_logs(self, log_path: PathLike) -> list[TimeStats]: return time_stats - def log(self, ts: TimeStats) -> None: - """Log the timer information.""" + def log(self, ts: TimeStats, flush: bool = False) -> None: + """Log the timer information. + + Parameters + ---------- + ts : TimeStats + The time statistics object to be logged + flush : bool, optional + Flush the buffer (usually stdout), by default False + """ print( f'[timer] [{" ".join(map(str, ts.tags))}]' f' in [{ts.elapsed_s:.2f}] seconds.', f' start: [{ts.start_unix:.2f}], end: [{ts.end_unix:.2f}]', + flush=flush, ) diff --git a/pdfwf/utils.py b/pdfwf/utils.py index b3ca34d..2c6fdb7 100644 --- a/pdfwf/utils.py +++ b/pdfwf/utils.py @@ -1,10 +1,12 @@ """Utilities for the PDF workflow.""" + from __future__ import annotations import json import logging import sys import traceback +import zipfile from pathlib import Path from typing import Any from typing import Callable @@ -155,3 +157,21 @@ def batch_data(data: list[T], chunk_size: int) -> list[list[T]]: if len(data) > chunk_size * len(batches): batches.append(data[len(batches) * chunk_size :]) return batches + + +def zip_worker(files: list[Path], output_path: Path) -> Path: + """Worker function to zip together a group of pdfs. 
diff --git a/pyproject.toml b/pyproject.toml
index c774db4..2397c56 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "pdfwf"
-version = "0.1.5"
+version = "0.1.6"
 authors = [
     {name = "Kyle Hippe", email = "khippe@anl.gov"},
     {name = "Alexander Brace", email = "abrace@anl.gov"},
@@ -95,7 +95,7 @@ allow_untyped_defs = true
 
 [tool.ruff]
 # See all rules here: https://beta.ruff.rs/docs/rules
-select = [
+lint.select = [
     # pyflakes
     "F",
     # pycodestyle
@@ -130,14 +130,14 @@ select = [
     "RUF",
 ]
 line-length = 79
-extend-ignore = []
+lint.extend-ignore = []
 target-version = "py38"
-ignore = ["COM812", "ISC001"] # silence warning
+lint.ignore = ["COM812", "ISC001"] # silence warning
 
-[tool.ruff.flake8-pytest-style]
+[tool.ruff.lint.flake8-pytest-style]
 parametrize-values-type = "tuple"
 
-[tool.ruff.flake8-quotes]
+[tool.ruff.lint.flake8-quotes]
 inline-quotes = "single"
 multiline-quotes = "double" # silence warning
 
@@ -145,17 +145,17 @@ multiline-quotes = "double" # silence warning
 indent-style = "space"
 quote-style = "single"
 
-[tool.ruff.isort]
+[tool.ruff.lint.isort]
 force-single-line = true
 known-first-party = ["pdfwf", "test", "testing"]
 order-by-type = false
 required-imports = ["from __future__ import annotations"]
 
-[tool.ruff.per-file-ignores]
+[tool.ruff.lint.per-file-ignores]
 "*/__init__.py" = ["F401"]
 "*/*_test.py" = ["D10"]
 
-[tool.ruff.pydocstyle]
+[tool.ruff.lint.pydocstyle]
 convention = "numpy"
 
 [tool.setuptools.packages.find]
diff --git a/testing/__init__.py b/testing/__init__.py
index 7a15201..70addeb 100644
--- a/testing/__init__.py
+++ b/testing/__init__.py
@@ -1,2 +1,3 @@
 """Utilities for unit tests."""
+
 from __future__ import annotations
diff --git a/testing/data.py b/testing/data.py
index a8d7b66..7319cef 100644
--- a/testing/data.py
+++ b/testing/data.py
@@ -1,4 +1,5 @@
 """Example data for testing."""
+
 from __future__ import annotations
 
 DATA: list[tuple[list[int], int]] = [([1, 2, 3, 4], 10)]
diff --git a/tests/__init__.py b/tests/__init__.py
index 2f2a50c..09e529f 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,2 +1,3 @@
 """pdfwf unit tests."""
+
 from __future__ import annotations