Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring #21

Open
wants to merge 50 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
7335bef
Pyproject creation
Jun 11, 2024
385700d
Pyproject creation
Jun 11, 2024
0b6fe1e
Pyproject creation
Jun 11, 2024
3c760fe
Human readable config
Jun 14, 2024
1e5b711
Package like structure
Jun 15, 2024
c7e812f
Package like structure
Jun 17, 2024
6a70e03
Package like structure
Jun 17, 2024
975a61e
Package like structure
Jun 18, 2024
c8d5023
Small features
Andrey170170 Jun 18, 2024
0212e82
Small features
Andrey170170 Jun 21, 2024
0f64ff8
Filtering scripts
Andrey170170 Jun 24, 2024
a932fe5
Filtering scripts
Andrey170170 Jun 24, 2024
a447bef
Added image verification scripts
Andrey170170 Jun 26, 2024
3633d22
Filtration and image verification is completed
Andrey170170 Jun 28, 2024
31e9020
Data transfer script
Andrey170170 Jun 29, 2024
a951460
Data transfer script
Andrey170170 Jun 29, 2024
6f57351
Data transfer script
Andrey170170 Jun 29, 2024
ddc3525
Data transfer script
Andrey170170 Jul 4, 2024
48aad06
Schedule creation fix
Jul 5, 2024
0e344f7
Downloader consistency fix
Jul 6, 2024
fb24372
Done condition check for main function
Jul 6, 2024
03f64c0
Data merging workflow
Andrey170170 Jul 12, 2024
eab2053
Merge remote-tracking branch 'origin/filtering' into filtering
Andrey170170 Jul 12, 2024
35ae6bb
Data merging workflow
Andrey170170 Jul 15, 2024
1781524
Filter refactoring
Andrey170170 Jul 18, 2024
b34525c
Filter refactoring
Andrey170170 Jul 19, 2024
900ef3d
Filter refactoring
Andrey170170 Jul 20, 2024
5daaf7b
Merge branch 'refs/heads/main' into filtering
Andrey170170 Jul 20, 2024
8f8a38f
Filter refactoring
Andrey170170 Jul 22, 2024
3164852
Filter refactoring
Andrey170170 Jul 23, 2024
7453ec6
Refactoring
Jul 23, 2024
2140150
Refactoring
Jul 23, 2024
28fb563
Small fixes after refactoring
Andrey170170 Jul 25, 2024
9c7b54e
Small fixes after refactoring, more Config logic
Andrey170170 Jul 27, 2024
17f1d4a
Refactoring
Jul 29, 2024
83b5ef5
Refactoring
Jul 30, 2024
f0dbde6
Refactoring
Jul 30, 2024
4dc1742
Documentation
Jul 30, 2024
5798ac5
Merge pull request #22 from Imageomics/documentation
Andrey170170 Jul 31, 2024
c1589ba
Fixed API exposed by package
Jul 31, 2024
abf224d
Add trailing newlines where missing
thompsonmj Jul 31, 2024
cf7641c
Dependency management
Aug 5, 2024
f34fc54
Dependency management
Aug 5, 2024
c419409
Dependency management
Aug 5, 2024
89cfad0
Fixed tools import bug
Andrey170170 Aug 6, 2024
0079c5c
Readme fix
Aug 6, 2024
9fffb3e
Readme fix
Aug 6, 2024
46d0ee8
Merge remote-tracking branch 'origin/refactoring' into refactoring
Andrey170170 Aug 8, 2024
55b22b6
Fixed tools
Andrey170170 Aug 8, 2024
7bd68ad
Update README.md
Andrey170170 Aug 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,7 @@ cython_debug/

# Mac System
.DS_Store

app-*/
spark-*
/column_name_change/
148 changes: 108 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,57 +1,125 @@
# Distributed Downloader

MPI-based distributed downloading tool for retrieving data from diverse domains.

## Background

This MPI-based distributed downloader was initially designed for the purpose of downloading all images from the monthly [GBIF occurrence snapshot](https://www.gbif.org/occurrence-snapshots). The overall setup is general enough that it could be transformed into a functional tool beyond just our use; it should work on any list of URLs. We chose to build this tool instead of using something like [img2dataset](https://github.com/rom1504/img2dataset) to better avoid overloading source servers (GBIF documents approximately 200M images across 545 servers) and have more control over the final dataset construction and metadata management (e.g., using `HDF5` as discussed in [issue #1](https://github.com/Imageomics/distributed-downloader/issues/1)).


## How to Use

`distributed-downloader` utilizes multiple nodes on a High Performance Computing (HPC) system (specifically, an HPC with `slurm` workload manager) to download a collection of images specified in a given tab-delimited text file. There are three manual steps to get the downloader running as designed; the first two function as a preprocessing step (to be done once with the initial file), and the third initiates the download (this step may be run multiple times for pre-established periods and each will pick up where the last left off).
This MPI-based distributed downloader was initially designed for the purpose of downloading all images from the
monthly [GBIF occurrence snapshot](https://www.gbif.org/occurrence-snapshots). The overall setup is general enough that
it could be transformed into a functional tool beyond just our use; it should work on any list of URLs. We chose to
build this tool instead of using something like [img2dataset](https://github.com/rom1504/img2dataset) to better avoid
overloading source servers (GBIF documents approximately 200M images across 545 servers) and have more control over the
final dataset construction and metadata management (e.g., using `HDF5` as discussed
in [issue #1](https://github.com/Imageomics/distributed-downloader/issues/1)).

1. The first step is to run the file through `src/server_prep.py`. This includes partitioning the dataset by server to generate batches of 10K URLs per server. Servers are determined by the URL in the input file. Additionally, it adds a UUID to each entry in the file to ensure preservation of provenance throughout the download and processing and beyond. This processing is still GBIF occurrence snapshot-specific in that it includes filters on the input file by media type (`StillImage`), checks for missing `gbifID`, and checks that the record indeed contains an image (through the `format` column).

2. After the partitioning and filtering, `MPI_download_prep.py` must be run to establish the rate limits (by server) for the download. An "average" rate limit is established and then scaled based on the number of batches/simultaneous downloads per server (to avoid overloading a server while running simultaneous downloads). After the download is initialized, manual adjustments can be made based on results. Additionally, if a server returns any retry error (`429, 500, 501, 502, 503, 504`), the request rate for that server is reduced.

3. Finally, `submitter.py` is run with the path to the `.env` file for various download settings and paths. This can be run for set periods of time and will restart where it has left off on the next run. Timing for batches is set in the `slurm` scripts passed through the `.env`.
## Installation Instructions

If you want the downloader to ignore some of the servers, you can add them to the `ignored_servers.csv` file. Then you need to rerun the `MPI_download_prep.py` script to update the schedules for the changes to take effect.
### Conda installation

### Running on other systems
1. Install [Miniconda](https://docs.conda.io/en/latest/miniconda.html)
2. Create a new conda environment:
```commandline
conda env create -f environment.yaml --solver=libmamba -y
```

The parameters for step 3 can all be set in the configuration file. This includes information about your HPC account and paths to various files, as well as distribution of work and download settings; be sure to fill in your information.
The configuration file (`config/hpc.env`) should be in this location relative to the root of the directory from which these files are being run.
### Pip installation

Note that the current default is to download images such that the longest side is 720 pixels. The original and resized sizes are recorded in the metadata; the aspect ratio is preserved when resizing images.
1. Install Python 3.10 or higher
2. Install MPI, any MPI should work, tested with OpenMPI and IntelMPI. Installation instructions can be found on
official websites:
- [OpenMPI](https://docs.open-mpi.org/en/v5.0.x/installing-open-mpi/quickstart.html)
- [IntelMPI](https://www.intel.com/content/www/us/en/docs/mpi-library/developer-guide-linux/2021-6/installation.html)
3. Install required package:
- For general use:
```commandline
pip install git+https://github.com/Imageomics/distributed-downloader
```
- For development:
```commandline
pip install -e .[dev]
```

The provided `slurm` scripts for running steps 1 and 2 (`scripts/server_downloading_prep.slurm` and `scripts/server_profiling.slurm`) must have the account info changed at the top of their files (`#SBATCH --account=<your account here>`). These are each only run once at the start of the project
## How to Use

`distributed-downloader` utilizes multiple nodes on a High Performance Computing (HPC) system (specifically, an HPC
with `slurm` workload manager) to download a collection of images specified in a given tab-delimited text file.

## Note on files
### Main script

`resize_mpi` (`py` and `slurm`) and `resizer_scheduler.py` are scripts intended to resize the images after download. For instance, in the case that the initial download size is set higher than intended, these can be used to adjust the size within the given structure and repackage it. They have not been generalized to fit in with the remaining package infrastructure and are simply extra tools that we used; they may be generalized in the future.
There are one manual step to get the downloader running as designed:
You need to call function `download_images` from package `distributed_downloader` with the `config_path` as an argument.
This will initialize filestructure in the output folder, partition the input file, profile the servers for their
possible download speed, and start downloading images. If downloading didn't finish, you can call the same function with
the same `config_path` argument to continue downloading.

Downloader has two logging profiles:
- "INFO" - logs only the most important information, for example when a batch is started and finished. It also logs out any error that occurred during download, image decoding, or writing batch to the filesystem
- "DEBUG" - logs all information, for example logging start and finish of each downloaded image.

## Installation Instructions
1. Install Python 3.10 or higher
2. Install MPI, any MPI should work, tested with OpenMPI and IntelMPI.
3. Install Parallel HDF5, tested with version 1.12.2
4. Install/Update pip, setuptools, and wheel
```
pip install -U wheel setuptools pip Cython
```
5. Install h5py:
```
export CC=/path/to/mpicc
export HDF5_MPI="ON"
export HDF5_DIR=/path/to/hdf5
pip install --no-cache-dir --no-binary=h5py h5py
```
6. Install required packages:
```
pip install -r requirements.txt
```
- `INFO` - logs only the most important information, for example when a batch is started and finished. It also logs out
any error that occurred during download, image decoding, or writing batch to the filesystem
- `DEBUG` - logs all information, for example logging start and finish of each downloaded image.

### Tools script

After downloading is finished, you can use the `tools` package perform various operations on them.
To do this, you need to call the function `apply_tools` from package `distributed_downloader` with the `config_path`
and `tool_name` as an argument.
Following tools are available:

- `resize` - resizes images to a new size
- `image_verification` - verifies images by checking if they are corrupted
- `duplication_based` - removes duplicate images
- `size_based` - removes images that are too small

You can also add your own tool, the instructions are in the section below.

### Creating a new tool

You can also add your own tool by creating 3 classes and registering them with respective decorators.

- Each tool's output will be saved in separate folder in `{config.output_structure.tools_folder}/{tool_name}`
- There are 3 steps in the tool pipeline: `filter`, `scheduler` and `runner`.
- `filter` - filters the images that should be processed by the tool and creates csv files with them
- `scheduler` - creates a schedule for processing the images for MPI
- `runner` - processes the images using MPI
- Each step should be implemented in a separate class.
- Tool name should be the same across all classes.
- Each tool should inherit from `ToolsBase` class.
- Each tool should have a `run` method that will be called by the main script.
- Each tool should be registered with a decorator from a respective package (`FilterRegister` from `filters` etc.)

## Rules for scripts:

All scripts can expect to have the following custom environment variables, specific variables are only initialized
when respective tool is called:

- General parameters
- `CONFIG_PATH`
- `ACCOUNT`
- `PATH_TO_INPUT`
- `PATH_TO_OUTPUT`
- `OUTPUT_URLS_FOLDER`
- `OUTPUT_LOGS_FOLDER`
- `OUTPUT_IMAGES_FOLDER`
- `OUTPUT_SCHEDULES_FOLDER`
- `OUTPUT_PROFILES_TABLE`
- `OUTPUT_IGNORED_TABLE`
- `OUTPUT_INNER_CHECKPOINT_FILE`
- `OUTPUT_TOOLS_FOLDER`
- Specific for downloader
- `DOWNLOADER_NUM_DOWNLOADS`
- `DOWNLOADER_MAX_NODES`
- `DOWNLOADER_WORKERS_PER_NODE`
- `DOWNLOADER_CPU_PER_WORKER`
- `DOWNLOADER_HEADER`
- `DOWNLOADER_IMAGE_SIZE`
- `DOWNLOADER_LOGGER_LEVEL`
- `DOWNLOADER_BATCH_SIZE`
- `DOWNLOADER_RATE_MULTIPLIER`
- `DOWNLOADER_DEFAULT_RATE_LIMIT`
- Specific for tools
- `TOOLS_NUM_WORKERS`
- `TOOLS_MAX_NODES`
- `TOOLS_WORKERS_PER_NODE`
- `TOOLS_CPU_PER_WORKER`
- `TOOLS_THRESHOLD_SIZE`
- `TOOLS_NEW_RESIZE_SIZE`
64 changes: 64 additions & 0 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
account: "account_name" # Account name for the cluster
path_to_input: "path_to_input_file" # Path to the input file with the list of servers
path_to_output_folder: "path_to_output_folder" # Path to the output folder

scripts:
# Wrapper scripts to submit jobs to the cluster
general_submitter: "path_to_general_submitter_script.sh"
tools_submitter: "path_to_tools_submitter_script.sh"
mpi_submitter: "path_to_mpi_submitter_script.sh"
schedule_creator_submitter: "path_to_schedule_creator_submitter_script.sh"
# Cluster job's scripts
initialization_script: "path_to_initialization_script.slurm"
profiling_script: "path_to_profiling_script.slurm"
schedule_creation_script: "path_to_schedule_creation_script.slurm"
verify_script: "path_to_verify_script.slurm"
download_script: "path_to_download_script.slurm"
# tools scripts
tools_filter_script: "path_to_tools_filter_script.slurm"
tools_scheduling_script: "path_to_tools_scheduling_script.slurm"
tools_worker_script: "path_to_tools_worker_script.slurm"
tools_verification_script: "path_to_tools_verification_script.slurm"

# Rules for the schedule creation
# They determine how many simultaneous downloader instances can be run on the same server
# Rules are based on the number of batches required to be downloaded from the server
# Rule is: key - number of batches, value - number of instances; if server has more than key batches, value instances can be run
# Server with 0 batches is considered to be downloaded and are ignored
# Default value is 1
# Order of the rules does not matter
schedule_rules:
1: 1

# Structure of the output folder that will be created automatically
output_structure:
urls_folder: "servers_batched" # Folder where the servers will be split into batches
logs_folder: "logs" # Folder for the logs
images_folder: "downloaded_images" # Folder for the downloaded images
schedules_folder: "schedules" # Folder for the schedules
profiles_table: "servers_profiles.csv" # Table with the servers profiles
ignored_table: "ignored_servers.csv" # Table with the servers that were ignored, you can find an example in examples/ignored_servers.csv
inner_checkpoint_file: "inner_checkpoint.yaml" # Inner checkpoint file
tools_folder: "tools" # Folder for the tools

# Parameters for the downloader
suppress_unchanged_error: False # Suppress the error if two consecutive downloads do not change the number of batches completed
downloader_parameters:
num_downloads: 0 # Number of downloads to be performed
max_nodes: 0 # Maximum number of nodes to be used
workers_per_node: 0 # Number of workers per node
cpu_per_worker: 0 # Number of CPUs per worker
header: "" # Header for the requests
image_size: 0 # Size of the image to be downloaded
logger_level: "INFO" # Logger level
batch_size: 0 # Batch size, default is 10000
rate_multiplier: 1 # Rate multiplier for the rate limit
default_rate_limit: 10 # Default rate limit for the profiler

tools_parameters:
num_workers: 0
max_nodes: 0
workers_per_node: 0
cpu_per_worker: 0
threshold_size: 224 # Threshold size for the images, images with size less than this value will filtered out
new_resize_size: 720 # New size for the images in resize tool
37 changes: 0 additions & 37 deletions config/hpc.env

This file was deleted.

53 changes: 53 additions & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: distributed-downloader
channels:
- conda-forge
- defaults
dependencies:
- openmpi
- python
- uv
- opencv
- pyspark
- attrs
- brotli
- certifi
- charset-normalizer
- cramjam
- cython
- exceptiongroup
- fsspec
- hatchling
- idna
- inflate64
- iniconfig
- mpi4py
- multivolumefile
- numpy
- packaging
- pandas
- pathspec
- pillow
- pip
- pluggy
- psutil
- py4j
- pyarrow
- pybcj
- pycryptodomex
- pyppmd
- pytest
- python-dateutil
- python-dotenv
- pytz
- pyyaml
- pyzstd
- requests
- setuptools
- six
- texttable
- tomli
- trove-classifiers
- typing-extensions
- tzdata
- urllib3
- wheel
3 changes: 3 additions & 0 deletions examples/ignored_servers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ServerName
server_name_1
server_name_2
48 changes: 48 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[build-system]
requires = ["hatchling", "hatch-requirements-txt"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/distributed_downloader"]

[project]
name = "distributed_downloader"
dynamic = ["dependencies", "version"]
authors = [
{ name = "Andrey Kopanev", email = "[email protected]" },
{ name = "Elizabeth G. Campolongo", email = "[email protected]" },
{ name = "Matthew J. Thompson", email = "[email protected]" },
]
description = "A tool for downloading files from a list of URLs in parallel."
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]

[tool.hatch.metadata.hooks.requirements_txt]
files = ["requirements.txt"]

[project.optional-dependencies]
dev = ["pytest"]

keywords = [
"parallel",
"distributed",
"download",
"url",
]

[project.urls]
Homepage = "https://github.com/Imageomics/distributed-downloader"
Repository = "https://github.com/Imageomics/distributed-downloader.git"
"Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"

[project.scripts]
distributed_downloader = "distributed_downloader.main:main"
distributed_downloader_tools = "distributed_downloader.tools:main"

[tool.hatch.version]
path = "src/distributed_downloader/core/__about__.py"
Loading