diff --git a/doc/prompt/model.md b/doc/prompt/model.md new file mode 100644 index 0000000..ee10588 --- /dev/null +++ b/doc/prompt/model.md @@ -0,0 +1,10 @@ +```csv +date,segment,time_sec,time_min,frequency_hz,amplitude,phase,label +2024-11-17,1,7.0,0.11666666666666667,37993.1,12.93741,-4.32236,0 +2024-11-17,1,7.0,0.11666666666666667,41437.9,12.87911,-4.04065,0 +2024-11-17,1,8.0,0.13333333333333333,44882.8,12.78722,-3.65708,0 +2024-11-17,1,8.0,0.13333333333333333,48327.6,12.73593,-3.31598,0 +2024-11-17,1,9.0,0.15,51772.4,12.69731,-2.83716,0 +``` +我的数据如上,请你基于Pytorch写一个血糖三分类的模型,label 0 代表低,1代表中,2代表高。 +请你将数据按照1分钟的长度来进行划分,写出dataset和dataloader,并帮我写出完整的训练验证以及测试流程代码。 \ No newline at end of file diff --git a/doc/prompt/preprocess.md b/doc/prompt/preprocess.md index f63563a..17b854a 100644 --- a/doc/prompt/preprocess.md +++ b/doc/prompt/preprocess.md @@ -1,479 +1,65 @@ -```python -import torch -import torch.nn as nn -import torch.nn.functional as F -from lightning import LightningModule -from torch.optim import Adam -from torch.optim.lr_scheduler import StepLR -from torchmetrics import MeanMetric - - -class PPGBaseModule(LightningModule): - def __init__( - self, - input_dim: int = 14, # 每个时间步的特征数 - window_size: int = 256, # 时间步数 - hidden_dim: int = 128, - lr: float = 1e-3, - scheduler_step_size: int = 10, - scheduler_gamma: float = 0.1, - ): - """ - LightningModule for PPG-based physiological indicator prediction. - - :param input_dim: Number of input features per time step. - :param window_size: Number of time steps per window. - :param hidden_dim: Number of hidden units. - :param lr: Learning rate. - :param scheduler_step_size: Step size for learning rate scheduler. - :param scheduler_gamma: Gamma for learning rate scheduler. - """ - super().__init__() - self.save_hyperparameters() - - # 定义1D卷积网络 - self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=hidden_dim, kernel_size=3, padding=1) - self.bn1 = nn.BatchNorm1d(hidden_dim) - self.conv2 = nn.Conv1d(in_channels=hidden_dim, out_channels=hidden_dim*2, kernel_size=3, padding=1) - self.bn2 = nn.BatchNorm1d(hidden_dim*2) - self.pool = nn.MaxPool1d(kernel_size=2) - - # 计算池化后的时间步数 - pooled_size = window_size // 2 # 因为进行了一个池化层,时间步数减半 - - self.fc1 = nn.Linear((hidden_dim*2) * pooled_size, hidden_dim) - self.dropout = nn.Dropout(0.5) - self.fc2 = nn.Linear(hidden_dim, 1) - - # Loss function - self.criterion = nn.MSELoss() - - # Metrics - self.train_mse = MeanMetric() - self.val_mse = MeanMetric() - self.test_mse = MeanMetric() - - def forward(self, x): - """ - Forward pass. - - :param x: Input tensor of shape [batch_size, window_size, 14] - :return: Predicted tensor of shape [batch_size] - """ - x = x.permute(0, 2, 1) # 转换为 [batch_size, 14, window_size] 以适应 Conv1d - x = F.relu(self.bn1(self.conv1(x))) # [batch_size, hidden_dim, window_size] - x = F.relu(self.bn2(self.conv2(x))) # [batch_size, hidden_dim*2, window_size] - x = self.pool(x) # [batch_size, hidden_dim*2, window_size//2] - x = x.view(x.size(0), -1) # 展平成 [batch_size, hidden_dim*2 * (window_size//2)] - x = F.relu(self.fc1(x)) # [batch_size, hidden_dim] - x = self.dropout(x) - x = self.fc2(x).squeeze(1) # [batch_size] - return x - - def training_step(self, batch, batch_idx): - """ - Training step. - - :param batch: Batch of data. - :param batch_idx: Batch index. - :return: Loss value. 
- """ - x, y = batch # x: [batch_size, window_size, 14], y: [batch_size] - y_pred = self.forward(x) # [batch_size] - loss = self.criterion(y_pred, y) - - # Log loss and metric - self.log("train/loss", loss, on_step=False, on_epoch=True, prog_bar=True) - self.train_mse(y_pred, y) - self.log( - "train/mse", self.train_mse, on_step=False, on_epoch=True, prog_bar=True - ) - - return loss - - def validation_step(self, batch, batch_idx): - """ - Validation step. - - :param batch: Batch of data. - :param batch_idx: Batch index. - """ - x, y = batch # x: [batch_size, window_size, 14], y: [batch_size] - y_pred = self.forward(x) # [batch_size] - loss = self.criterion(y_pred, y) - - # Log loss and metric - self.log("val/loss", loss, on_step=False, on_epoch=True, prog_bar=True) - self.val_mse(y_pred, y) - self.log("val/mse", self.val_mse, on_step=False, on_epoch=True, prog_bar=True) - - def test_step(self, batch, batch_idx): - """ - Test step. - - :param batch: Batch of data. - :param batch_idx: Batch index. - """ - x, y = batch # x: [batch_size, window_size, 14], y: [batch_size] - y_pred = self.forward(x) # [batch_size] - loss = self.criterion(y_pred, y) - - # Log loss and metric - self.log("test/loss", loss, on_step=False, on_epoch=True, prog_bar=True) - self.test_mse(y_pred, y) - self.log("test/mse", self.test_mse, on_step=False, on_epoch=True, prog_bar=True) - - def configure_optimizers(self): - """ - Configure optimizers and learning rate schedulers. - - :return: Optimizer and scheduler configuration. - """ - optimizer = Adam(self.parameters(), lr=self.hparams.lr) - scheduler = StepLR( - optimizer, - step_size=self.hparams.scheduler_step_size, - gamma=self.hparams.scheduler_gamma, - ) - return [optimizer], [scheduler] +我现在需要做一个血糖高中低三分类的项目,我的数据集如下 +```zsh +(base) sisyphus@sisyphus-dual4090  ~/Projects/PPG/data   master  tree GlucoseBIT +GlucoseBIT +├── 2024-11-13 +│   ├── 1-24.txt +│   ├── 2-24.txt +│   └── 3-24.txt +├── 2024-11-17 +│   ├── 1-10.txt +│   ├── 2-12.txt +│   ├── 3-10.txt +│   ├── 4-12.txt +│   ├── 5-10.txt +│   └── 6-12.txt +└── 2024-11-18 + ├── 1-12.txt + ├── 2-12.txt + ├── 3-12.txt + ├── 4-10.txt + ├── 5-12.txt + ├── 6-10.txt + └── 7-10.txt + +3 directories, 16 files ``` -这是我的一个能够运行的例子,我的项目是基于lightning的项目,所以我希望你把MvCFT的代码写成上述格式,以便我们能够快速地进行实验。 -```python -import os -import pickle -from typing import Any, Dict, Optional - -import torch -from torch.utils.data import Dataset, DataLoader, random_split -from lightning import LightningDataModule - - -class TorchStandardScaler: - def __init__(self): - self.mean = None - self.std = None - - def fit(self, data: torch.Tensor): - """ - 计算数据的均值和标准差。 - - :param data: 输入数据张量,形状为 [时间步, 特征] - """ - self.mean = torch.mean(data, dim=0) - self.std = torch.std(data, dim=0) - return self - - def transform(self, data: torch.Tensor) -> torch.Tensor: - """ - 使用计算得到的均值和标准差对数据进行标准化。 - - :param data: 输入数据张量,形状为 [时间步, 特征] - :return: 标准化后的数据张量 - """ - return (data - self.mean) / self.std - - def fit_transform(self, data: torch.Tensor) -> torch.Tensor: - """ - 计算均值和标准差,并对数据进行标准化。 - - :param data: 输入数据张量,形状为 [时间步, 特征] - :return: 标准化后的数据张量 - """ - self.fit(data) - return self.transform(data) - - -class PPGDataset(Dataset): - def __init__( - self, - data: Dict[str, Any], - labels: torch.Tensor, - window_size: int = 256, - stride: int = 128, - transform: Optional[Any] = None, - ): - """ - 自定义 PPG 数据集。 - - :param data: 包含所有受试者拼接信号数据的字典。 - :param labels: 拼接后的标签张量。 - :param window_size: 每个窗口的时间步数。 - :param stride: 窗口之间的步幅。 - :param transform: 可选的变换操作。 - """ - 
self.window_size = window_size - self.stride = stride - self.transform = transform - - # 假设所有信号已被截断到相同长度 - self.length = data["chest"]["ECG"].shape[0] - self.num_windows = (self.length - window_size) // stride + 1 - - # 验证所有信号长度是否相同 - for location in ["chest", "wrist"]: - for sensor in data[location]: - assert data[location][sensor].shape[0] == self.length, ( - f"信号长度不匹配在 {location}/{sensor}: " - f"期望 {self.length}, 但得到 {data[location][sensor].shape[0]}" - ) - - # 验证标签长度 - assert ( - len(labels) >= self.num_windows - ), f"标签数量 ({len(labels)}) 少于窗口数量 ({self.num_windows})。" - - # 使用 TorchStandardScaler 对信号进行标准化 - self.scalers = {} - for location in ["chest", "wrist"]: - self.scalers[location] = {} - for sensor, signal in data[location].items(): - scaler = TorchStandardScaler() - if signal.ndimension() > 1: - scaler.fit(signal) - self.scalers[location][sensor] = scaler - else: - scaler.fit(signal.unsqueeze(1)) - self.scalers[location][sensor] = scaler - - # 标准化信号 - if signal.ndimension() > 1: - data[location][sensor] = scaler.transform(signal) - else: - data[location][sensor] = scaler.transform(signal.unsqueeze(1)).squeeze() - - self.data = data - self.labels = labels[:self.num_windows] # 确保标签与窗口数量匹配 - - def __len__(self): - return self.num_windows - - def __getitem__(self, idx): - """ - 获取指定索引的窗口数据和对应标签。 - - :param idx: 窗口的索引。 - :return: (特征张量, 标签张量) 的元组 - """ - start = idx * self.stride - end = start + self.window_size - - features = {} - for location in ["chest", "wrist"]: - features[location] = {} - for sensor, signal in self.data[location].items(): - window = signal[start:end] - features[location][sensor] = window - - # 将所有传感器的数据沿特征维度拼接 - chest_features = torch.cat( - [features["chest"][sensor] for sensor in features["chest"]], dim=1 - ) # 形状: [window_size, chest_features] - wrist_features = torch.cat( - [features["wrist"][sensor] for sensor in features["wrist"]], dim=1 - ) # 形状: [window_size, wrist_features] - combined_features = torch.cat([chest_features, wrist_features], dim=1) # 形状: [window_size, total_features] - - if self.transform: - combined_features = self.transform(combined_features) - - # 保留整个窗口的特征,不进行汇聚 - aggregated_features = combined_features # 形状: [window_size, total_features] - - # 获取对应的标签 - label = self.labels[idx] - - return aggregated_features, label - - -class PPGDataModule(LightningDataModule): - def __init__( - self, - data_dir: str = "./data/PPG_FieldStudy", - window_size: int = 256, - stride: int = 128, - batch_size: int = 64, - num_workers: int = 0, # 调试时设置为 0 - train_split: float = 0.7, - val_split: float = 0.15, - test_split: float = 0.15, - ): - """ - PPG 数据集的 DataModule。 - - :param data_dir: 包含受试者文件夹的根目录。 - :param window_size: 每个窗口的时间步数。 - :param stride: 窗口之间的步幅。 - :param batch_size: 批大小。 - :param num_workers: DataLoader 的工作线程数量。 - :param train_split: 训练集的比例。 - :param val_split: 验证集的比例。 - :param test_split: 测试集的比例。 - """ - super().__init__() - self.data_dir = data_dir - self.window_size = window_size - self.stride = stride - self.batch_size = batch_size - self.num_workers = num_workers - self.train_split = train_split - self.val_split = val_split - self.test_split = test_split - - self.train_dataset: Optional[Dataset] = None - self.val_dataset: Optional[Dataset] = None - self.test_dataset: Optional[Dataset] = None - - def setup(self, stage: Optional[str] = None): - """ - 加载并预处理所有受试者的数据。 - - :param stage: 可选的阶段 (fit, validate, test, predict)。 - """ - # 初始化用于拼接数据的字典 - concatenated_data = { - "chest": { - "ACC": [], - "ECG": [], - "EMG": [], - "EDA": [], - 
"Temp": [], - "Resp": [], - }, - "wrist": { - "ACC": [], - "BVP": [], - "EDA": [], - "TEMP": [], - }, - } - concatenated_labels = [] - - # 遍历每个受试者目录 (S1 到 S15) - for subject_num in range(1, 16): - subject_dir = os.path.join(self.data_dir, f"S{subject_num}") - pkl_file = os.path.join(subject_dir, f"S{subject_num}.pkl") - - if not os.path.isfile(pkl_file): - print(f"Pickle 文件未找到: {pkl_file}. 跳过。") - continue - - try: - with open(pkl_file, "rb") as f: - subject_data = pickle.load(f, encoding="latin1") - print(f"成功加载 {pkl_file}.") - except Exception as e: - print(f"加载 {pkl_file} 时出错: {e}. 跳过。") - continue - - # 拼接每个传感器的数据 - for location in ["chest", "wrist"]: - for sensor, signal in subject_data["signal"][location].items(): - concatenated_data[location][sensor].append(torch.tensor(signal, dtype=torch.float32)) - - # 拼接标签 - concatenated_labels.append(torch.tensor(subject_data["label"], dtype=torch.float32)) - - if not concatenated_labels: - raise ValueError( - "未加载到任何有效的 Pickle 文件。请检查你的数据。" - ) - - # 拼接所有受试者的信号数据 - for location in ["chest", "wrist"]: - for sensor in concatenated_data[location]: - # 沿时间轴拼接 - concatenated_data[location][sensor] = torch.cat( - concatenated_data[location][sensor], dim=0 - ) - print( - f"传感器 '{location}/{sensor}' 拼接后的形状: {concatenated_data[location][sensor].shape}" - ) - - # 计算所有传感器的最小长度 - lengths = [ - concatenated_data[location][sensor].shape[0] - for location in ["chest", "wrist"] - for sensor in concatenated_data[location] - ] - min_length = min(lengths) - print(f"所有传感器的最小信号长度: {min_length}") - - # 将所有信号截断到最小长度 - for location in ["chest", "wrist"]: - for sensor in concatenated_data[location]: - original_length = concatenated_data[location][sensor].shape[0] - concatenated_data[location][sensor] = concatenated_data[location][sensor][:min_length] - print( - f"截断 '{location}/{sensor}' 从 {original_length} 到 {min_length}" - ) - - # 拼接所有标签 - concatenated_labels = torch.cat(concatenated_labels, dim=0) - print(f"总拼接标签的形状: {concatenated_labels.shape}") - - # 根据最小长度计算窗口数量 - num_windows = (min_length - self.window_size) // self.stride + 1 - print(f"基于最小长度的窗口数量: {num_windows}") - - # 确保有足够的标签 - if len(concatenated_labels) < num_windows: - raise ValueError( - f"标签数量 ({len(concatenated_labels)}) 少于窗口数量 ({num_windows})。" - ) - concatenated_labels = concatenated_labels[:num_windows] - print( - f"截断标签以匹配窗口数量: {concatenated_labels.shape}" - ) - - # 初始化 PPGDataset - dataset = PPGDataset( - data=concatenated_data, - labels=concatenated_labels, - window_size=self.window_size, - stride=self.stride, - ) - - # 将数据集分割为训练集、验证集和测试集 - total_length = len(dataset) - train_length = int(self.train_split * total_length) - val_length = int(self.val_split * total_length) - test_length = total_length - train_length - val_length - - self.train_dataset, self.val_dataset, self.test_dataset = random_split( - dataset, - [train_length, val_length, test_length], - generator=torch.Generator().manual_seed(42), - ) - - print( - f"数据集已分割为 训练集: {train_length}, 验证集: {val_length}, 测试集: {test_length}" - ) - - def train_dataloader(self): - return DataLoader( - self.train_dataset, - batch_size=self.batch_size, - shuffle=True, - num_workers=self.num_workers, # 调试时设置为 0 - pin_memory=True, - ) - - def val_dataloader(self): - return DataLoader( - self.val_dataset, - batch_size=self.batch_size, - shuffle=False, - num_workers=self.num_workers, # 调试时设置为 0 - pin_memory=True, - ) - - def test_dataloader(self): - return DataLoader( - self.test_dataset, - batch_size=self.batch_size, - shuffle=False, - 
num_workers=self.num_workers, # 调试时设置为 0 - pin_memory=True, - ) +每个txt文件的文件名意义如下: +-号前面的数字代表第几段,-号后面的数字代表持续时间, +例如2024-11-17的3-10.txt文件中记录的是当次实验的第22-32分钟时间段的数据。 +每个txt内部的数据如下 +```txt + 00:00:02 +6989.7 hz +15.40351 -7.47591 + 00:00:03 +10434.5 hz +15.56827 -9.31106 + 00:00:03 +13879.3 hz +15.69426 -14.58940 + 00:00:04 +17324.1 hz +15.27117 -19.40495 + 00:00:04 +20769.0 hz +15.15964 -20.27624 + 00:00:05 +24213.8 hz +14.57288 -18.74838 + 00:00:05 +27658.6 hz +14.27791 -15.12476 + 00:00:06 +31103.4 hz +14.10476 -9.03115 ``` -上面的代码是我目前加载数据集所使用的代码,请你根据论文中提到的方法对其进行修改。 \ No newline at end of file +以下面的数据单元为例 + 00:00:06 +31103.4 hz +14.10476 -9.03115 +最上面的是时间,第二行是频率,第三行左边的是幅值,右边的是相位。 + +请你完成几件事情: +1.将每一天的数据集合并成一份,并且存储为csv格式,并且根据时间打上标签0-10分钟和65分钟之后是低,10-25和45-65分钟是中,25-45是高。 +2.使用Pytorch框架来帮我写一个血糖三分类的模型 \ No newline at end of file diff --git a/extracode/preprocess/GlucoseBITProcess.py b/extracode/preprocess/GlucoseBITProcess.py new file mode 100644 index 0000000..c2d5a42 --- /dev/null +++ b/extracode/preprocess/GlucoseBITProcess.py @@ -0,0 +1,308 @@ +import os +import pandas as pd +import re +import logging + +# 配置日志 +logging.basicConfig( + filename='GlucoseBITProcess.log', + filemode='w', + level=logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s' +) + +# 定义标签函数 +def assign_label(total_minutes): + if 0 <= total_minutes < 10 or total_minutes >= 65: + return 'low' + elif 10 <= total_minutes < 25 or 45 <= total_minutes < 65: + return 'medium' + elif 25 <= total_minutes < 45: + return 'high' + else: + return 'low' # 默认归为低 + +# 定义数据目录 +data_dir = '../../data/GlucoseBIT' # 请根据实际路径调整 + +# 初始化一个空的列表来收集数据 +data_list = [] + +# 定义辅助函数来识别行类型 +def is_time_line(line): + return ':' in line + +def is_frequency_line(line): + return 'hz' in line.lower() + +def is_amp_phase_line(line): + parts = line.split() + if len(parts) != 2: + return False + try: + float(parts[0]) + float(parts[1]) + return True + except ValueError: + return False + +# 遍历每个日期文件夹 +for date_folder in os.listdir(data_dir): + date_path = os.path.join(data_dir, date_folder) + if os.path.isdir(date_path): + # 获取所有txt文件,并按段号排序 + try: + txt_files = sorted( + [f for f in os.listdir(date_path) if f.endswith('.txt')], + key=lambda x: int(x.split('-')[0]) + ) + except ValueError as ve: + logging.warning(f"在排序文件时出错: {ve},文件夹: {date_path}") + continue + + # 初始化累计时间在日期文件夹级别 + cumulative_time = 0 # 以分钟为单位 + + for txt_file in txt_files: + # 从文件名中提取段号和持续时间 + match = re.match(r'(\d+)-(\d+)\.txt', txt_file) + if match: + segment_num = int(match.group(1)) + duration_min = int(match.group(2)) # 持续时间,以分钟为单位 + else: + logging.warning(f"文件名格式不正确: {txt_file}") + continue + + txt_path = os.path.join(date_path, txt_file) + + try: + with open(txt_path, 'r') as file: + lines = file.readlines() + except Exception as e: + logging.warning(f"无法读取文件 {txt_path}: {e}") + continue + + # 预处理:移除空行和仅包含空白字符的行 + lines = [line.strip() for line in lines if line.strip()] + + # 初始化状态 + state = 'expect_time' + current_data = {} + first_error_logged = False # 每个文件单独跟踪 + last_amp_phase = None # 用于检测重复的幅值和相位行 + + idx = 0 + while idx < len(lines): + line = lines[idx] + if state == 'expect_time': + if is_time_line(line): + # 解析时间 + time_str = line + time_parts = time_str.split(':') + if len(time_parts) != 3: + error_msg = f"文件 {txt_path} 中第 {idx+1} 行时间部分分割错误: '{time_str}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + try: + hours = int(time_parts[0]) + minutes = int(time_parts[1]) + 
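+                            # Total experiment time = cumulative_time (minutes already covered by
+                            # earlier segments of the same day) + the timestamp inside this file.
+                            # Per the naming convention in doc/prompt/preprocess.md, for 2024-11-17
+                            # the segments 1-10.txt and 2-12.txt contribute 10 + 12 = 22 minutes, so
+                            # a " 00:00:06" stamp in 3-10.txt maps to roughly minute 22.1 overall,
+                            # which is the value that assign_label() later receives.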
seconds = int(time_parts[2]) + time_sec = hours * 3600 + minutes * 60 + seconds + time_min = time_sec / 60 # 转换为分钟 + # 计算总时间 + total_time_min = cumulative_time + time_min + total_time_sec = total_time_min * 60 + current_data['time_sec'] = total_time_sec + current_data['time_min'] = total_time_min + state = 'expect_freq' + idx += 1 + except ValueError as ve: + error_msg = f"文件 {txt_path} 中第 {idx+1} 行时间解析错误: '{time_str}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + elif is_amp_phase_line(line): + # 检测到意外的幅值相位行,保留第一条,跳过后续重复的行 + if last_amp_phase == line: + # 重复行,跳过 + error_msg = f"文件 {txt_path} 中第 {idx+1} 行检测到重复的幅值和相位行: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + else: + # 保留第一条幅值相位行,但由于缺少时间和频率信息,无法处理,跳过 + last_amp_phase = line + error_msg = f"文件 {txt_path} 中第 {idx+1} 行幅值和相位行缺少时间和频率信息,跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + else: + # 其他格式错误的行,跳过 + error_msg = f"文件 {txt_path} 中第 {idx+1} 行时间格式不正确: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + elif state == 'expect_freq': + if is_frequency_line(line): + # 解析频率 + freq_str = line.lower().replace('hz', '').strip() + try: + freq = float(freq_str) + current_data['frequency_hz'] = freq + state = 'expect_amp_phase' + idx += 1 + except ValueError as ve: + error_msg = f"文件 {txt_path} 中第 {idx+1} 行频率解析错误: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + # 尝试重新同步 + state = 'expect_time' + current_data = {} + idx += 1 + continue + elif is_amp_phase_line(line): + # 检测到意外的幅值相位行,跳过并尝试重新同步 + if last_amp_phase == line: + error_msg = f"文件 {txt_path} 中第 {idx+1} 行检测到重复的幅值和相位行: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + else: + last_amp_phase = line + error_msg = f"文件 {txt_path} 中第 {idx+1} 行幅值和相位行缺少频率信息,跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + else: + # 其他格式错误的行,跳过并尝试重新同步 + error_msg = f"文件 {txt_path} 中第 {idx+1} 行频率格式不正确: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + # 尝试重新同步 + state = 'expect_time' + current_data = {} + idx += 1 + continue + elif state == 'expect_amp_phase': + if is_amp_phase_line(line): + # 检查是否为重复幅值相位行 + if last_amp_phase == line: + # 重复行,跳过 + error_msg = f"文件 {txt_path} 中第 {idx+1} 行检测到重复的幅值和相位行: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + idx += 1 + continue + else: + # 解析幅值和相位 + amp_phase = line.split() + try: + amp, phase = map(float, amp_phase) + current_data['amplitude'] = amp + current_data['phase'] = phase + + # 分配标签 + label = assign_label(current_data['time_min']) + + # 添加到列表 + data_list.append({ + 'date': date_folder, + 'segment': segment_num, + 'time_sec': current_data.get('time_sec', 0), + 'time_min': current_data.get('time_min', 0), + 'frequency_hz': current_data.get('frequency_hz', 0), + 'amplitude': current_data.get('amplitude', 0), + 'phase': current_data.get('phase', 0), + 'label': label + }) + + # 
更新最后一个幅值相位行 + last_amp_phase = line + + # 重置状态 + state = 'expect_time' + current_data = {} + idx += 1 + except ValueError as ve: + error_msg = f"文件 {txt_path} 中第 {idx+1} 行幅值和相位解析错误: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + # 尝试重新同步 + state = 'expect_time' + current_data = {} + idx += 1 + continue + elif is_time_line(line): + # 意外遇到时间行,可能缺少幅值相位行,跳过并重新同步 + error_msg = f"文件 {txt_path} 中第 {idx+1} 行意外的时间行: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + # 重置状态并处理该行作为新的时间行 + state = 'expect_time' + current_data = {} + # 不增加idx,这样可以重新处理当前行 + continue + else: + # 其他格式错误的行,跳过并尝试重新同步 + error_msg = f"文件 {txt_path} 中第 {idx+1} 行幅值和相位格式不正确: '{line}',跳过该数据单元。" + logging.warning(error_msg) + if not first_error_logged: + print(error_msg) + first_error_logged = True + # 尝试重新同步 + state = 'expect_time' + current_data = {} + idx += 1 + continue + + # 累加持续时间 + cumulative_time += duration_min + +# 将列表转换为DataFrame +all_data = pd.DataFrame(data_list) + +# 检查DataFrame是否为空 +if all_data.empty: + print("数据集为空,请检查数据源。") +else: + # 将标签转换为数字 + label_mapping = {'low': 0, 'medium': 1, 'high': 2} + all_data['label'] = all_data['label'].map(label_mapping) + + # 保存为CSV + output_csv = '../../data/GlucoseBIT/glucose_data.csv' + all_data.to_csv(output_csv, index=False) + print(f"数据已成功保存到 {output_csv}") + print(f"总解析的数据单元数: {len(all_data)}") diff --git a/extracode/train_demo/GlucoseBITtrain.py b/extracode/train_demo/GlucoseBITtrain.py new file mode 100644 index 0000000..b4ff3b0 --- /dev/null +++ b/extracode/train_demo/GlucoseBITtrain.py @@ -0,0 +1,255 @@ +# 导入必要的库 +import pandas as pd +import numpy as np +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler +import torch +import torch.nn as nn +from torch.utils.data import Dataset, DataLoader +import logging + +# 配置日志(可选) +logging.basicConfig( + filename='GlucoseBITProcess.log', + filemode='w', + level=logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s' +) + +# 设置随机种子,确保结果可重复 +def set_seed(seed=42): + np.random.seed(seed) + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed(seed) + +set_seed(42) + +# 检查设备 +device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') +print(f"使用设备: {device}") + +# 加载数据 +csv_path = '../../data/GlucoseBIT/glucose_data.csv' # 请确保CSV文件路径正确 +df = pd.read_csv(csv_path) + +# 查看数据基本信息 +print("数据集概览:") +print(df.head()) +print(df['label'].value_counts()) + +# 按5分钟划分数据 +def group_by_five_minutes(df): + """ + 将数据按照每5分钟进行分组,并计算每组的特征均值。 + """ + # 确保数据按时间排序 + df = df.sort_values(['date', 'time_min']).reset_index(drop=True) + + # 创建一个新的列 'minute_group' 表示每5分钟的组 + df['minute_group'] = (df['time_min'] // 5).astype(int) + + # 按照 'date' 和 'minute_group' 进行分组,并计算每组的特征均值 + grouped = df.groupby(['date', 'minute_group']).agg({ + 'frequency_hz': 'mean', + 'amplitude': 'mean', + 'phase': 'mean' + }).reset_index() + + # 计算累计分钟数 + # 假设每个 'minute_group' 代表一个连续的5分钟段 + grouped = grouped.sort_values(['date', 'minute_group']).reset_index(drop=True) + grouped['cumulative_time_min'] = grouped.groupby('date')['minute_group'].transform(lambda x: x * 5) + + # 分配标签 + def assign_label(total_minutes): + if (0 <= total_minutes < 10) or (total_minutes >= 65): + return 0 # low + elif (10 <= total_minutes < 25) or (45 <= total_minutes < 65): + return 1 # medium + elif 25 <= total_minutes < 45: + return 2 # high + else: + 
return 0 # 默认归为低 + + grouped['label'] = grouped['cumulative_time_min'].apply(assign_label) + + return grouped + +# 按5分钟划分 +df_grouped = group_by_five_minutes(df) + +print("按5分钟划分后的数据集概览:") +print(df_grouped.head()) +print(df_grouped['label'].value_counts()) + +# 特征和标签 +feature_columns = ['frequency_hz', 'amplitude', 'phase'] +X = df_grouped[feature_columns].values +y = df_grouped['label'].values + +# 特征标准化 +scaler = StandardScaler() +X = scaler.fit_transform(X) + +# 数据集划分 +X_train, X_temp, y_train, y_temp = train_test_split( + X, y, test_size=0.3, random_state=42, stratify=y +) +X_val, X_test, y_val, y_test = train_test_split( + X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp +) + +print(f"训练集大小: {X_train.shape[0]}") +print(f"验证集大小: {X_val.shape[0]}") +print(f"测试集大小: {X_test.shape[0]}") + +# 定义自定义 Dataset 类 +class GlucoseDataset(Dataset): + def __init__(self, features, labels): + self.X = torch.tensor(features, dtype=torch.float32) + self.y = torch.tensor(labels, dtype=torch.long) + + def __len__(self): + return len(self.y) + + def __getitem__(self, idx): + return self.X[idx], self.y[idx] + +# 创建 Dataset 实例 +train_dataset = GlucoseDataset(X_train, y_train) +val_dataset = GlucoseDataset(X_val, y_val) +test_dataset = GlucoseDataset(X_test, y_test) + +# 创建 DataLoader +batch_size = 32 + +train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) +val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) +test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) + +# 定义神经网络模型 +class GlucoseClassifier(nn.Module): + def __init__(self, input_size=3, hidden_size=64, num_classes=3): + super(GlucoseClassifier, self).__init__() + self.fc1 = nn.Linear(input_size, hidden_size) + self.relu = nn.ReLU() + self.dropout = nn.Dropout(0.3) + self.fc2 = nn.Linear(hidden_size, num_classes) + + def forward(self, x): + out = self.fc1(x) + out = self.relu(out) + out = self.dropout(out) + out = self.fc2(out) + return out + +# 初始化模型、损失函数和优化器 +model = GlucoseClassifier().to(device) +criterion = nn.CrossEntropyLoss() +optimizer = torch.optim.Adam(model.parameters(), lr=0.001) + +# 定义训练函数 +def train_epoch(model, device, dataloader, criterion, optimizer): + model.train() + running_loss = 0.0 + correct = 0 + total = 0 + + for inputs, targets in dataloader: + inputs, targets = inputs.to(device), targets.to(device) + print(inputs.shape) + + # 前向传播 + outputs = model(inputs) + loss = criterion(outputs, targets) + + # 反向传播和优化 + optimizer.zero_grad() + loss.backward() + optimizer.step() + + # 统计 + running_loss += loss.item() * inputs.size(0) + _, predicted = torch.max(outputs.data, 1) + total += targets.size(0) + correct += (predicted == targets).sum().item() + + epoch_loss = running_loss / total + epoch_acc = correct / total + return epoch_loss, epoch_acc + +# 定义验证函数 +def validate_epoch(model, device, dataloader, criterion): + model.eval() + running_loss = 0.0 + correct = 0 + total = 0 + + with torch.no_grad(): + for inputs, targets in dataloader: + inputs, targets = inputs.to(device), targets.to(device) + + # 前向传播 + outputs = model(inputs) + loss = criterion(outputs, targets) + + # 统计 + running_loss += loss.item() * inputs.size(0) + _, predicted = torch.max(outputs.data, 1) + total += targets.size(0) + correct += (predicted == targets).sum().item() + + epoch_loss = running_loss / total + epoch_acc = correct / total + return epoch_loss, epoch_acc + +# 训练和验证过程 +num_epochs = 10000000 # 您可以根据需要调整轮数 +best_val_acc = 0.0 +best_model_path = 
'best_glucose_model.pth' + +for epoch in range(1, num_epochs + 1): + train_loss, train_acc = train_epoch(model, device, train_loader, criterion, optimizer) + val_loss, val_acc = validate_epoch(model, device, val_loader, criterion) + + # 保存最佳模型 + if val_acc > best_val_acc: + best_val_acc = val_acc + torch.save(model.state_dict(), best_model_path) + + # 每100轮打印一次 + if epoch % 100 == 0 or epoch == 1: + print(f"Epoch [{epoch}/{num_epochs}]") + print(f"训练 Loss: {train_loss:.4f}, 准确率: {train_acc:.4f}") + print(f"验证 Loss: {val_loss:.4f}, 准确率: {val_acc:.4f}") + print("-" * 30) + +print(f"最佳验证准确率: {best_val_acc:.4f}") + +# 加载最佳模型 +model.load_state_dict(torch.load(best_model_path)) + +# 定义测试函数 +def test_model(model, device, dataloader): + model.eval() + correct = 0 + total = 0 + + with torch.no_grad(): + for inputs, targets in dataloader: + inputs, targets = inputs.to(device), targets.to(device) + + # 前向传播 + outputs = model(inputs) + _, predicted = torch.max(outputs.data, 1) + + total += targets.size(0) + correct += (predicted == targets).sum().item() + + test_acc = correct / total + return test_acc + +# 测试模型 +test_accuracy = test_model(model, device, test_loader) +print(f"测试集准确率: {test_accuracy:.4f}") diff --git a/extracode/train_demo/GlucoseBITtrain5min.py b/extracode/train_demo/GlucoseBITtrain5min.py new file mode 100644 index 0000000..445428f --- /dev/null +++ b/extracode/train_demo/GlucoseBITtrain5min.py @@ -0,0 +1,331 @@ +# 导入必要的库 +import pandas as pd +import numpy as np +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler +import torch +import torch.nn as nn +from torch.utils.data import Dataset, DataLoader +import logging +import os + +# 配置日志(可选) +logging.basicConfig( + filename='GlucoseBITProcess.log', + filemode='w', + level=logging.WARNING, + format='%(asctime)s - %(levelname)s - %(message)s' +) + +# 设置随机种子,确保结果可重复 +def set_seed(seed=42): + np.random.seed(seed) + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed(seed) + +set_seed(42) + +# 检查设备 +device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') +print(f"使用设备: {device}") + +# 加载数据 +csv_path = '../../data/GlucoseBIT/glucose_data.csv' # 请确保CSV文件路径正确 +df = pd.read_csv(csv_path) + +# 查看数据基本信息 +print("数据集概览:") +print(df.head()) +print(df['label'].value_counts()) + +# 定义标签函数 +def assign_label(total_minutes): + if (0 <= total_minutes < 10) or (total_minutes >= 65): + return 0 # low + elif (10 <= total_minutes < 25) or (45 <= total_minutes < 65): + return 1 # medium + elif 25 <= total_minutes < 45: + return 2 # high + else: + return 0 # 默认归为低 + +# 为每个数据点分配标签 +df['data_label'] = df['time_min'].apply(assign_label) + +# 移除原有的 'label' 列,以避免混淆(如果 'label' 列不再需要) +if 'label' in df.columns: + df = df.drop(columns=['label']) + +# 按滑动窗口划分数据,确保窗口内数据属于同一类别 +def sliding_window_group(df, window_size=5, step_size=1, samples_per_minute=100): + """ + 将数据按照滑动窗口进行分组,并将每组的数据展平成一个向量。 + + 参数: + - df: 原始数据的DataFrame + - window_size: 窗口大小(分钟) + - step_size: 窗口步长(分钟) + - samples_per_minute: 每分钟的采样数 + + 返回: + - grouped_df: 按滑动窗口划分并展平后的DataFrame + """ + # 确保数据按时间排序 + df = df.sort_values(['date', 'segment', 'time_min', 'time_sec']).reset_index(drop=True) + + # 初始化列表来存储处理后的数据 + processed_data = [] + + # 计算每个窗口的样本数 + samples_per_window = window_size * samples_per_minute # 5分钟 * 100采样/分钟 = 500 + feature_length = samples_per_window * 3 # 每个数据点有3个特征 + + # 遍历每个日期和段 + for (date, segment), group in df.groupby(['date', 'segment']): + group = group.reset_index(drop=True) + 
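+        # Window arithmetic (note the assumed sampling density): with window_size=5 min,
+        # step_size=1 min and samples_per_minute=100, each kept window should hold
+        # 5 * 100 = 500 rows; flattening their (frequency_hz, amplitude, phase) triplets
+        # gives the 500 * 3 = 1500-dim vector that GlucoseClassifier(input_size=1500)
+        # below consumes. Windows with fewer rows, or whose rows' data_label disagrees
+        # with the label of the window end time, are skipped by the `continue` branches
+        # further down in this loop.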
total_minutes = group['time_min'].max() + # 计算窗口的起始分钟 + window_starts = np.arange(0, total_minutes - window_size + step_size, step_size) + for start_min in window_starts: + end_min = start_min + window_size + # 过滤出窗口内的数据 + window_data = group[(group['time_min'] >= start_min) & (group['time_min'] < end_min)] + actual_samples = window_data.shape[0] + # 检查样本数是否足够 + if actual_samples < samples_per_window: + logging.warning(f"日期 {date} 的段 {segment} 的窗口 [{start_min}, {end_min}) 样本数为 {actual_samples},不足 {samples_per_window}。该窗口将被忽略。") + continue # 忽略样本数不足的窗口 + # 提取前samples_per_window个数据点 + window_data = window_data.iloc[:samples_per_window] + # 提取特征并展平 + features = window_data[['frequency_hz', 'amplitude', 'phase']].values.flatten() + # 分配标签,使用窗口的结束时间 + window_label = assign_label(end_min) + # 检查窗口内所有数据点的标签是否一致 + if not (window_data['data_label'] == window_label).all(): + logging.warning(f"日期 {date} 的段 {segment} 的窗口 [{start_min}, {end_min}) 中存在跨标签数据。该窗口将被忽略。") + continue # 忽略跨标签的窗口 + # 添加到列表 + processed_data.append({ + 'date': date, + 'segment': segment, + 'window_start_min': start_min, + 'window_end_min': end_min, + 'features': features, + 'label': window_label + }) + + # 创建处理后的DataFrame + grouped_df = pd.DataFrame(processed_data) + + return grouped_df + +# 按滑动窗口划分并展平数据 +df_grouped = sliding_window_group(df, window_size=5, step_size=1, samples_per_minute=100) + +# 检查是否有数据被处理 +if df_grouped.empty: + raise ValueError("分组后的数据集为空。请检查数据是否满足每5分钟100个样本的要求,或者调整 `window_size`、`step_size` 和 `samples_per_minute` 参数。") + +print("按滑动窗口划分后的数据集概览:") +print(df_grouped.head()) +print("列名:", df_grouped.columns) +print(df_grouped['label'].value_counts()) +print(f"总分组数: {len(df_grouped)}") + +# 特征和标签 +X = np.vstack(df_grouped['features'].values) # 形状为 [样本数, 1500] +y = df_grouped['label'].values + +print(f"特征形状: {X.shape}") # 应为 [样本数, 1500] +print(f"标签形状: {y.shape}") # 应为 [样本数] + +# 特征标准化 +scaler = StandardScaler() +X = scaler.fit_transform(X) + +# 数据集划分 +if len(X) < 2: + raise ValueError("样本数不足,无法进行训练和测试。请确保有足够的滑动窗口组被保留。") + +# 首先划分训练集和临时集(训练集70%,临时集30%) +X_train, X_temp, y_train, y_temp = train_test_split( + X, y, test_size=0.3, random_state=42, stratify=y +) + +# 打印临时集的标签分布 +print("y_temp 分布:", pd.Series(y_temp).value_counts()) + +# 尝试使用 stratify 进行第二次划分 +try: + X_val, X_test, y_val, y_test = train_test_split( + X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp + ) +except ValueError as e: + print("Stratified split failed, trying without stratification.") + X_val, X_test, y_val, y_test = train_test_split( + X_temp, y_temp, test_size=0.5, random_state=42, stratify=None + ) + +print(f"训练集大小: {X_train.shape[0]}") +print(f"验证集大小: {X_val.shape[0]}") +print(f"测试集大小: {X_test.shape[0]}") + +# 定义自定义 Dataset 类 +class GlucoseDataset(Dataset): + def __init__(self, features, labels): + self.X = torch.tensor(features, dtype=torch.float32) + self.y = torch.tensor(labels, dtype=torch.long) + + def __len__(self): + return len(self.y) + + def __getitem__(self, idx): + return self.X[idx], self.y[idx] + +# 创建 Dataset 实例 +train_dataset = GlucoseDataset(X_train, y_train) +val_dataset = GlucoseDataset(X_val, y_val) +test_dataset = GlucoseDataset(X_test, y_test) + +# 创建 DataLoader +batch_size = 32 + +train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) +val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) +test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) + +# 定义神经网络模型 +class GlucoseClassifier(nn.Module): + def __init__(self, input_size=1500, 
hidden_size=512, num_classes=3): + super(GlucoseClassifier, self).__init__() + self.fc1 = nn.Linear(input_size, hidden_size) + self.relu = nn.ReLU() + self.dropout = nn.Dropout(0.5) + self.fc2 = nn.Linear(hidden_size, num_classes) + + def forward(self, x): + out = self.fc1(x) + out = self.relu(out) + out = self.dropout(out) + out = self.fc2(out) + return out + +# 设置输入尺寸为 1500 +input_size = X.shape[1] # 1500 +model = GlucoseClassifier(input_size=input_size, hidden_size=512, num_classes=3).to(device) + +# 检查是否存在旧的模型文件,避免加载不兼容的 state_dict +best_model_path = 'best_glucose_model.pth' +if os.path.exists(best_model_path): + print(f"检测到旧的模型文件 '{best_model_path}',将其删除以避免加载不兼容的参数。") + os.remove(best_model_path) + +# 初始化损失函数和优化器 +criterion = nn.CrossEntropyLoss() +optimizer = torch.optim.Adam(model.parameters(), lr=0.001) + +# 定义训练函数 +def train_epoch(model, device, dataloader, criterion, optimizer): + model.train() + running_loss = 0.0 + correct = 0 + total = 0 + + for inputs, targets in dataloader: + inputs, targets = inputs.to(device), targets.to(device) + + # 前向传播 + outputs = model(inputs) + loss = criterion(outputs, targets) + + # 反向传播和优化 + optimizer.zero_grad() + loss.backward() + optimizer.step() + + # 统计 + running_loss += loss.item() * inputs.size(0) + _, predicted = torch.max(outputs.data, 1) + total += targets.size(0) + correct += (predicted == targets).sum().item() + + epoch_loss = running_loss / total + epoch_acc = correct / total + return epoch_loss, epoch_acc + +# 定义验证函数 +def validate_epoch(model, device, dataloader, criterion): + model.eval() + running_loss = 0.0 + correct = 0 + total = 0 + + with torch.no_grad(): + for inputs, targets in dataloader: + inputs, targets = inputs.to(device), targets.to(device) + + # 前向传播 + outputs = model(inputs) + loss = criterion(outputs, targets) + + # 统计 + running_loss += loss.item() * inputs.size(0) + _, predicted = torch.max(outputs.data, 1) + total += targets.size(0) + correct += (predicted == targets).sum().item() + + epoch_loss = running_loss / total + epoch_acc = correct / total + return epoch_loss, epoch_acc + +# 训练和验证过程 +num_epochs = 1000 # 您可以根据需要调整轮数 +best_val_acc = 0.0 +best_model_path = 'best_glucose_model.pth' # 确保这是一个新的文件名或已删除旧文件 + +for epoch in range(1, num_epochs + 1): + train_loss, train_acc = train_epoch(model, device, train_loader, criterion, optimizer) + val_loss, val_acc = validate_epoch(model, device, val_loader, criterion) + + # 保存最佳模型 + if val_acc > best_val_acc: + best_val_acc = val_acc + torch.save(model.state_dict(), best_model_path) + + # 每100轮打印一次 + if epoch % 100 == 0 or epoch == 1: + print(f"Epoch [{epoch}/{num_epochs}]") + print(f"训练 Loss: {train_loss:.4f}, 准确率: {train_acc:.4f}") + print(f"验证 Loss: {val_loss:.4f}, 准确率: {val_acc:.4f}") + print("-" * 30) + +print(f"最佳验证准确率: {best_val_acc:.4f}") + +# 加载最佳模型 +model.load_state_dict(torch.load(best_model_path)) + +# 定义测试函数 +def test_model(model, device, dataloader): + model.eval() + correct = 0 + total = 0 + + with torch.no_grad(): + for inputs, targets in dataloader: + inputs, targets = inputs.to(device), targets.to(device) + + # 前向传播 + outputs = model(inputs) + _, predicted = torch.max(outputs.data, 1) + + total += targets.size(0) + correct += (predicted == targets).sum().item() + + test_acc = correct / total + return test_acc + +# 测试模型 +test_accuracy = test_model(model, device, test_loader) +print(f"测试集准确率: {test_accuracy:.4f}") diff --git a/extracode/train_demo/best_glucose_model.pth b/extracode/train_demo/best_glucose_model.pth new file mode 100644 index 0000000..e65b64e 
Binary files /dev/null and b/extracode/train_demo/best_glucose_model.pth differ
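The committed `best_glucose_model.pth` is the state_dict checkpoint written by the training scripts above. A minimal, hypothetical inference sketch (not part of the committed code): it assumes the checkpoint was saved by `GlucoseBITtrain5min.py`, and that the incoming 5-minute window has already been flattened to 1500 features and scaled with the same StandardScaler statistics used at training time; the scripts do not persist the scaler, so it would have to be refit or saved separately.

```python
import numpy as np
import torch
import torch.nn as nn


# Same architecture as GlucoseClassifier in GlucoseBITtrain5min.py,
# redefined here so the sketch is self-contained.
class GlucoseClassifier(nn.Module):
    def __init__(self, input_size=1500, hidden_size=512, num_classes=3):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        return self.fc2(self.dropout(self.relu(self.fc1(x))))


model = GlucoseClassifier()
model.load_state_dict(torch.load('best_glucose_model.pth', map_location='cpu'))
model.eval()

# Stand-in for one preprocessed window: 500 rows x 3 features, flattened and
# already standardized (hypothetical input, replace with real scaled data).
window = torch.from_numpy(np.random.randn(1, 1500).astype(np.float32))

with torch.no_grad():
    pred = model(window).argmax(dim=1).item()  # 0 = low, 1 = medium, 2 = high
print(f"predicted class: {pred}")
```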