From dac0ba1f6326ffdc082d8ea54e37aa7c4743ecb0 Mon Sep 17 00:00:00 2001 From: Kevin Moore Date: Mon, 20 Jan 2025 12:37:39 +0100 Subject: [PATCH] Refactor configuration and data handling; integrate Hydra for improved training and data module setup --- .gitignore | 1 + .hydra/config.yaml | 16 ++- src/final_project/config/data.yaml | 15 +++ src/final_project/config/model.yaml | 3 +- src/final_project/config/train.yaml | 9 +- src/final_project/data.py | 148 ++++++++++++++++------------ src/final_project/data_module.py | 110 ++++++++------------- src/final_project/train.py | 14 +-- 8 files changed, 165 insertions(+), 151 deletions(-) create mode 100644 src/final_project/config/data.yaml diff --git a/.gitignore b/.gitignore index 63e76f0..8b70f3d 100644 --- a/.gitignore +++ b/.gitignore @@ -180,3 +180,4 @@ data/raw/mental_disorders_reddit.csv /outputs /wandb /lightning_logs +/models diff --git a/.hydra/config.yaml b/.hydra/config.yaml index 1583996..66d148e 100644 --- a/.hydra/config.yaml +++ b/.hydra/config.yaml @@ -1,12 +1,18 @@ +seed: 42 +raw_data_path: data/raw/mental_disorders_reddit.csv +output_folder: data/processed +training_data_path: ${output_folder}/training_data.pt +testing_data_path: ${output_folder}/testing_data.pt +tokenizer_name: distilbert-base-uncased +max_length: 512 +batch_size: 32 +num_workers: 4 +data_percentage: 0.01 model_name_or_path: distilbert-base-uncased num_labels: 5 adam_epsilon: 1.0e-08 warmup_steps: 0 weight_decay: 0.0 -seed: 42 learning_rate: 0.001 -batch_size: 32 -epochs: 10 +epochs: 1 total_training_steps: 1000 -num_workers: 4 -data_percentage: 0.01 diff --git a/src/final_project/config/data.yaml b/src/final_project/config/data.yaml new file mode 100644 index 0000000..415e7a6 --- /dev/null +++ b/src/final_project/config/data.yaml @@ -0,0 +1,15 @@ +hydra: + run: + dir: . + +seed: 42 +raw_data_path: "data/raw/mental_disorders_reddit.csv" +output_folder: "data/processed" +training_data_path: "${output_folder}/training_data.pt" +testing_data_path: "${output_folder}/testing_data.pt" +tokenizer_name: "distilbert-base-uncased" +max_length: 512 + +batch_size: 32 +num_workers: 4 +data_percentage: 0.01 diff --git a/src/final_project/config/model.yaml b/src/final_project/config/model.yaml index bfd9761..2d0b6c0 100644 --- a/src/final_project/config/model.yaml +++ b/src/final_project/config/model.yaml @@ -1,6 +1,7 @@ +defaults: + - data model_name_or_path: "distilbert-base-uncased" num_labels: 5 adam_epsilon: 1e-8 warmup_steps: 0 weight_decay: 0.0 -seed: 42 diff --git a/src/final_project/config/train.yaml b/src/final_project/config/train.yaml index e879947..58f0017 100644 --- a/src/final_project/config/train.yaml +++ b/src/final_project/config/train.yaml @@ -1,13 +1,6 @@ defaults: - model -hydra: - run: - dir: . 
-
 learning_rate: 1e-3
-batch_size: 32
-epochs: 10
+epochs: 1
 total_training_steps: 1000
-num_workers: 4
-data_percentage: 0.01
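Note: with this patch the configs compose as a chain: train.yaml pulls in model.yaml through its defaults list, and model.yaml pulls in data.yaml, so every @hydra.main entry point sees one merged tree (seed and paths from data.yaml, num_labels from model.yaml, learning_rate and epochs from train.yaml). A minimal sketch of inspecting the merged result with Hydra's compose API (illustrative only; it assumes the snippet sits at the repo root so the relative config_path resolves):

    # inspect_config.py (hypothetical helper, not part of this patch)
    from hydra import compose, initialize
    from omegaconf import OmegaConf

    with initialize(version_base="1.1", config_path="src/final_project/config"):
        cfg = compose(config_name="train.yaml",
                      overrides=["epochs=5", "data_percentage=0.1"])
    print(OmegaConf.to_yaml(cfg))  # data + model + train keys, merged into one config

The same override syntax works on the command line, e.g. python src/final_project/train.py epochs=5 batch_size=16.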
Did you preprocess the data?") + def _sample_data(self): + """Samples a subset of the data based on the specified percentage.""" + subset_size = int(len(self.labels) * self.data_percentage) + seed_everything(self.seed) + indices = random.sample(range(len(self.labels)), subset_size) + self.input_ids = [self.input_ids[i] for i in indices] + self.attention_masks = [self.attention_masks[i] for i in indices] + self.labels = [self.labels[i] for i in indices] + def __len__(self) -> int: """Return the length of the dataset.""" - return len(self.df) + return len(self.labels) def __getitem__(self, index: int): """Return a given sample from the dataset.""" - row = self.df.iloc[index] - - input_ids = torch.tensor(eval(row["input_ids"]), dtype=torch.long) - attention_mask = torch.tensor( - eval(row["attention_mask"]), dtype=torch.long) - label = torch.tensor(row["label"], dtype=torch.long) - return { - "input_ids": input_ids, - "attention_mask": attention_mask, - "labels": label, + "input_ids": self.input_ids[index], + "attention_mask": self.attention_masks[index], + "labels": self.labels[index], } -def preprocess(raw_data_path: Path = RAW_DATA_PATH, output_folder: Path = OUTPUT_FOLDER, tokenizer_name: str = "bert-base-uncased", max_length: int = 512) -> None: - print("Preprocessing data...") - raw_data = pd.read_csv(raw_data_path) +def preprocess(raw_data_path: str, training_data_path: str, testing_data_path, tokenizer_name: str = "distilbert-base-uncased", max_length: int = 512) -> None: + log.info("Preprocessing data...") + raw_data = pd.read_csv(Path(raw_data_path).resolve()) preprocessed_data = raw_data.copy() + ################## + # CLEANING LOGIC # + ################## preprocessed_data.dropna( subset=["title", "selftext", "subreddit"], inplace=True) # Remove any rows where the selftext is [removed] or [deleted] @@ -68,54 +82,66 @@ def preprocess(raw_data_path: Path = RAW_DATA_PATH, output_folder: Path = OUTPUT # Remove low effort posts under 20 characters preprocessed_data = preprocessed_data[preprocessed_data["selftext"].apply( len) > 20] - + # Combine title and selftext into text column preprocessed_data["text"] = preprocessed_data["title"] + \ "\n" + preprocessed_data["selftext"] - - # preprocessed_data.rename( - # columns={"subreddit": "label"}, inplace=True) - - # Map string labels to integers + # Map subreddit to label label_mapping = {label: idx for idx, label in enumerate( preprocessed_data["subreddit"].unique())} - preprocessed_data["label"] = preprocessed_data["subreddit"].map( + preprocessed_data["labels"] = preprocessed_data["subreddit"].map( label_mapping) + ###################### + # TOKENIZATION LOGIC # + ###################### + # Tokenize text + log.info("Tokenizing text...") tokenizer = AutoTokenizer.from_pretrained(tokenizer_name) - tokens = tokenizer( - preprocessed_data["text"].tolist(), + + df_train, df_test = train_test_split( + preprocessed_data, test_size=0.1, random_state=42) + + train_tokens = tokenizer( + df_train["text"].tolist(), truncation=True, padding="max_length", max_length=max_length, return_tensors="pt" ) + train_dict = { + "input_ids": train_tokens["input_ids"], + "attention_mask": train_tokens["attention_mask"], + "labels": torch.tensor(df_train["labels"].tolist(), dtype=torch.long), + } + + test_tokens = tokenizer( + df_test["text"].tolist(), + truncation=True, + padding="max_length", + max_length=max_length, + return_tensors="pt" + ) + test_dict = { + "input_ids": test_tokens["input_ids"], + "attention_mask": test_tokens["attention_mask"], + "labels": 
diff --git a/src/final_project/data_module.py b/src/final_project/data_module.py
index de5ab4a..375209b 100644
--- a/src/final_project/data_module.py
+++ b/src/final_project/data_module.py
@@ -1,71 +1,45 @@
-from src.final_project.data import MentalDisordersDataset, OUTPUT_FOLDER
-from pathlib import Path
-import pandas as pd
-from pytorch_lightning import LightningDataModule
-from torch.utils.data import DataLoader, random_split, Subset
+from src.final_project.data import MentalDisordersDataset
+from pytorch_lightning import LightningDataModule, seed_everything
+from torch.utils.data import DataLoader, random_split
 import torch
-import random
+import hydra
+import logging
 
+log = logging.getLogger(__name__)
 
 
 class MentalDisordersDataModule(LightningDataModule):
-    def __init__(self, data_dir: Path = OUTPUT_FOLDER, batch_size: int = 32, num_workers: int = 4, data_percentage: float = 1.0):
+    def __init__(self, cfg):
         super().__init__()
-        self.data_dir = Path(data_dir).resolve()
-        self.batch_size = batch_size
-        self.num_workers = num_workers
-        self.data_percentage = data_percentage
-
-    def sample_dataset(self, dataset):
-        """Samples a subset of the dataset based on the specified percentage."""
-        if self.data_percentage < 1.0:
-            subset_size = int(len(dataset) * self.data_percentage)
-            indices = random.sample(range(len(dataset)), subset_size)
-            return Subset(dataset, indices)
-        return dataset
+        self.batch_size = cfg.batch_size
+        self.num_workers = cfg.num_workers
+        self.data_percentage = cfg.data_percentage
+        self.seed = cfg.seed
+        self.training_data_path = cfg.training_data_path
+        self.testing_data_path = cfg.testing_data_path
 
     def setup(self, stage: str):
-        # trainFullDataset = MentalDisordersDataset(
-        #     train=True, data_dir=self.data_dir)
-        # # Sample the dataset if a smaller percentage is specified
-        # trainFullDataset = self.sample_dataset(trainFullDataset)
-
-        # self.train, self.val = random_split(
-        #     trainFullDataset,
-        #     [int(0.9 * len(trainFullDataset)),
-        #      int(0.1 * len(trainFullDataset))],
-        #     generator=torch.Generator().manual_seed(42))
-
-        # self.test = MentalDisordersDataset(
-        #     train=False, data_dir=self.data_dir)
-        # Load the full training dataset
-        trainFullDataset = MentalDisordersDataset(
-            train=True, data_dir=self.data_dir)
-
-        # Apply sampling if data_percentage is less than 1.0
-        if self.data_percentage < 1.0:
-            subset_size = int(len(trainFullDataset) * self.data_percentage)
-            indices = random.sample(range(len(trainFullDataset)), subset_size)
-            trainFullDataset = Subset(trainFullDataset, indices)
+        seed_everything(self.seed)
+        if stage == "fit":
+            trainFullDataset = MentalDisordersDataset(data_percentage=self.data_percentage, seed=self.seed,
+                                                      train=True, training_data_path=self.training_data_path, testing_data_path=self.testing_data_path)
 
-        # Calculate split sizes based on the sampled dataset
-        train_size = int(0.9 * len(trainFullDataset))
-        val_size = len(trainFullDataset) - train_size
-        print(f"Sampled dataset length: {len(trainFullDataset)}")
-        print(f"Train size: {train_size}, Validation size: {val_size}")
+            # Calculate split sizes based on the sampled dataset
+            train_size = int(0.9 * len(trainFullDataset))
+            val_size = len(trainFullDataset) - train_size
+            log.info(f"Sampled dataset length: {len(trainFullDataset)}")
+            log.info(f"Train size: {train_size}, Validation size: {val_size}")
 
-        # Perform the split
-        self.train, self.val = random_split(
-            trainFullDataset,
-            [train_size, val_size],
-            generator=torch.Generator().manual_seed(42)
-        )
+            # Perform the split with a seeded generator so it is reproducible
+            self.train, self.val = random_split(
+                trainFullDataset,
+                [train_size, val_size],
+                generator=torch.Generator().manual_seed(self.seed)
+            )
 
-        # Load and sample the test dataset (if needed)
-        self.test = MentalDisordersDataset(train=False, data_dir=self.data_dir)
-        if self.data_percentage < 1.0:
-            subset_size = int(len(self.test) * self.data_percentage)
-            indices = random.sample(range(len(self.test)), subset_size)
-            self.test = Subset(self.test, indices)
+        if stage == "test":
+            # Load and sample the test dataset (if needed)
+            self.test = MentalDisordersDataset(data_percentage=self.data_percentage, seed=self.seed,
+                                               train=False, training_data_path=self.training_data_path, testing_data_path=self.testing_data_path)
 
     def train_dataloader(self):
         return DataLoader(self.train, batch_size=self.batch_size, num_workers=self.num_workers, persistent_workers=True)
@@ -77,16 +51,18 @@ def test_dataloader(self):
         return DataLoader(self.test, batch_size=self.batch_size, num_workers=self.num_workers)
 
 
-if __name__ == "__main__":
-    data_module = MentalDisordersDataModule()
+@hydra.main(version_base="1.1", config_path="config", config_name="data.yaml")
+def main(cfg):
+    data_module = MentalDisordersDataModule(cfg)
     data_module.setup("fit")
     train_loader = data_module.train_dataloader()
+    log.info(f"Train loader length: {len(train_loader)}")
     val_loader = data_module.val_dataloader()
+    log.info(f"Val loader length: {len(val_loader)}")
+    data_module.setup("test")
     test_loader = data_module.test_dataloader()
-    print("DataModule setup complete.")
-    print("Train loader length: ", len(train_loader))
-    print("Val loader length: ", len(val_loader))
-    print("Test loader length: ", len(test_loader))
-    # print("Example batch: ", next(iter(train_loader)))
-    # print("Example batch: ", next(iter(val_loader)))
-    # print("Example batch: ", next(iter(test_loader)))
+    log.info(f"Test loader length: {len(test_loader)}")
+
+
+if __name__ == "__main__":
+    main()
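Note: setup() is now gated on Lightning's stage argument (trainer.fit() triggers setup("fit"), trainer.test() triggers setup("test")), so standalone use has to request both stages explicitly, as the module's own main() does. A sketch of driving the DataModule outside a Trainer (illustrative; a SimpleNamespace stands in for the Hydra config, and the worker-spawning loaders should run under a __main__ guard on spawn-based platforms):

    from types import SimpleNamespace
    from src.final_project.data_module import MentalDisordersDataModule

    cfg = SimpleNamespace(
        batch_size=32, num_workers=4, data_percentage=0.01, seed=42,
        training_data_path="data/processed/training_data.pt",
        testing_data_path="data/processed/testing_data.pt",
    )
    dm = MentalDisordersDataModule(cfg)
    dm.setup("fit")   # builds the 90/10 train/val split
    dm.setup("test")  # loads the held-out test tensors
    batch = next(iter(dm.train_dataloader()))
    print(batch["input_ids"].shape)  # e.g. torch.Size([32, 512])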
diff --git a/src/final_project/train.py b/src/final_project/train.py
index dd1c2a5..5d0442d 100644
--- a/src/final_project/train.py
+++ b/src/final_project/train.py
@@ -13,14 +13,12 @@
 ) else "mps" if torch.backends.mps.is_available() else "cpu")
 
-def train(model, batch_size: int = 32, num_workers: int = 4, epochs: int = 3, learning_rate: float = 1e-5, data_percentage: float = 1.0):
+def train(model, cfg):
     log.info("Starting training")
-    log.info(f"{learning_rate=}, {batch_size=}, {epochs=}")
+    log.info(f"{cfg.learning_rate=}, {cfg.batch_size=}, {cfg.epochs=}")
 
     # Instantiate DataModule
     dm = MentalDisordersDataModule(
-        batch_size=batch_size,
-        num_workers=num_workers,
-        data_percentage=data_percentage,
+        cfg=cfg
     )
 
     # Make sure to call prepare_data() + setup() to figure out the number of labels if needed
@@ -35,7 +33,7 @@
     # Create a trainer
     # Inside main() function after setting up the model
     trainer = pl.Trainer(
-        max_epochs=epochs,
+        max_epochs=cfg.epochs,
         accelerator="auto",
         devices="auto",
         logger=True,
@@ -52,9 +50,7 @@
 @hydra.main(version_base="1.1", config_path="config", config_name="train.yaml")
 def main(cfg):
     model = AwesomeModel(cfg).to(DEVICE)
-    model.train()
-    train(model, num_workers=cfg.num_workers, batch_size=cfg.batch_size,
-          epochs=cfg.epochs, learning_rate=cfg.learning_rate, data_percentage=cfg.data_percentage)
+    train(model, cfg)
 
 
 if __name__ == "__main__":
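Note: end to end, the flow is now: run data.py once (python src/final_project/data.py) to produce the .pt splits, then run train.py, overriding any composed key on the command line (python src/final_project/train.py epochs=5 data_percentage=0.1). For debugging, train() can also be driven directly from Python; a sketch, where the AwesomeModel import path is an assumption (the patch does not show which module defines it):

    from hydra import compose, initialize
    from src.final_project.model import AwesomeModel  # hypothetical module path
    from src.final_project.train import train

    # config_path is relative to this file; assumes it sits at the repo root
    with initialize(version_base="1.1", config_path="src/final_project/config"):
        cfg = compose(config_name="train.yaml", overrides=["epochs=1"])
    train(AwesomeModel(cfg), cfg)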