From a28603e38b8a83eca8eaf037542b5a167915eb40 Mon Sep 17 00:00:00 2001 From: Krishna Kumar <3963513+kks32@users.noreply.github.com> Date: Fri, 28 Jun 2024 06:27:30 -0600 Subject: [PATCH] Hydra yaml config (#81) * Initial implementation with hydra * Test CI for config yaml * Testing CI for hydra * Try pip requirements.txt * Try pip instead of conda for docker * Docker build on GitHub * Copy requirements.txt file before installing on Docker container * Copy requirements.txt * Copy requirements.txt * Trying with user flag * Trying Python 3.11 * Test CircleCI with ghcr container image * GitHub Actions workflow to test training GNS * Updated dockerfile with paths and env * Modify workflow to run training * Add at least one epoch to run when nsteps is fewer than 1 epoch steps * Train GNS action * Test without docker pull on CircleCI * Specify path and branches * Fix path to GNS sample output * Worflow runs on Github and remove conda on circleci * No black check * Only try to build container if specific files have changed * Fix resume training and README * Reduce number of steps to 100 for testing * Refactor constants to data * Remove on PR * Add config to tensorboard writer * Fix formatting and issue with cfg.data.path * Black linter --- .circleci/config.yml | 33 +--- .github/workflows/container.yml | 43 +++++ .github/workflows/train.yml | 29 +++ .gitignore | 1 + Dockerfile | 35 +++- README.md | 149 ++++++--------- config.yaml | 52 +++++ gns/args.py | 70 +++++++ gns/render_rollout.py | 8 +- gns/train.py | 329 ++++++++++++++------------------ gns/train_multinode.py | 24 +-- requirements.txt | 3 +- 12 files changed, 450 insertions(+), 326 deletions(-) create mode 100644 .github/workflows/container.yml create mode 100644 .github/workflows/train.yml create mode 100644 config.yaml create mode 100644 gns/args.py diff --git a/.circleci/config.yml b/.circleci/config.yml index fcb2ca6..4210044 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,40 +2,17 @@ version: 2.0 jobs: gns: docker: - - image: quay.io/geoelements/gns + - image: ghcr.io/geoelements/gns:config steps: - checkout - # GCC - run: name: Train & Test command: | - TMP_DIR="./gns-sample" - DATASET_NAME="WaterDropSample" - git clone https://github.com/geoelements/gns-sample - mkdir -p ${TMP_DIR}/${DATASET_NAME}/models/ - mkdir -p ${TMP_DIR}/${DATASET_NAME}/rollout/ - DATA_PATH="${TMP_DIR}/${DATASET_NAME}/dataset/" - MODEL_PATH="${TMP_DIR}/${DATASET_NAME}/models/" - ROLLOUT_PATH="${TMP_DIR}/${DATASET_NAME}/rollout/" - conda install -c anaconda absl-py -y - conda install -c conda-forge numpy -y - conda install -c conda-forge dm-tree -y - conda install -c conda-forge matplotlib-base -y - conda install -c conda-forge pyevtk -y - conda install -c conda-forge pytest -y - conda install -c conda-forge tensorboard -y + git clone https://github.com/geoelements/gns-sample ../gns-sample pytest test/ - echo "Test paths: ${DATA_PATH} ${MODEL_PATH}" - ls - python -m gns.train --data_path=${DATA_PATH} --model_path=${MODEL_PATH} --ntraining_steps=10 - echo "Predict rollout" - ls ./gns-sample/WaterDropSample/models/ - - - run: - name: Black check - command: | - conda install -c conda-forge black -y - black --check . 
+ python -m gns.train + ls ../gns-sample/WaterDropSample/models/ + workflows: version: 2 diff --git a/.github/workflows/container.yml b/.github/workflows/container.yml new file mode 100644 index 0000000..061b0e3 --- /dev/null +++ b/.github/workflows/container.yml @@ -0,0 +1,43 @@ +name: Build and Push to GHCR + +on: + push: + paths: + - Dockerfile + - requirements.txt + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.github/workflows/train.yml b/.github/workflows/train.yml new file mode 100644 index 0000000..3011ebf --- /dev/null +++ b/.github/workflows/train.yml @@ -0,0 +1,29 @@ +name: GNS Train and Test + +on: + push: + +jobs: + gns: + runs-on: ubuntu-latest + container: + image: ghcr.io/geoelements/gns:config + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Black linter check + run: | + black --check . + + - name: PyTest + run: | + pytest test/ + + - name: Train GNS + run: | + TMP_DIR="../gns-sample" + DATASET_NAME="WaterDropSample" + git clone https://github.com/geoelements/gns-sample ../gns-sample + python -m gns.train \ No newline at end of file diff --git a/.gitignore b/.gitignore index a06ba2d..d4c6b41 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ scratch log **/logs/* +outputs/* # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/Dockerfile b/Dockerfile index 79c5b6f..2de8a2f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,25 @@ -FROM continuumio/anaconda3:latest -RUN conda install pytorch==1.12.1 torchvision==0.13.1 torchaudio==0.12.1 cpuonly -c pytorch -RUN conda install pyg -c pyg -RUN conda install -c anaconda absl-py -RUN conda install -c conda-forge numpy -RUN conda install -c conda-forge dm-tree -RUN conda install -c conda-forge matplotlib-base -RUN conda install -c conda-forge pyevtk -WORKDIR /home/gns -RUN /bin/bash \ No newline at end of file +FROM python:3.11 + +WORKDIR /app + +COPY requirements.txt . 
+ +RUN pip3 install --upgrade pip && \ + pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu && \ + pip3 install torch_geometric && \ + pip3 install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.3.0+cpu.html && \ + pip3 install -r requirements.txt + +ENV PYTHONPATH=/app + +# Add Python path to PATH +ENV PATH="/usr/local/bin:${PATH}" + +# Create a bash script to set up the environment +RUN echo '#!/bin/bash\n\ +export PYTHONPATH=/app\n\ +export PATH="/usr/local/bin:$PATH"\n\ +exec "$@"' > /entrypoint.sh && chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] +CMD ["/bin/bash"] \ No newline at end of file diff --git a/README.md b/README.md index 3cb5d7a..7027937 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ MeshNet is a scalable surrogate simulator for any mesh-based models like Finite > Training GNS/MeshNet on simulation data ```shell # For particulate domain, -python3 -m gns.train --data_path="" --model_path="" --ntraining_steps=100 +python3 -m gns.train mode="train" --config-path ./ --config-name config.yaml # For mesh-based domain, python3 -m meshnet.train --data_path="" --model_path="" --ntraining_steps=100 ``` @@ -29,7 +29,7 @@ To resume training specify `model_file` and `train_state_file`: ```shell # For particulate domain, -python3 -m gns.train --data_path="" --model_path="" --model_file="model.pt" --train_state_file="train_state.pt" --ntraining_steps=100 +python3 -m gns.train mode="train" training.resume=True # For mesh-based domain, python3 -m meshnet.train --data_path="" --model_path="" --model_file="model.pt" --train_state_file="train_state.pt" --ntraining_steps=100 ``` @@ -37,7 +37,7 @@ python3 -m meshnet.train --data_path="" --model_path=" > Rollout prediction ```shell # For particulate domain, -python3 -m gns.train --mode="rollout" --data_path="" --model_path="" --output_path="" --model_file="model.pt" --train_state_file="train_state.pt" +python3 -m gns.train mode="rollout" # For mesh-based domain, python3 -m meshnet.train --mode="rollout" --data_path="" --model_path="" --output_path="" --model_file="model.pt" --train_state_file="train_state.pt" ``` @@ -61,91 +61,64 @@ In mesh-based domain, the renderer writes `.gif` animation. > Meshnet GNS prediction of cylinder flow after training for 1 million steps. -## Command line arguments details +## Configuration file
-`train.py` in GNS (particulate domain) - -**mode (Enum)** - -This flag is used to set the operation mode for the script. It can take one of three values; 'train', 'valid', or 'rollout'. - -**batch_size (Integer)** - -Batch size for training. - -**noise_std (Float)** - -Standard deviation of the noise when training. - -**data_path (String)** - -Specifies the directory path where the dataset is located. -The dataset is expected to be in a specific format (e.g., .npz files). -It should contain `metadata.json`. -If `--mode` is training, the directory should contain `train.npz`. -If `--mode` is testing (rollout), the directory should contain `test.npz`. -If `--mode` is valid, the directory should contain `valid.npz`. - -**model_path (String)** - -The directory path where the trained model checkpoints are saved during training or loaded from during validation/rollout. - -**output_path (String)** - -Defines the directory where the outputs (e.g., rollouts) are saved, -when the `--mode` is set to rollout. -This is particularly relevant in the rollout mode where the predictions of the model are stored. - -**output_filename (String)** - -Base filename to use when saving outputs during rollout. -Default is "rollout", and the output will be saved as `rollout.pkl` in `output_path`. -It is not intended to include the file extension. - -**model_file (String)** - -The filename of the model checkpoint to load for validation or rollout (e.g., model-10000.pt). -It supports a special value "latest" to automatically select the newest checkpoint file. -This flexibility facilitates the evaluation of models at different stages of training. - -**train_state_file (String)** - -Similar to model_file, but for loading the training state (e.g., optimizer state). -It supports a special value "latest" to automatically select the newest checkpoint file. -(e.g., training_state-10000.pt) - -**ntraining_steps (Integer)** - -The total number of training steps to execute before stopping. - -**nsave_steps (Integer)** - -Interval at which the model and training state are saved. - -**lr_init (Float)** - -Initial learning rate. - -**lr_decay (Float)** - -How much the learning rate should decay over time. - -**lr_decay_steps (Integer)** - -Steps at which learning rate should decay. - -**cuda_device_number (Integer)** - -Base CUDA device (zero indexed). -Default is None so default CUDA device will be used. - -**n_gpus (Integer)** - -Number of GPUs to use for training. - -**tensorboard_log_dir (String)** - -Path to log info on training and validation and visualize via tensorboard. +GNS (particulate domain) + +```yaml +defaults: + - _self_ + - override hydra/hydra_logging: disabled + - override hydra/job_logging: disabled + +hydra: + output_subdir: null + run: + dir: . + +# Top-level configuration +mode: train + +# Data configuration +data: + path: ../gns-sample/WaterDropSample/dataset/ + batch_size: 2 + noise_std: 6.7e-4 + input_sequence_length: 6 + num_particle_types: 9 + kinematic_particle_id: 3 + +# Model configuration +model: + path: ../gns-sample/WaterDropSample/models/ + file: null + train_state_file: null + +# Output configuration +output: + path: ../gns-sample/WaterDropSample/rollouts/ + filename: rollout + +# Training configuration +training: + steps: 2000 + validation_interval: null + save_steps: 500 + resume: False + learning_rate: + initial: 1e-4 + decay: 0.1 + decay_steps: 50000 + +# Hardware configuration +hardware: + cuda_device_number: null + n_gpus: 1 + +# Logging configuration +logging: + tensorboard_dir: logs/ +```
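The YAML above replaces the long list of `absl` command-line flags: Hydra composes this file (minus the `defaults`/`hydra` bookkeeping) into a single `cfg` object for `gns/train.py`, and every nested key becomes dotted attribute access (`cfg.data.path`, `cfg.training.learning_rate.initial`, ...). A minimal sketch of that mapping, inspecting the file with OmegaConf directly rather than through the training entry point; the relative path to `config.yaml` is an assumption:

```python
# Not part of this PR: quick inspection of the config shown above.
# Hydra composes the same file (without the `defaults`/`hydra` bookkeeping keys)
# and passes the result to gns.train.main(cfg).
from omegaconf import OmegaConf

cfg = OmegaConf.load("config.yaml")        # assumes the repository root as working directory
print(cfg.mode)                            # "train"
print(cfg.data.path)                       # "../gns-sample/WaterDropSample/dataset/"
print(cfg.training.learning_rate.initial)  # initial learning rate from the YAML above
print(OmegaConf.to_yaml(cfg.training))     # dump a single section
```

The same dotted paths are what the commands earlier in this README override on the command line, e.g. `python3 -m gns.train mode="rollout" training.steps=100`.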
@@ -254,7 +227,7 @@ The dataset is shared on [DesignSafe DataDepot](https://doi.org/10.17603/ds2-fzg GNS uses [pytorch geometric](https://www.pyg.org/) and [CUDA](https://developer.nvidia.com/cuda-downloads). These packages have specific requirements, please see [PyG installation]((https://pytorch-geometric.readthedocs.io/en/latest/notes/installation.html) for details. -> CPU-only installation on Linux +> CPU-only installation on Linux (Conda) ```shell conda install -y pytorch torchvision torchaudio cpuonly -c pytorch diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..1231c8a --- /dev/null +++ b/config.yaml @@ -0,0 +1,52 @@ +defaults: + - _self_ + - override hydra/hydra_logging: disabled + - override hydra/job_logging: disabled + +hydra: + output_subdir: null + run: + dir: . + +# Top-level configuration +mode: train + +# Data configuration +data: + path: ../gns-sample/WaterDropSample/dataset/ + batch_size: 2 + noise_std: 6.7e-4 + input_sequence_length: 6 + num_particle_types: 9 + kinematic_particle_id: 3 + +# Model configuration +model: + path: ../gns-sample/WaterDropSample/models/ + file: model-1500.pt + train_state_file: train_state-1500.pt + +# Output configuration +output: + path: ../gns-sample/WaterDropSample/rollouts/ + filename: rollout + +# Training configuration +training: + steps: 100 + validation_interval: null + save_steps: 500 + resume: False + learning_rate: + initial: 1e-4 + decay: 0.1 + decay_steps: 50000 + +# Hardware configuration +hardware: + cuda_device_number: null + n_gpus: 1 + +# Logging configuration +logging: + tensorboard_dir: logs/ diff --git a/gns/args.py b/gns/args.py new file mode 100644 index 0000000..b059609 --- /dev/null +++ b/gns/args.py @@ -0,0 +1,70 @@ +from dataclasses import dataclass, field +from typing import Optional +from omegaconf import MISSING +from hydra.core.config_store import ConfigStore + + +@dataclass +class DataConfig: + path: str = MISSING + batch_size: int = 2 + noise_std: float = 6.7e-4 + input_sequence_length: int = 6 + num_particle_types: int = 9 + kinematic_particle_id: int = 3 + + +@dataclass +class ModelConfig: + path: str = "models/" + file: Optional[str] = None + train_state_file: Optional[str] = None + + +@dataclass +class OutputConfig: + path: str = "rollouts/" + filename: str = "rollout" + + +@dataclass +class LearningRateConfig: + initial: float = 1e-4 + decay: float = 0.1 + decay_steps: int = 50000 + + +@dataclass +class TrainingConfig: + steps: int = 2000 + validation_interval: Optional[int] = None + save_steps: int = 500 + resume: Optional[bool] = False + learning_rate: LearningRateConfig = field(default_factory=LearningRateConfig) + + +@dataclass +class HardwareConfig: + cuda_device_number: Optional[int] = None + n_gpus: int = 1 + + +@dataclass +class LoggingConfig: + tensorboard_dir: str = "logs/" + + +@dataclass +class Config: + mode: str = "train" + data: DataConfig = field(default_factory=DataConfig) + model: ModelConfig = field(default_factory=ModelConfig) + output: OutputConfig = field(default_factory=OutputConfig) + training: TrainingConfig = field(default_factory=TrainingConfig) + hardware: HardwareConfig = field(default_factory=HardwareConfig) + logging: LoggingConfig = field(default_factory=LoggingConfig) + + +# Hydra configuration +cs = ConfigStore.instance() +cs.store(name="base_config", node=Config) diff --git a/gns/render_rollout.py b/gns/render_rollout.py index aaa2571..29000a9 100644 --- a/gns/render_rollout.py +++ b/gns/render_rollout.py @@ -260,9 +260,11 @@ def write_vtk(self): 
f"{path}/points{i}", np.array(coord[:, 0]), np.array(coord[:, 1]), - np.zeros_like(coord[:, 1]) - if self.dims == 2 - else np.array(coord[:, 2]), + ( + np.zeros_like(coord[:, 1]) + if self.dims == 2 + else np.array(coord[:, 2]) + ), data={"displacement": disp}, ) print(f"vtk saved to: {self.output_dir}{self.output_name}...") diff --git a/gns/train.py b/gns/train.py index 0502efc..c722d7b 100644 --- a/gns/train.py +++ b/gns/train.py @@ -12,8 +12,8 @@ from torch.nn.parallel import DistributedDataParallel as DDP from tqdm import tqdm -from absl import flags -from absl import app +import hydra +from omegaconf import DictConfig, OmegaConf sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from gns import learned_simulator @@ -21,77 +21,14 @@ from gns import reading_utils from gns import data_loader from gns import distribute - -flags.DEFINE_enum( - "mode", - "train", - ["train", "valid", "rollout"], - help="Train model, validation or rollout evaluation.", -) -flags.DEFINE_integer("batch_size", 2, help="The batch size.") -flags.DEFINE_float("noise_std", 6.7e-4, help="The std deviation of the noise.") -flags.DEFINE_string("data_path", None, help="The dataset directory.") -flags.DEFINE_string( - "model_path", "models/", help=("The path for saving checkpoints of the model.") -) -flags.DEFINE_string( - "output_path", "rollouts/", help="The path for saving outputs (e.g. rollouts)." -) -flags.DEFINE_string( - "output_filename", "rollout", help="Base name for saving the rollout" -) -flags.DEFINE_string( - "model_file", - None, - help=( - 'Model filename (.pt) to resume from. Can also use "latest" to default to newest file.' - ), -) -flags.DEFINE_string( - "train_state_file", - "train_state.pt", - help=( - 'Train state filename (.pt) to resume from. Can also use "latest" to default to newest file.' - ), -) - -flags.DEFINE_integer("ntraining_steps", int(2e7), help="Number of training steps.") -flags.DEFINE_integer( - "validation_interval", - None, - help="Validation interval. Set `None` if validation loss is not needed", -) -flags.DEFINE_integer( - "nsave_steps", int(5000), help="Number of steps at which to save the model." -) - -# Learning rate parameters -flags.DEFINE_float("lr_init", 1e-4, help="Initial learning rate.") -flags.DEFINE_float("lr_decay", 0.1, help="Learning rate decay.") -flags.DEFINE_integer("lr_decay_steps", int(5e6), help="Learning rate decay steps.") - -flags.DEFINE_integer( - "cuda_device_number", - None, - help="CUDA device (zero indexed), default is None so default CUDA device will be used.", -) -flags.DEFINE_integer("n_gpus", 1, help="The number of GPUs to utilize for training.") - -flags.DEFINE_string( - "tensorboard_log_dir", "logs/", help="Directory for TensorBoard logs" -) - -FLAGS = flags.FLAGS +from gns.args import Config Stats = collections.namedtuple("Stats", ["mean", "std"]) -INPUT_SEQUENCE_LENGTH = 6 # So we can calculate the last 5 velocities. -NUM_PARTICLE_TYPES = 9 -KINEMATIC_PARTICLE_ID = 3 - def rollout( simulator: learned_simulator.LearnedSimulator, + cfg: DictConfig, position: torch.tensor, particle_types: torch.tensor, material_property: torch.tensor, @@ -112,8 +49,8 @@ def rollout( device: torch device. 
""" - initial_positions = position[:, :INPUT_SEQUENCE_LENGTH] - ground_truth_positions = position[:, INPUT_SEQUENCE_LENGTH:] + initial_positions = position[:, : cfg.data.input_sequence_length] + ground_truth_positions = position[:, cfg.data.input_sequence_length :] current_positions = initial_positions predictions = [] @@ -129,7 +66,10 @@ def rollout( # Update kinematic particles from prescribed trajectory. kinematic_mask = ( - (particle_types == KINEMATIC_PARTICLE_ID).clone().detach().to(device) + (particle_types == cfg.data.kinematic_particle_id) + .clone() + .detach() + .to(device) ) next_position_ground_truth = ground_truth_positions[:, step] kinematic_mask = kinematic_mask.bool()[:, None].expand( @@ -157,54 +97,54 @@ def rollout( "predicted_rollout": predictions.cpu().numpy(), "ground_truth_rollout": ground_truth_positions.cpu().numpy(), "particle_types": particle_types.cpu().numpy(), - "material_property": material_property.cpu().numpy() - if material_property is not None - else None, + "material_property": ( + material_property.cpu().numpy() if material_property is not None else None + ), } return output_dict, loss -def predict(device: str): +def predict(device: str, cfg: DictConfig): """Predict rollouts. Args: - simulator: Trained simulator if not will undergo training. + device: 'cpu' or 'cuda'. + cfg: configuration dictionary. """ # Read metadata - metadata = reading_utils.read_metadata(FLAGS.data_path, "rollout") - simulator = _get_simulator(metadata, FLAGS.noise_std, FLAGS.noise_std, device) + metadata = reading_utils.read_metadata(cfg.data.path, "rollout") + simulator = _get_simulator( + metadata, + cfg.data.num_particle_types, + cfg.data.noise_std, + cfg.data.noise_std, + device, + ) # Load simulator - if os.path.exists(FLAGS.model_path + FLAGS.model_file): - simulator.load(FLAGS.model_path + FLAGS.model_file) + if os.path.exists(cfg.model.path + cfg.model.file): + simulator.load(cfg.model.path + cfg.model.file) else: - raise Exception( - f"Model does not exist at {FLAGS.model_path + FLAGS.model_file}" - ) + raise Exception(f"Model does not exist at {cfg.model.path + cfg.model.file}") simulator.to(device) simulator.eval() # Output path - if not os.path.exists(FLAGS.output_path): - os.makedirs(FLAGS.output_path) + if not os.path.exists(cfg.output.path): + os.makedirs(cfg.output.path) # Use `valid`` set for eval mode if not use `test` split = ( "test" - if ( - FLAGS.mode == "rollout" - or (not os.path.isfile("{FLAGS.data_path}valid.npz")) - ) + if (cfg.mode == "rollout" or (not os.path.isfile("{cfg.data.path}valid.npz"))) else "valid" ) # Get dataset - ds = data_loader.get_data_loader_by_trajectories( - path=f"{FLAGS.data_path}{split}.npz" - ) + ds = data_loader.get_data_loader_by_trajectories(path=f"{cfg.data.path}{split}.npz") # See if our dataset has material property as feature if ( len(ds.dataset._data[0]) == 3 @@ -222,11 +162,11 @@ def predict(device: str): positions = features[0].to(device) if metadata["sequence_length"] is not None: # If `sequence_length` is predefined in metadata, - nsteps = metadata["sequence_length"] - INPUT_SEQUENCE_LENGTH + nsteps = metadata["sequence_length"] - cfg.data.input_sequence_length else: # If no predefined `sequence_length`, then get the sequence length sequence_length = positions.shape[1] - nsteps = sequence_length - INPUT_SEQUENCE_LENGTH + nsteps = sequence_length - cfg.data.input_sequence_length particle_type = features[1].to(device) if material_property_as_feature: material_property = features[2].to(device) @@ -242,6 +182,7 @@ def 
predict(device: str): # Predict example rollout example_rollout, loss = rollout( simulator, + cfg, positions, particle_type, material_property, @@ -255,11 +196,11 @@ def predict(device: str): eval_loss.append(torch.flatten(loss)) # Save rollout in testing - if FLAGS.mode == "rollout": + if cfg.mode == "rollout": example_rollout["metadata"] = metadata example_rollout["loss"] = loss.mean() - filename = f"{FLAGS.output_filename}_ex{example_i}.pkl" - filename = os.path.join(FLAGS.output_path, filename) + filename = f"{cfg.output.filename}_ex{example_i}.pkl" + filename = os.path.join(cfg.output.path, filename) with open(filename, "wb") as f: pickle.dump(example_rollout, f) @@ -304,7 +245,7 @@ def save_model_and_train_state( rank, device, simulator, - flags, + cfg, step, epoch, optimizer, @@ -319,7 +260,7 @@ def save_model_and_train_state( rank: local rank device: torch device type simulator: Trained simulator if not will undergo training. - flags: flags + cfg: Configuration dictionary. step: step epoch: epoch optimizer: optimizer @@ -330,9 +271,9 @@ def save_model_and_train_state( """ if rank == 0 or device == torch.device("cpu"): if device == torch.device("cpu"): - simulator.save(flags["model_path"] + "model-" + str(step) + ".pt") + simulator.save(cfg.model.path + "model-" + str(step) + ".pt") else: - simulator.module.save(flags["model_path"] + "model-" + str(step) + ".pt") + simulator.module.save(cfg.model.path + "model-" + str(step) + ".pt") train_state = dict( optimizer_state=optimizer.state_dict(), @@ -344,10 +285,10 @@ def save_model_and_train_state( }, loss_history={"train": train_loss_hist, "valid": valid_loss_hist}, ) - torch.save(train_state, f'{flags["model_path"]}train_state-{step}.pt') + torch.save(train_state, f"{cfg.model.path}train_state-{step}.pt") -def train(rank, flags, world_size, device): +def train(rank, cfg, world_size, device): """Train the model. Args: @@ -362,25 +303,33 @@ def train(rank, flags, world_size, device): device_id = device # Read metadata - metadata = reading_utils.read_metadata(flags["data_path"], "train") + metadata = reading_utils.read_metadata(cfg.data.path, "train") # Get simulator and optimizer if device == torch.device("cuda"): serial_simulator = _get_simulator( - metadata, flags["noise_std"], flags["noise_std"], rank + metadata, + cfg.data.num_particle_types, + cfg.data.noise_std, + cfg.data.noise_std, + rank, ) simulator = DDP( serial_simulator.to(rank), device_ids=[rank], output_device=rank ) optimizer = torch.optim.Adam( - simulator.parameters(), lr=flags["lr_init"] * world_size + simulator.parameters(), lr=cfg.training.learning_rate.initial * world_size ) else: simulator = _get_simulator( - metadata, flags["noise_std"], flags["noise_std"], device + metadata, + cfg.data.num_particle_types, + cfg.data.noise_std, + cfg.data.noise_std, + device, ) optimizer = torch.optim.Adam( - simulator.parameters(), lr=flags["lr_init"] * world_size + simulator.parameters(), lr=cfg.training.learning_rate.initial * world_size ) # Initialize training state @@ -396,10 +345,10 @@ def train(rank, flags, world_size, device): valid_loss_hist = [] # If model_path does exist and model_file and train_state_file exist continue training. - if flags["model_file"] is not None: - if flags["model_file"] == "latest" and flags["train_state_file"] == "latest": + if cfg.model.file is not None and cfg.training.resume: + if cfg.model.file == "latest" and cfg.model.train_state_file == "latest": # find the latest model, assumes model and train_state files are in step. 
- fnames = glob.glob(f'{flags["model_path"]}*model*pt') + fnames = glob.glob(f"{cfg.model.path}*model*pt") max_model_number = 0 expr = re.compile(".*model-(\d+).pt") for fname in fnames: @@ -407,20 +356,20 @@ def train(rank, flags, world_size, device): if model_num > max_model_number: max_model_number = model_num # reset names to point to the latest. - flags["model_file"] = f"model-{max_model_number}.pt" - flags["train_state_file"] = f"train_state-{max_model_number}.pt" + cfg.model.file = f"model-{max_model_number}.pt" + cfg.model.train_state_file = f"train_state-{max_model_number}.pt" - if os.path.exists(flags["model_path"] + flags["model_file"]) and os.path.exists( - flags["model_path"] + flags["train_state_file"] + if os.path.exists(cfg.model.path + cfg.model.file) and os.path.exists( + cfg.model.path + cfg.model.train_state_file ): # load model if device == torch.device("cuda"): - simulator.module.load(flags["model_path"] + flags["model_file"]) + simulator.module.load(cfg.model.path + cfg.model.file) else: - simulator.load(flags["model_path"] + flags["model_file"]) + simulator.load(cfg.model.path + cfg.model.file) # load train state - train_state = torch.load(flags["model_path"] + flags["train_state_file"]) + train_state = torch.load(cfg.model.path + cfg.model.train_state_file) # set optimizer state optimizer = torch.optim.Adam( @@ -438,7 +387,7 @@ def train(rank, flags, world_size, device): valid_loss_hist = train_state["loss_history"]["valid"] else: - msg = f'Specified model_file {flags["model_path"] + flags["model_file"]} and train_state_file {flags["model_path"] + flags["train_state_file"]} not found.' + msg = f"Specified model_file {cfg.model.path + cfg.model.file} and train_state_file {cfg.model.path + cfg.model.train_state_file} not found." 
raise FileNotFoundError(msg) simulator.train() @@ -453,18 +402,18 @@ def train(rank, flags, world_size, device): # Load training data dl = get_data_loader( - path=f'{flags["data_path"]}train.npz', - input_length_sequence=INPUT_SEQUENCE_LENGTH, - batch_size=flags["batch_size"], + path=f"{cfg.data.path}train.npz", + input_length_sequence=cfg.data.input_sequence_length, + batch_size=cfg.data.batch_size, ) n_features = len(dl.dataset._data[0]) # Load validation data - if flags["validation_interval"] is not None: + if cfg.training.validation_interval is not None: dl_valid = get_data_loader( - path=f'{flags["data_path"]}valid.npz', - input_length_sequence=INPUT_SEQUENCE_LENGTH, - batch_size=flags["batch_size"], + path=f"{cfg.data.path}valid.npz", + input_length_sequence=cfg.data.input_sequence_length, + batch_size=cfg.data.batch_size, ) if len(dl_valid.dataset._data[0]) != n_features: raise ValueError( @@ -474,25 +423,29 @@ def train(rank, flags, world_size, device): print(f"rank = {rank}, cuda = {torch.cuda.is_available()}") if rank == 0 or device == torch.device("cpu"): - writer = SummaryWriter(log_dir=flags["tensorboard_log_dir"]) + writer = SummaryWriter(log_dir=cfg.logging.tensorboard_dir) - writer.add_text("Data path", flags["data_path"]) writer.add_text("metadata", json.dumps(metadata, indent=4)) + yaml_config = OmegaConf.to_yaml(cfg) + writer.add_text("Config", yaml_config, global_step=0) # Log hyperparameters hparam_dict = { - "lr_init": flags["lr_init"], - "lr_decay": flags["lr_decay"], - "lr_decay_steps": flags["lr_decay_steps"], - "batch_size": flags["batch_size"], - "noise_std": flags["noise_std"], - "ntraining_steps": flags["ntraining_steps"], + "lr_init": cfg.training.learning_rate.initial, + "lr_decay": cfg.training.learning_rate.decay, + "lr_decay_steps": cfg.training.learning_rate.decay_steps, + "batch_size": cfg.data.batch_size, + "noise_std": cfg.data.noise_std, + "ntraining_steps": cfg.training.steps, } metric_dict = {"train_loss": 0, "valid_loss": 0} # Initial values writer.add_hparams(hparam_dict, metric_dict) try: - num_epochs = flags["ntraining_steps"] // len(dl) # Calculate total epochs + num_epochs = max( + 1, (cfg.training.steps + len(dl) - 1) // len(dl) + ) # Calculate total epochs + print(f"Total epochs = {num_epochs}") for epoch in tqdm(range(epoch, num_epochs), desc="Training", unit="epoch"): if device == torch.device("cuda"): torch.distributed.barrier() @@ -520,11 +473,11 @@ def train(rank, flags, world_size, device): sampled_noise = ( noise_utils.get_random_walk_noise_for_position_sequence( - position, noise_std_last_step=flags["noise_std"] + position, noise_std_last_step=cfg.data.noise_std ).to(device_id) ) non_kinematic_mask = ( - (particle_type != KINEMATIC_PARTICLE_ID) + (particle_type != cfg.data.kinematic_particle_id) .clone() .detach() .to(device_id) @@ -545,15 +498,17 @@ def train(rank, flags, world_size, device): device_or_rank ), particle_types=particle_type.to(device_or_rank), - material_property=material_property.to(device_or_rank) - if n_features == 3 - else None, + material_property=( + material_property.to(device_or_rank) + if n_features == 3 + else None + ), ) if ( - flags["validation_interval"] is not None + cfg.training.validation_interval is not None and step > 0 - and step % flags["validation_interval"] == 0 + and step % cfg.training.validation_interval == 0 ): if rank == 0 or device == torch.device("cpu"): sampled_valid_example = next(iter(dl_valid)) @@ -561,7 +516,7 @@ def train(rank, flags, world_size, device): simulator, 
sampled_valid_example, n_features, - flags, + cfg, rank, device_id, ) @@ -578,8 +533,11 @@ def train(rank, flags, world_size, device): optimizer.step() lr_new = ( - flags["lr_init"] - * (flags["lr_decay"] ** (step / flags["lr_decay_steps"])) + cfg.training.learning_rate.initial + * ( + cfg.training.learning_rate.decay + ** (step / cfg.training.learning_rate.decay_steps) + ) * world_size ) for param in optimizer.param_groups: @@ -598,14 +556,14 @@ def train(rank, flags, world_size, device): ) pbar.update(1) - if (rank == 0 or device == torch.device("cpu")) and step % flags[ - "nsave_steps" - ] == 0: + if ( + rank == 0 or device == torch.device("cpu") + ) and step % cfg.training.save_steps == 0: save_model_and_train_state( rank, device, simulator, - flags, + cfg, step, epoch, optimizer, @@ -616,7 +574,7 @@ def train(rank, flags, world_size, device): ) step += 1 - if step >= flags["ntraining_steps"]: + if step >= cfg.training.steps: break # Epoch level statistics @@ -629,10 +587,10 @@ def train(rank, flags, world_size, device): train_loss_hist.append((epoch, avg_loss.item())) - if flags["validation_interval"] is not None: + if cfg.training.validation_interval is not None: sampled_valid_example = next(iter(dl_valid)) epoch_valid_loss = validation( - simulator, sampled_valid_example, n_features, flags, rank, device_id + simulator, sampled_valid_example, n_features, cfg, rank, device_id ) if device == torch.device("cuda"): torch.distributed.reduce( @@ -643,12 +601,12 @@ def train(rank, flags, world_size, device): if rank == 0 or device == torch.device("cpu"): writer.add_scalar("Loss/train_epoch", avg_loss.item(), epoch) - if flags["validation_interval"] is not None: + if cfg.training.validation_interval is not None: writer.add_scalar( "Loss/valid_epoch", epoch_valid_loss.item(), epoch ) - if step >= flags["ntraining_steps"]: + if step >= cfg.training.steps: break except KeyboardInterrupt: pass @@ -658,7 +616,7 @@ def train(rank, flags, world_size, device): rank, device, simulator, - flags, + cfg, step, epoch, optimizer, @@ -676,7 +634,11 @@ def train(rank, flags, world_size, device): def _get_simulator( - metadata: json, acc_noise_std: float, vel_noise_std: float, device: torch.device + metadata: json, + num_particle_types: int, + acc_noise_std: float, + vel_noise_std: float, + device: torch.device, ) -> learned_simulator.LearnedSimulator: """Instantiates the simulator. @@ -725,18 +687,18 @@ def _get_simulator( connectivity_radius=metadata["default_connectivity_radius"], boundaries=np.array(metadata["bounds"]), normalization_stats=normalization_stats, - nparticle_types=NUM_PARTICLE_TYPES, + nparticle_types=num_particle_types, particle_type_embedding_size=16, - boundary_clamp_limit=metadata["boundary_augment"] - if "boundary_augment" in metadata - else 1.0, + boundary_clamp_limit=( + metadata["boundary_augment"] if "boundary_augment" in metadata else 1.0 + ), device=device, ) return simulator -def validation(simulator, example, n_features, flags, rank, device_id): +def validation(simulator, example, n_features, cfg, rank, device_id): position = example[0][0].to(device_id) particle_type = example[0][1].to(device_id) if n_features == 3: # if dl includes material_property @@ -750,10 +712,10 @@ def validation(simulator, example, n_features, flags, rank, device_id): # Sample the noise to add to the inputs. 
sampled_noise = noise_utils.get_random_walk_noise_for_position_sequence( - position, noise_std_last_step=flags["noise_std"] + position, noise_std_last_step=cfg.data.noise_std ).to(device_id) non_kinematic_mask = ( - (particle_type != KINEMATIC_PARTICLE_ID).clone().detach().to(device_id) + (particle_type != cfg.data.kinematic_particle_id).clone().detach().to(device_id) ) sampled_noise *= non_kinematic_mask.view(-1, 1, 1) @@ -773,9 +735,9 @@ def validation(simulator, example, n_features, flags, rank, device_id): position_sequence=position.to(device_or_rank), nparticles_per_example=n_particles_per_example.to(device_or_rank), particle_types=particle_type.to(device_or_rank), - material_property=material_property.to(device_or_rank) - if n_features == 3 - else None, + material_property=( + material_property.to(device_or_rank) if n_features == 3 else None + ), ) # Compute loss @@ -784,23 +746,22 @@ def validation(simulator, example, n_features, flags, rank, device_id): return loss -def main(_): +@hydra.main(version_base=None, config_path="..", config_name="config") +def main(cfg: Config): """Train or evaluates the model.""" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") if device == torch.device("cuda"): os.environ["MASTER_ADDR"] = "localhost" os.environ["MASTER_PORT"] = "29500" - myflags = reading_utils.flags_to_dict(FLAGS) - - if FLAGS.mode == "train": + if cfg.mode == "train": # If model_path does not exist create new directory. - if not os.path.exists(FLAGS.model_path): - os.makedirs(FLAGS.model_path) + if not os.path.exists(cfg.model.path): + os.makedirs(cfg.model.path) # Create TensorBoard log directory - if not os.path.exists(FLAGS.tensorboard_log_dir): - os.makedirs(FLAGS.tensorboard_log_dir) + if not os.path.exists(cfg.logging.tensorboard_dir): + os.makedirs(cfg.logging.tensorboard_dir) # Train on gpu if device == torch.device("cuda"): @@ -808,36 +769,36 @@ def main(_): print(f"Available GPUs = {available_gpus}") # Set the number of GPUs based on availability and the specified number - if FLAGS.n_gpus is None or FLAGS.n_gpus > available_gpus: + if cfg.hardware.n_gpus is None or cfg.hardware.n_gpus > available_gpus: world_size = available_gpus - if FLAGS.n_gpus is not None: + if cfg.hardware.n_gpus is not None: print( - f"Warning: The number of GPUs specified ({FLAGS.n_gpus}) exceeds the available GPUs ({available_gpus})" + f"Warning: The number of GPUs specified ({cfg.hardware.n_gpus}) exceeds the available GPUs ({available_gpus})" ) else: - world_size = FLAGS.n_gpus + world_size = cfg.hardware.n_gpus # Print the status of GPU usage print(f"Using {world_size}/{available_gpus} GPUs") # Spawn training to GPUs - distribute.spawn_train(train, myflags, world_size, device) + distribute.spawn_train(train, cfg, world_size, device) # Train on cpu else: rank = None world_size = 1 - train(rank, myflags, world_size, device) + train(rank, cfg, world_size, device) - elif FLAGS.mode in ["valid", "rollout"]: + elif cfg.mode in ["valid", "rollout"]: # Set device world_size = torch.cuda.device_count() - if FLAGS.cuda_device_number is not None and torch.cuda.is_available(): - device = torch.device(f"cuda:{int(FLAGS.cuda_device_number)}") + if cfg.hardware.cuda_device_number is not None and torch.cuda.is_available(): + device = torch.device(f"cuda:{int(cfg.hardware.cuda_device_number)}") # test code print(f"device is {device} world size is {world_size}") - predict(device) + predict(device, cfg) if __name__ == "__main__": - app.run(main) + main() diff --git a/gns/train_multinode.py 
b/gns/train_multinode.py index 389b57a..7617f18 100644 --- a/gns/train_multinode.py +++ b/gns/train_multinode.py @@ -156,9 +156,9 @@ def rollout( "predicted_rollout": predictions.cpu().numpy(), "ground_truth_rollout": ground_truth_positions.cpu().numpy(), "particle_types": particle_types.cpu().numpy(), - "material_property": material_property.cpu().numpy() - if material_property is not None - else None, + "material_property": ( + material_property.cpu().numpy() if material_property is not None else None + ), } return output_dict, loss @@ -231,9 +231,9 @@ def rollout_par( "predicted_rollout": predictions.cpu().numpy(), "ground_truth_rollout": ground_truth_positions.cpu().numpy(), "particle_types": particle_types.cpu().numpy(), - "material_property": material_property.cpu().numpy() - if material_property is not None - else None, + "material_property": ( + material_property.cpu().numpy() if material_property is not None else None + ), } return output_dict, loss @@ -583,9 +583,9 @@ def train(rank, flags, world_size, verbose): position_sequence=position.to(rank), nparticles_per_example=n_particles_per_example.to(rank), particle_types=particle_type.to(rank), - material_property=material_property.to(rank) - if n_features == 3 - else None, + material_property=( + material_property.to(rank) if n_features == 3 else None + ), ) # Calculate the loss and mask out loss on kinematic particles @@ -706,9 +706,9 @@ def _get_simulator( normalization_stats=normalization_stats, nparticle_types=NUM_PARTICLE_TYPES, particle_type_embedding_size=16, - boundary_clamp_limit=metadata["boundary_augment"] - if "boundary_augment" in metadata - else 1.0, + boundary_clamp_limit=( + metadata["boundary_augment"] if "boundary_augment" in metadata else 1.0 + ), device=device, ) diff --git a/requirements.txt b/requirements.txt index 9693110..c6b45a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ absl-py black dm-tree +hydra-core matplotlib numpy==1.23.1 pyevtk @@ -12,4 +13,4 @@ torch_sparse torch_scatter torch-cluster tqdm -toml \ No newline at end of file +toml
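Since the new CI jobs run `pytest test/` before training, one way to exercise this configuration in a unit test is Hydra's compose API, which builds the same `cfg` object without going through the `@hydra.main` command line. The sketch below is not part of this PR; the file name is hypothetical, and `config_path` in `initialize()` is resolved relative to the calling file, so `".."` assumes the test lives in `test/` one level below `config.yaml`:

```python
# test/test_config_compose.py (hypothetical) -- compose config.yaml programmatically.
from hydra import compose, initialize

import gns.args  # noqa: F401  # registers the "base_config" structured-config schema with Hydra's ConfigStore


def test_compose_config_with_overrides():
    with initialize(version_base=None, config_path=".."):
        cfg = compose(config_name="config", overrides=["mode=rollout", "training.steps=10"])

    assert cfg.mode == "rollout"
    assert cfg.training.steps == 10   # overridden above
    assert cfg.data.batch_size == 2   # default from config.yaml
```

The composed `cfg` could then be handed to `train()` or `predict()` directly, which is convenient for short smoke tests like the 100-step run this PR configures for CI.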