refactor(benchmarks): improve jobs and runs saves, add write to DynamoDB (#280)

Create a "constants.py" file. Improve Hydra's callback behaviour and
save run results to DynamoDB (optionally). Improve shell scripts used to
run the benchmarks. Add a script to prepare an EC2 instance for
benchmarking.
matthieu-d4r authored Dec 11, 2024
1 parent 1109d79 commit a7667c1
Showing 15 changed files with 318 additions and 136 deletions.
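
Note that the DynamoDB write introduced here is opt-in: `ResultCollatingCallback.on_multirun_end` (see the `hydra_callback.py` diff below) only writes when a `dynamodb` key is present in the run config. A minimal sketch of such a config, where the region and table values are placeholders for illustration, not project defaults:

# Hypothetical sketch: enabling the optional DynamoDB write. The region and
# table names below are assumptions; only the "dynamodb" key itself is checked.
from omegaconf import OmegaConf

config = OmegaConf.create(
    {"dynamodb": {"region": "us-east-1", "table": "s3torchbenchmarking-runs"}}
)
assert "dynamodb" in config  # the same check the callback performs
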
4 changes: 4 additions & 0 deletions s3torchbenchmarking/conf/dcp.yaml
@@ -11,7 +11,11 @@ path: ???
epochs: 4

hydra:
job:
name: dcp
mode: MULTIRUN
sweep:
dir: multirun/dcp/${now:%Y-%m-%d_%H-%M-%S}
sweeper:
params:
+model: vit-base, T0_3B
4 changes: 4 additions & 0 deletions s3torchbenchmarking/conf/lightning_checkpointing.yaml
@@ -12,7 +12,11 @@ epochs: 5
save_one_in: 1

hydra:
job:
name: lightning_checkpointing
mode: MULTIRUN
sweep:
dir: multirun/lightning/${now:%Y-%m-%d_%H-%M-%S}
sweeper:
params:
+model: vit-base, whisper, clip-vit, T0_3B, T0pp
8 changes: 1 addition & 7 deletions s3torchbenchmarking/pyproject.toml
@@ -25,9 +25,9 @@ dependencies = [
"boto3",
"prefixed",
"click",
"omegaconf",
"accelerate",
"pandas",
"requests",
]

[project.optional-dependencies]
@@ -38,9 +38,3 @@ test = [
[project.scripts]
s3torch-benchmark = "s3torchbenchmarking.benchmark:run_experiment"
s3torch-datagen = "s3torchbenchmarking.datagen:synthesize_dataset"
s3torch-benchmark-lightning = "s3torchbenchmarking.lightning_checkpointing.benchmark:run_benchmark"
s3torch-benchmark-dcp = "s3torchbenchmarking.dcp.benchmark:run_benchmark"

[tool.setuptools.packages]
# Pure Python packages/modules and configuration files
find = { where = ["src"] }
40 changes: 40 additions & 0 deletions s3torchbenchmarking/src/s3torchbenchmarking/constants.py
@@ -0,0 +1,40 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# // SPDX-License-Identifier: BSD

from typing import TypedDict, Union, Any, List

JOB_RESULTS_FILENAME = "job_results.json"
RUN_FILENAME = "run.json"

# URLs for EC2 metadata retrieval (IMDSv2)
URL_IMDS_TOKEN = "http://169.254.169.254/latest/api/token"
URL_IMDS_DOCUMENT = "http://169.254.169.254/latest/dynamic/instance-identity/document"


class Versions(TypedDict):
python: str
pytorch: str
hydra: str
s3torchconnector: str


class EC2Metadata(TypedDict):
architecture: str
image_id: str
instance_type: str
region: str


class Run(TypedDict):
"""Information about a Hydra run.
Also, a :class:`Run` object will be inserted as-is in DynamoDB."""

run_id: str # PK (Partition Key)
timestamp_utc: float # SK (Sort Key)
scenario: str
versions: Versions
ec2_metadata: Union[EC2Metadata, None]
run_elapsed_time_s: float
number_of_jobs: int
all_job_results: List[Any]
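
As an illustration of the data structure above, a `Run` record could be assembled as follows; every value is a placeholder, not real benchmark output:

# Hypothetical Run instance; all values below are placeholders for illustration.
from s3torchbenchmarking.constants import Run

run: Run = {
    "run_id": "00000000-0000-0000-0000-000000000000",  # a uuid4 string
    "timestamp_utc": 1733900000.0,
    "scenario": "dcp",
    "versions": {
        "python": "3.12.0",       # placeholder version strings
        "pytorch": "2.5.1",
        "hydra": "1.3.2",
        "s3torchconnector": "0.0.0",
    },
    "ec2_metadata": None,  # e.g., when not running on an EC2 instance
    "run_elapsed_time_s": 42.0,
    "number_of_jobs": 0,
    "all_job_results": [],
}
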
13 changes: 10 additions & 3 deletions s3torchbenchmarking/src/s3torchbenchmarking/dcp/benchmark.py
@@ -19,10 +19,13 @@
from torch.nn import Module
from torch.nn.parallel import DistributedDataParallel

from s3torchbenchmarking.benchmark_utils import (
build_random_suffix,
build_checkpoint_uri,
)
from s3torchbenchmarking.job_results import save_job_results
from s3torchbenchmarking.models import get_benchmark_model
from s3torchconnector.dcp import S3StorageWriter
from ..benchmark_utils import build_random_suffix, build_checkpoint_uri
from ..job_results import save_job_results
from ..models import get_benchmark_model

Timestamps = Tuple[float, float]
logger = logging.getLogger(__name__)
@@ -129,3 +132,7 @@ def run(
save_timestamps.put((begin_process, end_save - (begin_save - begin_process)))

dist.destroy_process_group()


if __name__ == "__main__":
run_benchmark()
175 changes: 104 additions & 71 deletions s3torchbenchmarking/src/s3torchbenchmarking/hydra_callback.py
@@ -1,106 +1,139 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# // SPDX-License-Identifier: BSD

import json
import logging
import re
import subprocess
import sys
from functools import lru_cache
import uuid
from datetime import datetime, timezone
from decimal import Decimal
from pathlib import Path
from time import perf_counter
from typing import Any, List, TypedDict, Union, Optional
from typing import Any, Union, Optional

import boto3
import requests
import torch
from botocore.exceptions import ClientError
from hydra.experimental.callback import Callback
from omegaconf import DictConfig

_COLLATED_RESULTS_FILENAME = "collated_results.json"
import s3torchconnector
from s3torchbenchmarking.constants import (
JOB_RESULTS_FILENAME,
RUN_FILENAME,
Run,
EC2Metadata,
URL_IMDS_TOKEN,
URL_IMDS_DOCUMENT,
)

logger = logging.getLogger(__name__)


class EC2Metadata(TypedDict):
instance_type: str
placement: str


class Metadata(TypedDict):
python_version: str
pytorch_version: str
hydra_version: str
ec2_metadata: Union[EC2Metadata, None]
run_elapsed_time_s: float
number_of_jobs: int


class CollatedResults(TypedDict):
metadata: Metadata
results: List[Any]
class ResultCollatingCallback(Callback):
"""Hydra callback (https://hydra.cc/docs/experimental/callbacks/).
Defines routines to execute when a benchmark run finishes: namely, merging all job results
("job_results.json" files) into a single "run.json" file, augmented with some metadata.
"""

def __init__(self) -> None:
self._multirun_dir: Optional[Path] = None
self._multirun_path: Optional[Path] = None
self._begin = 0
self._end = 0

def on_multirun_start(self, config: DictConfig, **kwargs: Any) -> None:
self._begin = perf_counter()

def on_job_start(self, config: DictConfig, **kwargs: Any) -> None:
# Runtime variables like the output directory are not available when `on_multirun_end` is called, but are
# available in `on_job_start`, so we collect the path here and refer to it later.
if not self._multirun_dir:
# should be something like "./multirun/2024-11-08/15-47-08/"
self._multirun_dir = Path(config.hydra.runtime.output_dir).parent
if not self._multirun_path:
# Hydra variables like `hydra.runtime.output_dir` are not available inside :func:`on_multirun_end`, so we
# get the information here.
self._multirun_path = Path(config.hydra.runtime.output_dir).parent

def on_multirun_end(self, config: DictConfig, **kwargs: Any) -> None:
self._end = perf_counter()
run_elapsed_time = self._end - self._begin
run_elapsed_time = perf_counter() - self._begin
run = self._build_run(config, run_elapsed_time)

self._save_to_disk(run)
if "dynamodb" in config:
self._write_to_dynamodb(config.dynamodb.region, config.dynamodb.table, run)
else:
logger.info("DynamoDB config not provided: skipping write to table...")

def _build_run(self, config: DictConfig, run_elapsed_time: float) -> Run:
all_job_results = []
for entry in self._multirun_path.glob(f"**/{JOB_RESULTS_FILENAME}"):
if entry.is_file():
all_job_results.append(json.loads(entry.read_text()))

logger.info("Collected %i job results", len(all_job_results))
return {
"run_id": str(uuid.uuid4()),
"timestamp_utc": datetime.now(timezone.utc).timestamp(),
"scenario": config.hydra.job.name,
"versions": {
"python": sys.version,
"pytorch": torch.__version__,
"hydra": config.hydra.runtime.version,
"s3torchconnector": s3torchconnector.__version__,
},
"ec2_metadata": _get_ec2_metadata(),
"run_elapsed_time_s": run_elapsed_time,
"number_of_jobs": len(all_job_results),
"all_job_results": all_job_results,
}

collated_results = self._collate_results(config, run_elapsed_time)
collated_results_path = self._multirun_dir / _COLLATED_RESULTS_FILENAME
def _save_to_disk(self, run: Run) -> None:
run_filepath = self._multirun_path / RUN_FILENAME

logger.info("Saving collated results to: %s", collated_results_path)
with open(collated_results_path, "w") as f:
json.dump(collated_results, f, ensure_ascii=False, indent=4)
logger.info("Collated results saved successfully")
logger.info("Saving run to: %s", run_filepath)
with open(run_filepath, "w") as f:
json.dump(run, f, ensure_ascii=False, indent=2)
logger.info("Run saved successfully")

def _collate_results(
self, config: DictConfig, run_elapsed_time: float
) -> CollatedResults:
collated_results = []
for file in self._multirun_dir.glob("*/**/result*.json"):
collated_results.append(json.loads(file.read_text()))
@staticmethod
def _write_to_dynamodb(region: str, table_name: str, run: Run) -> None:
dynamodb = boto3.resource("dynamodb", region_name=region)
table = dynamodb.Table(table_name)

logger.info("Collated %i result files", len(collated_results))
return {
"metadata": {
"python_version": sys.version,
"pytorch_version": torch.__version__,
"hydra_version": config.hydra.runtime.version,
"ec2_metadata": get_ec2_metadata(),
"run_elapsed_time_s": run_elapsed_time,
"number_of_jobs": len(collated_results),
},
"results": collated_results,
}
# `parse_float=Decimal` is required because DynamoDB does not accept floats, so we perform the
# conversion by dumping the :class:`Run` object to JSON and loading it back.
run_json = json.loads(json.dumps(run), parse_float=Decimal)

try:
logger.info("Putting item into table: %s", table_name)
table.put_item(Item=run_json)
logger.info("Put item into table successfully")
except ClientError:
logger.error("Couldn't put item into table %s", table, exc_info=True)

@lru_cache
def get_ec2_metadata() -> Union[EC2Metadata, None]:
"""Get some EC2 metadata by running the `/opt/aws/bin/ec2-metadata` command.

The command's output is a single string of text, in a JSON-like format (_but not quite JSON_): hence, its content
is parsed using regex.
def _get_ec2_metadata() -> Union[EC2Metadata, None]:
"""Get some EC2 metadata.
The function's call is cached, so we don't execute the command multiple times per run.
See also https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html#instancedata-inside-access.
"""
result = subprocess.run(
"/opt/aws/bin/ec2-metadata", capture_output=True, text=True, timeout=5
token = requests.put(
URL_IMDS_TOKEN,
headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"},
timeout=5.0,
)
if token.status_code != 200:
logger.warning("Failed to get EC2 metadata (acquiring token): %s", token)
return None

document = requests.get(
URL_IMDS_DOCUMENT, headers={"X-aws-ec2-metadata-token": token.text}, timeout=5.0
)
if result.returncode == 0:
metadata = result.stdout
instance_type = re.search("instance-type: (.*)", metadata).group(1)
placement = re.search("placement: (.*)", metadata).group(1)
if instance_type and placement:
return {"instance_type": instance_type, "placement": placement}
return None
if document.status_code != 200:
logger.warning("Failed to get EC2 metadata (fetching document): %s", document)
return None

payload = document.json()
return {
"architecture": payload["architecture"],
"image_id": payload["imageId"],
"instance_type": payload["instanceType"],
"region": payload["region"],
}
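
A side note on the `parse_float=Decimal` round-trip above: boto3's DynamoDB resource rejects Python floats and expects `Decimal` values instead, which is why the `Run` object is dumped to JSON and re-parsed. A standalone sketch of that conversion:

# Standalone sketch of the float-to-Decimal conversion used by
# _write_to_dynamodb: DynamoDB (via boto3) rejects floats but accepts Decimal.
import json
from decimal import Decimal

run = {"run_elapsed_time_s": 12.5, "number_of_jobs": 3}  # toy payload
item = json.loads(json.dumps(run), parse_float=Decimal)

assert item["run_elapsed_time_s"] == Decimal("12.5")  # floats become Decimal
assert item["number_of_jobs"] == 3                    # ints pass through intact
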
25 changes: 6 additions & 19 deletions s3torchbenchmarking/src/s3torchbenchmarking/job_results.py
@@ -9,18 +9,14 @@
from hydra.core.hydra_config import HydraConfig
from omegaconf import DictConfig, OmegaConf

from .models import BenchmarkModel
from s3torchbenchmarking.constants import JOB_RESULTS_FILENAME
from s3torchbenchmarking.models import BenchmarkModel

logger = logging.getLogger(__name__)


def save_job_results(
cfg: DictConfig,
model: BenchmarkModel,
metrics: Any,
):
"""Save a Hydra job results to a local JSON file."""

def save_job_results(cfg: DictConfig, model: BenchmarkModel, metrics: Any) -> None:
"""Save a single Hydra job results to a JSON file."""
results = {
"model": {
"name": model.name,
@@ -30,19 +26,10 @@ def save_job_results(cfg: DictConfig, model: BenchmarkModel, metrics: Any) -> None:
"metrics": metrics,
}

tasks = HydraConfig.get().overrides.task

# extract only sweeper values (i.e., ones starting with '+')
tasks = [task for task in tasks if task.startswith("+")]
# turn ["foo=4", "bar=small", "baz=1"] into "4_small_1"
suffix = "_".join([task.split("=")[-1] for task in tasks]) if tasks else ""

# Save the results in the corresponding Hydra job directory (e.g., multirun/2024-11-08/15-47-08/0/<filename>.json).
results_filename = f"results{'_' + suffix if suffix else ''}.json"
results_dir = HydraConfig.get().runtime.output_dir
results_path = Path(results_dir, results_filename)
results_path = Path(results_dir, JOB_RESULTS_FILENAME)

logger.info("Saving job results to: %s", results_path)
with open(results_path, "w") as f:
json.dump(results, f, ensure_ascii=False, indent=4)
json.dump(results, f, ensure_ascii=False, indent=2)
logger.info("Job results saved successfully")
s3torchbenchmarking/src/s3torchbenchmarking/lightning_checkpointing/benchmark.py
@@ -13,15 +13,17 @@
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper # type: ignore

from s3torchconnector.lightning import S3LightningCheckpoint
from .checkpoint_profiler import CheckpointProfiler
from ..benchmark_utils import (
from s3torchbenchmarking.benchmark_utils import (
ResourceMonitor,
build_checkpoint_uri,
build_random_suffix,
)
from ..job_results import save_job_results
from ..models import get_benchmark_model, LightningAdapter
from s3torchbenchmarking.job_results import save_job_results
from s3torchbenchmarking.lightning_checkpointing.checkpoint_profiler import (
CheckpointProfiler,
)
from s3torchbenchmarking.models import get_benchmark_model, LightningAdapter
from s3torchconnector.lightning import S3LightningCheckpoint

logger = logging.getLogger(__name__)

@@ -70,3 +72,7 @@ def run_benchmark(config: DictConfig):
}

save_job_results(config, benchmark_model, metrics)


if __name__ == "__main__":
run_benchmark()