From ebd94242e203db3b713dd1eb44b35f1f8a3f0e3a Mon Sep 17 00:00:00 2001
From: Ankit Gola
Date: Tue, 22 Oct 2024 14:16:51 +0300
Subject: [PATCH 1/2] Add checkpointing tests

---
 tests/conftest.py                          |   6 +
 tests/test_pytorch/strategies/test_fsdp.py | 140 ++++++++++++-
 tests/test_pytorch/test_checkpointing.py   | 224 +++++++++++++++++++++
 tests/test_pytorch/test_compile.py         |  69 +++++++
 tests/test_pytorch/test_profiler.py        |   6 -
 5 files changed, 437 insertions(+), 8 deletions(-)
 create mode 100644 tests/test_pytorch/test_checkpointing.py

diff --git a/tests/conftest.py b/tests/conftest.py
index de09b43d..a358cde3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -41,3 +41,9 @@ def device_count(pytestconfig):
         return 1
     assert arg_hpus <= HPUAccelerator.auto_device_count(), "More hpu devices asked than present"
     return arg_hpus
+
+
+@pytest.fixture()
+def _check_distributed(device_count):
+    if device_count <= 1:
+        pytest.skip("Distributed test does not run on single HPU")
diff --git a/tests/test_pytorch/strategies/test_fsdp.py b/tests/test_pytorch/strategies/test_fsdp.py
index 232780b4..f33515cb 100644
--- a/tests/test_pytorch/strategies/test_fsdp.py
+++ b/tests/test_pytorch/strategies/test_fsdp.py
@@ -33,6 +33,7 @@
     from pytorch_lightning import Trainer, seed_everything
     from pytorch_lightning.demos.boring_classes import BoringDataModule, BoringModel
 
+import habana_frameworks.torch.hpu as hthpu
 from lightning_habana.pytorch.accelerator import HPUAccelerator
 from lightning_habana.pytorch.plugins.fsdp_precision import HPUFSDPPrecision, HPUPrecisionPlugin
 from lightning_habana.pytorch.strategies import HPUDDPStrategy, HPUFSDPStrategy
@@ -264,8 +265,8 @@
 
 
 @pytest.mark.xfail(run=False, reason="To be fixed.Failure post 1.17 upgrade.")
-@pytest.mark.skipif(HPUAccelerator.auto_device_count() <= 1, reason="Test requires multiple HPU devices.")
 @pytest.mark.standalone()
+@pytest.mark.usefixtures("_check_distributed")
 def test_fsdp_strategy_simple_model_compile(tmpdir, arg_hpus):
     """Test to ensure that sync_batchnorm works when using FSDP and HPU."""
     if arg_hpus <= 1:
@@ -664,7 +665,7 @@ def test_fsdp_strategy_load_optimizer_states(tmpdir, wrap_min_params, arg_hpus):
     trainer.strategy.barrier()
 
 
-def test_dummy_fsdp_string_init(tmpdir):
+def test_fsdp_dummy_string_init(tmpdir):
     """Test that TorchMetrics get moved to the device despite not having any parameters."""
 
     class DummyFSDPStrategy(HPUFSDPStrategy):
@@ -806,3 +807,138 @@ def test_hpu_fsdp_strategy_device_not_hpu(tmpdir):
     )
     with pytest.raises(AssertionError, match="HPUFSDPStrategy requires HPUAccelerator"):
         trainer.fit(BoringModel())
+
+
+@pytest.mark.standalone()
+@pytest.mark.parametrize(
+    ("ckpt", "expected_memory"),
+    [
+        (True, 5679.0),
+        (False, 5674.25),
+    ],
+)
+def test_hpu_fsdp_activation_checkpointing_memory_usage(tmpdir, ckpt, expected_memory):
+    """Test memory usage difference with and without checkpointing."""
+
+    class TestMemoryModel(TestFSDPModel):
+        def _init_model(self) -> None:
+            self.layer = torch.nn.Sequential(
+                torch.nn.Linear(32, 32),
+                torch.nn.Linear(32, 32),
+                torch.nn.Linear(32, 2),
+            )
+            # Number of activations for Linear: out_features * batch_size(32)
+            # https://discuss.pytorch.org/t/number-of-activations-for-linear-and-conv2d-layer-comparison/48528/2
+            # Memory without checkpointing: (32 + 32 + 2) * 32 * 4 = 8.25KB
+            # Memory with checkpointing: (32 + 2) * 32 * 4 = 4.25KB
+            # Memory savings: 8.25 - 4.25 = 4KB (~ 5679KB - 5674.25KB)
+            # Note that these are estimated numbers; the device may have other memory allocations.
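+            # Worked out in bytes (illustrative only, assuming float32 activations of 4 bytes
+            # each and the default batch size of 32):
+            #   without checkpointing: (32 + 32 + 2) * 32 * 4 = 8448 bytes ~= 8.25KB
+            #   with checkpointing:    (32 + 2) * 32 * 4      = 4352 bytes ~= 4.25KB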
+            self.peak_memory = 0
+            self.current_step = 0
+
+        def on_train_batch_start(self, batch, batch_idx):
+            if self.current_step == 1:
+                hthpu.reset_peak_memory_stats()
+
+        def on_train_batch_end(self, outputs, batch, batch_idx):
+            self.current_step += 1
+            if self.current_step <= 1:
+                return
+            self.peak_memory = hthpu.max_memory_allocated() / 1024
+
+    seed_everything(42)
+    model = TestMemoryModel()
+    dm = BoringDataModule()
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        devices=1,
+        strategy=HPUFSDPStrategy(
+            parallel_devices=[torch.device("hpu")],
+            auto_wrap_policy={nn.Linear} if ckpt else None,
+            activation_checkpointing_policy={nn.Linear} if ckpt else None,
+        ),
+        max_steps=2,
+    )
+    trainer.fit(model, dm)
+    assert torch.allclose(torch.tensor(model.peak_memory), torch.tensor(expected_memory), atol=1, rtol=1)
+
+
+def test_hpu_fsdp_gradient_computation(tmpdir):
+    """Test that gradients are computed correctly with checkpointing."""
+    grads = {}
+    for ckpt in [True, False]:
+        seed_everything(42)
+        model = TestFSDPModel()
+        dm = BoringDataModule()
+        trainer = Trainer(
+            default_root_dir=tmpdir,
+            accelerator=HPUAccelerator(),
+            devices=1,
+            strategy=HPUFSDPStrategy(
+                parallel_devices=[torch.device("hpu")],
+                auto_wrap_policy={nn.Linear} if ckpt else None,
+                activation_checkpointing_policy={nn.Linear} if ckpt else None,
+            ),
+            max_steps=1,
+        )
+        trainer.fit(model, dm)
+        _grads = {}
+        for name, param in model.named_parameters():
+            name = name.replace("._fsdp_wrapped_module._checkpoint_wrapped_module", "")
+            if param.grad is not None:
+                _grads[name] = param.grad.mean().item()
+        grads[f"{ckpt=}"] = _grads
+    assert grads["ckpt=True"].keys() == grads["ckpt=False"].keys()
+    for key in grads["ckpt=True"]:
+        assert grads["ckpt=True"][key] == grads["ckpt=False"][key]
+
+
+@pytest.mark.standalone()
+@pytest.mark.usefixtures("_check_distributed")
+def test_hpu_fsdp_dist_checkpoint_save(tmpdir):
+    model = TestFSDPModel()
+
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        devices=2,
+        strategy=HPUFSDPStrategy(parallel_devices=[torch.device("hpu")] * 2, state_dict_type="sharded"),
+        max_steps=1,
+    )
+    trainer.fit(model)
+
+    if trainer.global_rank == 0:
+        checkpoint_dir = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+        for rank in range(2):
+            assert os.path.isfile(os.path.join(checkpoint_dir, f"__{rank}_0.distcp"))
+            assert os.path.getsize(os.path.join(checkpoint_dir, f"__{rank}_0.distcp")) > 0
+    trainer.strategy.barrier()
+
+
+@pytest.mark.standalone()
+@pytest.mark.usefixtures("_check_distributed")
+def test_hpu_fsdp_dist_checkpoint_load(tmpdir):
+    model = TestFSDPModel()
+
+    # Save ckpts
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        devices=2,
+        strategy=HPUFSDPStrategy(parallel_devices=[torch.device("hpu")] * 2, state_dict_type="sharded"),
+        max_steps=1,
+    )
+    trainer.fit(model)
+
+    # load and resume training from ckpt
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        devices=2,
+        strategy=HPUFSDPStrategy(parallel_devices=[torch.device("hpu")] * 2, state_dict_type="sharded"),
+        max_steps=1,
+    )
+    trainer.fit(
+        model, ckpt_path=os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+    )
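The two distributed tests above only assert that sharded `__<rank>_0.distcp` files are written and that training can resume from them through the Trainer. As a side note, such a sharded checkpoint directory can presumably also be collapsed into a single file for inspection outside FSDP; a minimal sketch, assuming a recent PyTorch (2.2+) and the directory produced by `test_hpu_fsdp_dist_checkpoint_save`:

    import os

    from torch.distributed.checkpoint.format_utils import dcp_to_torch_save

    # Directory written by the sharded save above; it holds the per-rank shards
    # (__0_0.distcp, __1_0.distcp) plus the distributed-checkpoint metadata.
    ckpt_dir = os.path.join("lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")

    # Collapse the sharded checkpoint into one file that plain torch.load() can read.
    dcp_to_torch_save(ckpt_dir, "epoch=0-step=1.consolidated.ckpt")
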
diff --git a/tests/test_pytorch/test_checkpointing.py b/tests/test_pytorch/test_checkpointing.py
new file mode 100644
index 00000000..6b024797
--- /dev/null
+++ b/tests/test_pytorch/test_checkpointing.py
@@ -0,0 +1,224 @@
+# Copyright The Lightning AI team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+
+import pytest
+import torch
+from lightning_utilities import module_available
+
+if module_available("lightning"):
+    from lightning.pytorch import Callback, Trainer
+    from lightning.pytorch.accelerators.cpu import CPUAccelerator
+    from lightning.pytorch.callbacks import ModelCheckpoint
+    from lightning.pytorch.demos.boring_classes import BoringModel
+    from lightning.pytorch.strategies.single_device import SingleDeviceStrategy
+elif module_available("pytorch_lightning"):
+    from pytorch_lightning import Callback, Trainer
+    from pytorch_lightning.accelerators.cpu import CPUAccelerator
+    from pytorch_lightning.callbacks import ModelCheckpoint
+    from pytorch_lightning.demos.boring_classes import BoringModel
+    from pytorch_lightning.strategies.single_device import SingleDeviceStrategy
+
+from lightning_habana.pytorch.accelerator import HPUAccelerator
+from lightning_habana.pytorch.strategies import HPUDDPStrategy, SingleHPUStrategy
+
+
+@pytest.mark.parametrize(
+    "checkpointing",
+    [True, False],
+)
+def test_hpu_checkpointing_trainer_init(tmpdir, checkpointing):
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        devices=1,
+        enable_checkpointing=checkpointing,
+    )
+    if checkpointing:
+        assert isinstance(trainer.checkpoint_callback, ModelCheckpoint)
+    else:
+        assert trainer.checkpoint_callback is None
+
+
+@pytest.mark.parametrize(
+    ("strategy", "devices"),
+    [
+        (SingleHPUStrategy, 1),
+        pytest.param(HPUDDPStrategy, 2, marks=pytest.mark.standalone_only()),
+    ],
+)
+def test_hpu_checkpoint_save(tmpdir, strategy, devices):
+    """Tests checkpoint files are created."""
+    model = BoringModel()
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=strategy(),
+        devices=devices,
+        max_steps=1,
+    )
+    trainer.fit(model)
+    assert model.device.type == "cpu"
+
+    ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+    assert os.path.isfile(ckpt_file)
+    assert os.path.getsize(ckpt_file) > 0
+
+
+def test_hpu_checkpointing_disabled(tmpdir):
+    """Tests checkpoint files are not created when checkpointing is disabled."""
+    model = BoringModel()
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+        enable_checkpointing=False,
+    )
+    trainer.fit(model)
+
+    ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+    assert not os.path.exists(ckpt_file)
+
+
+def test_hpu_checkpointing_manual_save(tmpdir):
+    """Tests manual checkpoint save via trainer.save_checkpoint when automatic checkpointing is disabled."""
+    model = BoringModel()
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+        enable_checkpointing=False,
+    )
+    trainer.fit(model)
+
+    ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
"lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt") + assert not os.path.exists(ckpt_file) # ckpt file not created due to `enable_checkpoining=False` + + trainer.save_checkpoint(filepath=ckpt_file) # manual save + assert os.path.isfile(ckpt_file) + assert os.path.getsize(ckpt_file) > 0 + + +def test_hpu_modelcheckpoint(tmpdir): + """Tests checkpoint created by ModelCheckpoint callback.""" + model = BoringModel() + trainer = Trainer( + default_root_dir=tmpdir, + accelerator=HPUAccelerator(), + strategy=SingleHPUStrategy(), + devices=1, + max_steps=1, + callbacks=ModelCheckpoint(dirpath=tmpdir, filename="callback-{epoch}-{step}"), + ) + trainer.fit(model) + + ckpt_file = os.path.join(tmpdir, "callback-epoch=0-step=1.ckpt") + assert os.path.isfile(ckpt_file) + assert os.path.getsize(ckpt_file) > 0 + + +def test_hpu_modelcheckpoint_save_resume(tmpdir): + """Tests checkpoint created by ModelCheckpoint callback.""" + + class TestCheckpointCallback(Callback): + def on_train_step_end(self, trainer, pl_module, outputs): + """Check for the checkpoint file after every step.""" + ckpt_file = (os.path.join(tmpdir, f"callback-epoch=0-step={trainer.global_step}.ckpt"),) + assert os.path.isfile(ckpt_file) + assert os.path.getsize(ckpt_file) > 0 + + model = BoringModel() + + trainer = Trainer( + default_root_dir=tmpdir, + accelerator=HPUAccelerator(), + strategy=SingleHPUStrategy(), + devices=1, + max_steps=2, + callbacks=[ + ModelCheckpoint(dirpath=tmpdir, filename="callback-{epoch}-{step}", every_n_train_steps=1), + TestCheckpointCallback(), + ], + ) + trainer.fit(model) + + +@pytest.mark.skip("Test fails in lazy mode.") +def test_hpu_model_weights_after_saving_and_loading_checkpoint(tmpdir): + """Tests model weights are same after saving and loading checkpoint file.""" + model = BoringModel() + trainer = Trainer( + default_root_dir=tmpdir, + accelerator=HPUAccelerator(), + devices=1, + strategy=SingleHPUStrategy(), + max_steps=1, + ) + trainer.fit(model) + + ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt") + loaded_model = BoringModel.load_from_checkpoint(ckpt_file) + + for param_original, param_loaded in zip(model.parameters(), loaded_model.parameters()): + assert torch.equal(param_original, param_loaded), "Model weights do not match after loading!" 
+
+
+@pytest.mark.skip("Test fails in lazy mode.")
+@pytest.mark.parametrize(
+    ("accelerator", "strategy", "devices"),
+    [
+        (HPUAccelerator, SingleHPUStrategy, 1),
+        (CPUAccelerator, SingleDeviceStrategy, 1),
+        pytest.param(
+            HPUAccelerator,
+            HPUDDPStrategy,
+            2,
+            marks=[
+                pytest.mark.standalone_only(),
+                pytest.mark.skip("Test may fail in multi tenant scenario"),
+            ],
+        ),
+    ],
+)
+def test_hpu_resume_training_from_checkpoint(tmpdir, accelerator, strategy, devices):
+    """Tests checkpoint save, load and resume training."""
+    model = BoringModel()
+
+    # save checkpoint
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+    )
+    trainer.fit(model)
+
+    # load checkpoint and resume training
+    ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+    model = BoringModel.load_from_checkpoint(ckpt_file)
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=accelerator(),
+        strategy=strategy(),
+        devices=devices,
+        max_steps=1,
+    )
+    trainer.fit(model)
diff --git a/tests/test_pytorch/test_compile.py b/tests/test_pytorch/test_compile.py
index 00bad179..34964507 100644
--- a/tests/test_pytorch/test_compile.py
+++ b/tests/test_pytorch/test_compile.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import pytest
 import torch
 
@@ -21,11 +22,13 @@
 
 if module_available("lightning"):
     from lightning.pytorch import LightningModule, Trainer
+    from lightning.pytorch.callbacks import ModelCheckpoint
     from lightning.pytorch.demos.boring_classes import BoringModel
     from lightning.pytorch.demos.mnist_datamodule import MNISTDataModule
     from lightning.pytorch.utilities.compile import from_compiled, to_uncompiled
 elif module_available("pytorch_lightning"):
     from pytorch_lightning import LightningModule, Trainer
+    from pytorch_lightning.callbacks import ModelCheckpoint
     from pytorch_lightning.demos.boring_classes import BoringModel
     from pytorch_lightning.demos.mnist_datamodule import MNISTDataModule
 
@@ -294,3 +297,69 @@ def test_hpu_compile_precision_plugin(tmpdir, precision, trainer_fn, params):
     )
     fn = getattr(trainer, trainer_fn)
     fn(compiled_model)
+
+
+@pytest.mark.usefixtures("_is_compile_allowed")
+def test_hpu_compile_checkpoint_save(tmpdir):
+    """Tests checkpoint files are created."""
+    model = torch.compile(BoringModel(), backend="hpu_backend")
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+    )
+    trainer.fit(model)
+    assert model.device.type == "cpu"
+
+    ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+    assert os.path.isfile(ckpt_file)
+    assert os.path.getsize(ckpt_file) > 0
+
+
+@pytest.mark.usefixtures("_is_compile_allowed")
+def test_hpu_compile_resume_training_from_checkpoint(tmpdir):
+    """Tests checkpoint save, load and resume training."""
+    model = torch.compile(BoringModel(), backend="hpu_backend")
+
+    # save checkpoint
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+    )
+    trainer.fit(model)
+
+    # load checkpoint and resume training
+    ckpt_file = os.path.join(tmpdir, "lightning_logs", "version_0", "checkpoints", "epoch=0-step=1.ckpt")
+    model = torch.compile(BoringModel.load_from_checkpoint(ckpt_file), backend="hpu_backend")
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+    )
+    trainer.fit(model)
+
+
+@pytest.mark.usefixtures("_is_compile_allowed")
+def test_hpu_compile_modelcheckpoint(tmpdir):
+    """Tests checkpoint created by ModelCheckpoint callback."""
+    model = torch.compile(BoringModel(), backend="hpu_backend")
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        accelerator=HPUAccelerator(),
+        strategy=SingleHPUStrategy(),
+        devices=1,
+        max_steps=1,
+        callbacks=ModelCheckpoint(dirpath=tmpdir, filename="callback-{epoch}-{step}"),
+    )
+    trainer.fit(model)
+
+    ckpt_file = os.path.join(tmpdir, "callback-epoch=0-step=1.ckpt")
+    assert os.path.isfile(ckpt_file)
+    assert os.path.getsize(ckpt_file) > 0
diff --git a/tests/test_pytorch/test_profiler.py b/tests/test_pytorch/test_profiler.py
index c4ee8eae..f35ed486 100644
--- a/tests/test_pytorch/test_profiler.py
+++ b/tests/test_pytorch/test_profiler.py
@@ -48,12 +48,6 @@
 from lightning_habana.pytorch.profiler.profiler import HPUProfiler
 
 
-@pytest.fixture()
-def _check_distributed(device_count):
-    if device_count <= 1:
-        pytest.skip("Distributed test does not run on single HPU")
-
-
 @pytest.mark.parametrize(
     ("profiler_str", "profiler_class", "expectation"),
     [

From 6a4eaaa73493329713cc507529d4185333b862bf Mon Sep 17 00:00:00 2001
From: Ankit Gola
Date: Mon, 4 Nov 2024 16:00:39 +0200
Subject: [PATCH 2/2] Enable checkpoint tests

---
 .azure/hpu-tests.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.azure/hpu-tests.yml b/.azure/hpu-tests.yml
index 6b153644..53647bce 100644
--- a/.azure/hpu-tests.yml
+++ b/.azure/hpu-tests.yml
@@ -115,6 +115,7 @@ jobs:
             tests/test_pytorch/test_datamodule.py \
             tests/test_pytorch/test_profiler.py \
             tests/test_pytorch/test_precision.py \
+            tests/test_pytorch/test_checkpointing.py \
             tests/test_pytorch/strategies/test_hpu_parallel.py \
             tests/test_pytorch/strategies/test_hpu_ddp.py \
             --hpus 1 -W ignore::FutureWarning -m "not standalone_only" \
@@ -155,6 +156,7 @@ jobs:
       bash tests/run_standalone_tests.sh --hpus 1 -m standalone_only -f \
         tests/test_pytorch/strategies/test_hpu_parallel.py \
         tests/test_pytorch/test_precision.py \
+        tests/test_pytorch/test_checkpointing.py \
         tests/test_pytorch/test_dynamic_shapes.py
     displayName: Standalone-only single card tests
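For reference, mirroring the CI wiring above, the new checkpoint tests can presumably be run locally along these lines (the paths, markers, and `run_standalone_tests.sh` invocation are taken from the .azure/hpu-tests.yml hunks; the `python -m pytest` prefix is an assumption, since the full command line sits outside the diff context):

    # single-card, non-standalone subset
    python -m pytest tests/test_pytorch/test_checkpointing.py --hpus 1 -W ignore::FutureWarning -m "not standalone_only"

    # standalone-only single-card subset
    bash tests/run_standalone_tests.sh --hpus 1 -m standalone_only -f tests/test_pytorch/test_checkpointing.py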