Merge pull request #22 from ramanathanlab/develop
Adding checkpointing for .zip file parsing + utility for zipping input
KPHippe authored May 3, 2024
2 parents 19bf1bf + 63532a9 commit 76fabf3
Showing 25 changed files with 195 additions and 25 deletions.
1 change: 1 addition & 0 deletions examples/convert_markdown_to_jsonl.py
@@ -1,4 +1,5 @@
"""Converts a directory of markdown files to JSONL files."""

from __future__ import annotations

import json
1 change: 1 addition & 0 deletions examples/convert_parquet_to_jsonl.py
@@ -1,4 +1,5 @@
"""Converts a directory of markdown files to JSONL files."""

from __future__ import annotations

import json
1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/ian-imports2.oreo.yaml
@@ -62,4 +62,3 @@ compute_settings:
queue: R1800955
# The amount of time to request for your job
walltime: "24:00:00"

1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/oreo.polaris.nodes128.yaml
@@ -65,4 +65,3 @@ compute_settings:
queue: run_next
# The amount of time to request for your job
walltime: "1:00:00"

1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/oreo.polaris.nodes2.yaml
@@ -65,4 +65,3 @@ compute_settings:
queue: demand
# The amount of time to request for your job
walltime: "1:00:00"

1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/oreo.polaris.nodes256.yaml
@@ -65,4 +65,3 @@ compute_settings:
queue: run_next
# The amount of time to request for your job
walltime: "1:00:00"

1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/oreo.polaris.nodes32.yaml
@@ -65,4 +65,3 @@ compute_settings:
queue: demand
# The amount of time to request for your job
walltime: "1:00:00"

1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/oreo.polaris.nodes450.yaml
@@ -65,4 +65,3 @@ compute_settings:
queue: run_next
# The amount of time to request for your job
walltime: "1:00:00"

1 change: 0 additions & 1 deletion examples/oreo/polaris-scaling/oreo.polaris.nodes64.yaml
@@ -65,4 +65,3 @@ compute_settings:
queue: run_next
# The amount of time to request for your job
walltime: "1:00:00"

1 change: 1 addition & 0 deletions pdfwf/__init__.py
@@ -1,4 +1,5 @@
"""pdfwf package."""

from __future__ import annotations

import importlib.metadata as importlib_metadata
1 change: 1 addition & 0 deletions pdfwf/balance.py
@@ -1,4 +1,5 @@
"""Balance output jsonl files from a workflow run."""

from __future__ import annotations

import functools
69 changes: 69 additions & 0 deletions pdfwf/cli.py
@@ -1,4 +1,5 @@
"""CLI for the PDF workflow package."""

from __future__ import annotations

from pathlib import Path
@@ -434,6 +435,74 @@ def parse_timers(
pd.DataFrame(time_stats).to_csv(csv_path, index=False)


@app.command()
def zip_pdfs(
root_dir: Path = typer.Option( # noqa: B008
...,
'--input_dir',
'-i',
help='Path to the root directory containing PDFs.',
),
output_dir: Path = typer.Option( # noqa: B008
...,
'--output_dir',
'-o',
help='Path to the output directory.',
),
chunk_size: int = typer.Option(
10,
'--chunk_size',
'-c',
help='Number of PDF files per chunk.',
),
glob_pattern: str = typer.Option(
'**/*.pdf',
'--glob_pattern',
'-g',
help='Glob pattern to search the root directory for.',
),
num_cpus: int = typer.Option(
1, '--num_cpus', '-n', help='Number of CPUs to use for zipping.'
),
) -> None:
"""Zip PDF files in chunks."""
import json
from concurrent.futures import ProcessPoolExecutor

from pdfwf.utils import batch_data
from pdfwf.utils import zip_worker

# Make output directory if it does not already exist
output_dir.mkdir(exist_ok=True, parents=True)

# Get all PDF files in the directory
pdf_files = list(root_dir.glob(glob_pattern))
total_files = len(pdf_files)
print(f'Found {total_files} PDF files.')

# Get batched data
batched_data = batch_data(pdf_files, chunk_size=chunk_size)

# Get output files
output_files = [
output_dir / f'chunk_{i}.zip' for i in range(len(batched_data))
]

# Setup manifest and save
manifest = {
str(output_path.resolve()): [str(f.resolve()) for f in batch]
for output_path, batch in zip(output_files, batched_data)
}
# Save a manifest recording which PDFs each zip file contains
with open(output_dir / 'manifest.json', 'w') as f:
json.dump(manifest, f)

with ProcessPoolExecutor(max_workers=num_cpus) as pool:
pool.map(zip_worker, batched_data, output_files)

print(f'Zipped files to {output_dir}')


def main() -> None:
"""Entry point for CLI."""
app()
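A minimal usage sketch for the new command, assuming the package's console script is exposed as pdfwf (Typer renders the function name as zip-pdfs; all paths below are hypothetical):

pdfwf zip-pdfs --input_dir /data/pdfs --output_dir /data/pdf_zips --chunk_size 10 --num_cpus 4

This would produce chunk_0.zip, chunk_1.zip, ... alongside a manifest.json recording which PDFs went into each archive.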
68 changes: 64 additions & 4 deletions pdfwf/convert.py
@@ -1,4 +1,5 @@
"""PDF conversion workflow."""

from __future__ import annotations

import functools
@@ -69,6 +70,9 @@ def parse_pdfs(
with open(output_dir / f'{parser.unique_id}.jsonl', 'a+') as f:
f.write(lines)

# Sometimes Parsl won't flush stdout, so this is necessary for the logs
print('', end='', flush=True)


def parse_zip(
zip_file: str,
@@ -133,8 +137,49 @@ def parse_zip(
return None

finally:
# Stop the timer to log the worker time
timer.stop()
# Stop the timer to log the worker time and flush the buffer
timer.stop(flush=True)


def parse_checkpoint(checkpoint_path: str) -> set[str]:
"""Parse which input paths have been completed from a pdfwf output dir.

NOTE: This function currently only works if the input was parsed from
zip files. The raw PDF parsing does not log each individual PDF file
parsed. If we need this functionality, we need to explicitly log each
parsed PDF instead of grepping the timing logs.

Parameters
----------
checkpoint_path : str
Path to the root pdfwf directory. Should contain a `parsl` directory.

Returns
-------
set[str]
A set of paths that have already been parsed in previous runs.
"""
# Grab time logger for parsing functionality
from pdfwf.timer import TimeLogger

# Get all *.stdout files
stdout_files = Path(checkpoint_path).glob('**/*.stdout')

# Find out which files have been successfully parsed by the workflow in
# previous runs
parsed_files = set()
for stdout_file in stdout_files:
time_stats = TimeLogger().parse_logs(stdout_file)
for log_elem in time_stats:
tags = log_elem.tags
if 'finished-parsing' in tags:
# This will add everything after the tag type (first elem)
# to the set. Currently there should only be one element after
# the tag, but this extends to more types of logs if they occur
for elem in tags[1:]:
parsed_files.add(elem)

return parsed_files


class WorkflowConfig(BaseModel):
@@ -190,11 +235,26 @@ class WorkflowConfig(BaseModel):
# Save the configuration to the output directory
config.write_yaml(config.out_dir / 'config.yaml')

# If we have run before, find all previously parsed files;
# otherwise, use an empty set to check against.
# NOTE: this assumes the input file paths have not changed between
# runs. If they have changed, the checkpoint will not match and files
# will be parsed again. Similarly, switching from parsing zips to
# parsing PDFs will break the checkpoint.
if (config.out_dir / 'parsl').exists():
already_parsed_files = parse_checkpoint(str(config.out_dir / 'parsl'))
else:
already_parsed_files = set()

# File extension for the input files
file_ext = 'zip' if config.iszip else 'pdf'

# Collect files in batches for more efficient processing
files = [p.as_posix() for p in config.pdf_dir.glob(f'**/*.{file_ext}')]
# Collect input files, skipping any that were parsed in a previous run
files = [
p.as_posix()
for p in config.pdf_dir.glob(f'**/*.{file_ext}')
if p.as_posix() not in already_parsed_files
]

# Limit the number of conversions for debugging
if len(files) >= config.num_conversions:
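For context on the resume logic: parse_checkpoint recovers progress by grepping the Parsl *.stdout logs for lines written by TimeLogger.log (see pdfwf/timer.py below). Given that format, a successfully parsed zip would leave a line roughly like this (path and timestamps hypothetical):

[timer] [finished-parsing /data/pdf_zips/chunk_0.zip] in [84.21] seconds. start: [1714705200.10], end: [1714705284.31]

Everything after the first tag ('finished-parsing') is added to the already-parsed set, so chunk_0.zip is skipped on the next run.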
1 change: 1 addition & 0 deletions pdfwf/parsers/__init__.py
@@ -1,4 +1,5 @@
"""The parsers module storing different PDF parsers."""

from __future__ import annotations

from typing import Any
1 change: 1 addition & 0 deletions pdfwf/parsers/marker.py
@@ -1,4 +1,5 @@
"""The marker PDF parser."""

from __future__ import annotations

from typing import Any
1 change: 1 addition & 0 deletions pdfwf/parsers/nougat_.py
@@ -1,4 +1,5 @@
"""The Nougat PDF parser."""

from __future__ import annotations

import re
1 change: 1 addition & 0 deletions pdfwf/parsers/oreo/oreo.py
@@ -1,4 +1,5 @@
"""The Oreo parser for extracting text and visual content from PDFs."""

from __future__ import annotations

import sys
1 change: 1 addition & 0 deletions pdfwf/parsers/oreo/tensor_utils.py
@@ -1,4 +1,5 @@
"""Tensor-based utilities for Oreo."""

from __future__ import annotations

import gc
1 change: 1 addition & 0 deletions pdfwf/parsl.py
@@ -1,4 +1,5 @@
"""Utilities to build Parsl configurations."""

from __future__ import annotations

from abc import ABC
25 changes: 20 additions & 5 deletions pdfwf/timer.py
@@ -109,8 +109,14 @@ def start(self) -> Self:
self._start_unix = time.time()
return self

def stop(self) -> None:
"""Stop the timer."""
def stop(self, flush: bool = False) -> None:
"""Stop the timer.

Parameters
----------
flush : bool, optional
Flush the buffer (usually stdout), by default False
"""
self._end = time.perf_counter_ns()
self._end_unix = time.time()
self._running = False
@@ -120,7 +126,7 @@ def stop(self) -> None:
start_unix=self._start_unix,
end_unix=self._end_unix,
)
TimeLogger().log(time_stats)
TimeLogger().log(time_stats, flush=flush)


class TimeLogger:
@@ -153,10 +159,19 @@ def parse_logs(self, log_path: PathLike) -> list[TimeStats]:

return time_stats

def log(self, ts: TimeStats) -> None:
"""Log the timer information."""
def log(self, ts: TimeStats, flush: bool = False) -> None:
"""Log the timer information.

Parameters
----------
ts : TimeStats
The time statistics object to be logged
flush : bool, optional
Flush the buffer (usually stdout), by default False
"""
print(
f'[timer] [{" ".join(map(str, ts.tags))}]'
f' in [{ts.elapsed_s:.2f}] seconds.',
f' start: [{ts.start_unix:.2f}], end: [{ts.end_unix:.2f}]',
flush=flush,
)
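A short sketch of the flush-on-stop pattern that parse_zip relies on; the Timer constructor taking its tags positionally is an assumption inferred from how parse_checkpoint reads the tags, not a confirmed signature:

from pdfwf.timer import Timer

# Hypothetical tags: first the tag type, then the payload (the zip path)
timer = Timer('finished-parsing', '/data/pdf_zips/chunk_0.zip').start()
try:
    pass  # ... parse the archive here ...
finally:
    # flush=True forces the timing line into the Parsl .stdout log right
    # away, so a later parse_checkpoint call can find it
    timer.stop(flush=True)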
20 changes: 20 additions & 0 deletions pdfwf/utils.py
@@ -1,10 +1,12 @@
"""Utilities for the PDF workflow."""

from __future__ import annotations

import json
import logging
import sys
import traceback
import zipfile
from pathlib import Path
from typing import Any
from typing import Callable
@@ -155,3 +157,21 @@ def batch_data(data: list[T], chunk_size: int) -> list[list[T]]:
if len(data) > chunk_size * len(batches):
batches.append(data[len(batches) * chunk_size :])
return batches


def zip_worker(files: list[Path], output_path: Path) -> Path:
"""Worker function to zip together a group of PDFs.

Parameters
----------
files : list[Path]
List of files to zip together
output_path : Path
Output zip file to create

Returns
-------
Path
The path of the created zip file
"""
with zipfile.ZipFile(output_path, 'w') as zipf:
for infile in files:
# Add each file to the ZIP
zipf.write(infile, infile.name)

return output_path
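The zip_pdfs command above is essentially a parallel map of zip_worker over batch_data chunks. A minimal single-process sketch of the same composition (paths hypothetical):

from pathlib import Path

from pdfwf.utils import batch_data
from pdfwf.utils import zip_worker

out_dir = Path('/data/pdf_zips')
out_dir.mkdir(parents=True, exist_ok=True)

pdfs = sorted(Path('/data/pdfs').glob('**/*.pdf'))
for i, batch in enumerate(batch_data(pdfs, chunk_size=10)):
    # Each chunk_<i>.zip holds up to chunk_size PDFs
    zip_worker(batch, out_dir / f'chunk_{i}.zip')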