Skip to content

Commit

Permalink
Merge pull request #10 from Aura-healthcare/orchestration_train
Browse files Browse the repository at this point in the history
Orchestration train
  • Loading branch information
alexisgcomte authored Dec 7, 2024
2 parents 90f4b43 + 3c6c8fd commit a255b83
Show file tree
Hide file tree
Showing 25 changed files with 27,687 additions and 923 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ train:

train_ml:
. $(FOLDER_PATH)/env/bin/activate; \
python3 src/usecase/train_model.py --ml-dataset-path /home/DATA/DetecTeppe-2022-04-08/ML_ready/ML/train/df_ml_train.csv --ml-dataset-path-test /home/DATA/DetecTeppe-2022-04-08/ML_ready/ML/test/df_ml_test.csv
python3 src/usecase/train_model.py --ml-dataset-path /home/DATA/DetecTeppe-2022-04-08/ml_dataset_2022_04_08/train/df_ml_train.csv --ml-dataset-path-test /home/DATA/DetecTeppe-2022-04-08/ml_dataset_2022_04_08/test/df_ml_test.csv


## VISUALIZATION
Expand Down
61 changes: 61 additions & 0 deletions dags/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import sys
from datetime import datetime as dt
from sklearn.ensemble import RandomForestClassifier
import datetime
import xgboost as xgb
import numpy as np

PROJECT_FOLDER = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_FOLDER = os.path.join(PROJECT_FOLDER, 'data')

ML_DATASET_OUTPUT_FOLDER = "/opt/airflow/output"
AIRFLOW_PREFIX_TO_DATA = '/opt/airflow/data/'
MLRUNS_DIR = '/mlruns'

TRAIN_DATA = os.path.join(AIRFLOW_PREFIX_TO_DATA, "train/df_ml_train.csv")
TEST_DATA = os.path.join(AIRFLOW_PREFIX_TO_DATA , "test/df_ml_test.csv")
FEATURE_TRAIN_PATH= os.path.join(ML_DATASET_OUTPUT_FOLDER, "ml_train.csv")
FEATURE_TEST_PATH= os.path.join(ML_DATASET_OUTPUT_FOLDER, "ml_test.csv")

COL_TO_DROP = ['interval_index', 'interval_start_time', 'set']

START_DATE = dt(2021, 8, 1)
CONCURRENCY = 4
SCHEDULE_INTERVAL = datetime.timedelta(hours=2)
DEFAULT_ARGS = {'owner': 'airflow'}

TRACKING_URI = 'http://mlflow:5000'

MODEL_PARAM = {
'model': xgb.XGBClassifier(),
'grid_parameters': {
'nthread':[4],
'learning_rate': [0.1, 0.01, 0.05],
'max_depth': np.arange(3, 5, 2),
'scale_pos_weight':[1],
'n_estimators': np.arange(15, 25, 2),
'missing':[-999]}
}

MODELS_PARAM = {
'xgboost': {
'model': xgb.XGBClassifier(),
'grid_parameters': {
'nthread':[4],
'learning_rate': [0.1, 0.01, 0.05],
'max_depth': np.arange(3, 5, 2),
'scale_pos_weight':[1],
'n_estimators': np.arange(15, 25, 2),
'missing':[-999]
}
},
'random_forest': {
'model': RandomForestClassifier(),
'grid_parameters': {
'min_samples_leaf': np.arange(1, 5, 1),
'max_depth': np.arange(1, 7, 1),
'max_features': ['auto'],
'n_estimators': np.arange(10, 20, 2)}
}
}
28 changes: 28 additions & 0 deletions dags/predict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os
import sys
from datetime import datetime, timedelta, datetime

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

sys.path.append('.')
from dags.config import (DEFAULT_ARGS, START_DATE, CONCURRENCY, SCHEDULE_INTERVAL)


@dag(default_args=DEFAULT_ARGS,
start_date=START_DATE,
schedule_interval=timedelta(minutes=2),
concurrency=CONCURRENCY)
def predict():
@task
def prepare_features_with_io_task() -> str:
pass

@task
def predict_with_io_task(feature_path: str) -> None:
pass

feature_path = prepare_features_with_io_task()
predict_with_io_task(feature_path)

predict_dag = predict()
69 changes: 69 additions & 0 deletions dags/train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from src.usecase.data_processing.prepare_features import prepare_features_with_io
from src.usecase.train_model import (train_pipeline_with_io)
from dags.config import (
DEFAULT_ARGS,
START_DATE,
CONCURRENCY,
SCHEDULE_INTERVAL,
MODELS_PARAM,
MLRUNS_DIR,
TEST_DATA,
TRACKING_URI,
TRAIN_DATA,
FEATURE_TRAIN_PATH,
FEATURE_TEST_PATH,
COL_TO_DROP)
import sys

from airflow.decorators import dag, task

@dag(default_args=DEFAULT_ARGS,
start_date=START_DATE,
schedule_interval=SCHEDULE_INTERVAL,
catchup=False,
concurrency=CONCURRENCY)
def train_pipeline():

@task
def prepare_features_task(
dataset_path: str,
col_to_drop: list,
feature_path: str) -> str:

prepare_features_with_io(
dataset_path=dataset_path,
col_to_drop=col_to_drop,
features_path=feature_path)

return feature_path

@task
def train_model_task(
feature_tain_path: str,
feature_test_path: str,
tracking_uri: str = TRACKING_URI,
model_param: dict = MODELS_PARAM['xgboost'],
mlruns_dir: str = MLRUNS_DIR) -> None:

train_pipeline_with_io(feature_tain_path, feature_test_path,
tracking_uri=tracking_uri, model_param=model_param, mlruns_dir=mlruns_dir)

# Orchestration
features_train_path = FEATURE_TRAIN_PATH
features_test_path = FEATURE_TEST_PATH

ml_train_path = prepare_features_task(
dataset_path=TRAIN_DATA,
col_to_drop=COL_TO_DROP,
feature_path=features_train_path)

ml_test_path = prepare_features_task(
dataset_path=TEST_DATA,
col_to_drop=COL_TO_DROP,
feature_path=features_test_path)

train_model_task(feature_tain_path=ml_train_path, feature_test_path=ml_test_path, tracking_uri=TRACKING_URI,
model_param=MODELS_PARAM['xgboost'], mlruns_dir=MLRUNS_DIR)


train_pipeline_dag = train_pipeline()
1 change: 1 addition & 0 deletions data/ml_dataset_2022_04_08
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ x-airflow-common:
- ./output:/opt/airflow/output
- ./models:/opt/airflow/models
- ./src:/opt/airflow/src
- ./mlruns:/mlruns
- ./great_expectations:/opt/airflow/great_expectations
user: "${AIRFLOW_UID}:0"
depends_on:
Expand Down Expand Up @@ -227,6 +228,8 @@ services:
- '${MLFLOW_PORT}:5000'
volumes:
- ./mlruns:/mlruns
environment:
- PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python

ge_data_docs:
image: flashspys/nginx-static
Expand Down
2 changes: 1 addition & 1 deletion docker/mlflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM python:3.7-slim-buster

RUN pip install mlflow==1.19.0 psycopg2-binary==2.9.1
RUN pip install mlflow==1.28 psycopg2-binary==2.9.3
4 changes: 2 additions & 2 deletions env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ export REDIS_PORT=6379
export FLOWER_PORT=5555

# DATA PATH
export SYMLINK_FOLDER='test_airflow'
export DATA_PATH='/home/DATA/lateppe/DetecTeppe-2022-4-7/PL'
export SYMLINK_FOLDER=''
export DATA_PATH='/home/DATA/DetecTeppe-2022-04-08/ml_dataset_2022_04_08'
Loading

0 comments on commit a255b83

Please sign in to comment.