From 9b5155d0a343863e8a3227b74b3b84d6579f6ce0 Mon Sep 17 00:00:00 2001
From: Shane A
Date: Mon, 22 Jan 2024 11:45:18 -0800
Subject: [PATCH 1/2] Track S3 upload progress by file

---
 scripts/storage_cleaner.py | 22 +++++++++-------------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index 22a412d93..b63d1932d 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -8,7 +8,6 @@
 from argparse import ArgumentParser, _SubParsersAction
 from dataclasses import dataclass
 from enum import Enum, auto
-from functools import partial
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
 from urllib.parse import urlparse
@@ -22,7 +21,7 @@
 from cached_path.schemes import S3Client
 from google.api_core.exceptions import NotFound
 from omegaconf import OmegaConf as om
-from rich.progress import Progress, TaskID, track
+from rich.progress import track
 
 from olmo import util
 from olmo.aliases import PathOrStr
@@ -579,30 +578,27 @@ def download_folder(self, directory_path: str, local_dest_folder: PathOrStr):
         else:
             raise ValueError(f"Path {directory_path} is not a valid directory")
 
+    def _upload_file(self, local_filepath: str, bucket_name: str, key: str):
+        self._s3_client.upload_file(local_filepath, bucket_name, key)
+
     def upload(self, local_src: PathOrStr, dest_path: str):
         if self.local_fs_adapter.is_file(str(local_src)):
             bucket_name, key = self._get_bucket_name_and_key(dest_path)
-            self._s3_client.upload_file(str(local_src), bucket_name, key)
+            self._upload_file(str(local_src), bucket_name, key)
         elif self.local_fs_adapter.is_dir(str(local_src)):
             local_src = Path(local_src)
 
-            def upload_callback(progress: Progress, upload_task: TaskID, bytes_uploaded: int):
-                progress.update(upload_task, advance=bytes_uploaded)
-
-            for file_local_path in local_src.rglob("*"):
+            local_file_paths = list(local_src.rglob("*"))
+            for file_local_path in track(local_file_paths, description=f"Uploading to {dest_path}"):
                 if file_local_path.is_dir():
                     continue
 
                 file_dest_path = str(file_local_path).replace(str(local_src).rstrip("/"), dest_path.rstrip("/"))
                 bucket_name, key = self._get_bucket_name_and_key(file_dest_path)
 
-                with Progress(transient=True) as progress:
-                    size_in_bytes = file_local_path.stat().st_size
-                    upload_task = progress.add_task(f"Uploading {key}", total=size_in_bytes)
-                    callback = partial(upload_callback, progress, upload_task)
-
-                    self._s3_client.upload_file(str(file_local_path), bucket_name, key, Callback=callback)
+                if not self._is_file(bucket_name, key):
+                    self._upload_file(str(file_local_path), bucket_name, key)
         else:
             raise ValueError(f"Local source {local_src} does not correspond to a valid file or directory")

From 5c7d9c69e497e1a7908f4bac58b1cdf42cd31762 Mon Sep 17 00:00:00 2001
From: Shane A
Date: Mon, 22 Jan 2024 11:47:04 -0800
Subject: [PATCH 2/2] Reduce number of concurrent S3 uploads to reduce
 throttling

---
 scripts/storage_cleaner.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index b63d1932d..ca6e61a05 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -17,6 +17,7 @@
 import google.cloud.storage as gcs
 import torch
 import wandb
+from boto3.s3.transfer import TransferConfig
 from cached_path import add_scheme_client, cached_path, set_cache_dir
 from cached_path.schemes import S3Client
 from google.api_core.exceptions import NotFound
@@ -579,7 +580,8 @@ def download_folder(self, directory_path: str, local_dest_folder: PathOrStr):
         raise ValueError(f"Path {directory_path} is not a valid directory")
 
     def _upload_file(self, local_filepath: str, bucket_name: str, key: str):
-        self._s3_client.upload_file(local_filepath, bucket_name, key)
+        transfer_config = TransferConfig(max_concurrency=4)
+        self._s3_client.upload_file(local_filepath, bucket_name, key, Config=transfer_config)
 
     def upload(self, local_src: PathOrStr, dest_path: str):
         if self.local_fs_adapter.is_file(str(local_src)):
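Taken together, the two patches swap byte-level progress bars for a single
file-level bar and cap per-upload concurrency. Below is a minimal standalone
sketch of the same pattern, assuming boto3 and rich are installed; the bucket,
prefix, and local directory names are hypothetical, not taken from the repo.

    from pathlib import Path

    import boto3
    from boto3.s3.transfer import TransferConfig
    from rich.progress import track

    # Cap each upload at 4 concurrent threads to reduce S3 throttling.
    transfer_config = TransferConfig(max_concurrency=4)
    s3_client = boto3.client("s3")

    local_src = Path("checkpoints/run1")  # hypothetical local directory
    bucket_name = "my-bucket"             # hypothetical bucket
    prefix = "runs/run1"                  # hypothetical destination prefix

    # Materialize the file list first so track() knows the total for the bar.
    local_file_paths = [p for p in local_src.rglob("*") if p.is_file()]
    for file_local_path in track(local_file_paths, description="Uploading"):
        key = f"{prefix}/{file_local_path.relative_to(local_src)}"
        s3_client.upload_file(str(file_local_path), bucket_name, key, Config=transfer_config)

Listing the files up front trades a small startup cost for an accurate progress
total, which is why patch 1 materializes rglob("*") into a list before iterating.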