Refactor handling of local and S3 file paths
Co-authored-by: Lucca Baumgärtner <[email protected]>
Signed-off-by: Tims777 <[email protected]>
Tims777 and luccalb committed Feb 6, 2024
1 parent ea58ace commit 68156cc
Showing 5 changed files with 93 additions and 174 deletions.
25 changes: 21 additions & 4 deletions src/database/leads/local_repository.py
@@ -27,6 +27,9 @@ class LocalRepository(Repository):
     DF_PREPROCESSED_INPUT = os.path.abspath(
         os.path.join(BASE_PATH, "../../data/preprocessed_data_files/")
     )
+    DF_PREDICTION_OUTPUT = os.path.abspath(
+        os.path.join(BASE_PATH, "../../data/leads_predicted_size.csv")
+    )
     REVIEWS = os.path.abspath(os.path.join(BASE_PATH, "../../data/reviews/"))
     SNAPSHOTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/snapshots/"))
     GPT_RESULTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/gpt-results/"))
@@ -51,6 +54,13 @@ def save_dataframe(self):
         self.df.to_csv(self.DF_OUTPUT, index=False)
         log.info(f"Saved enriched data locally to {self.DF_OUTPUT}")
 
+    def save_prediction(self, df):
+        """
+        Save the dataframe given in df to the configured prediction output location
+        """
+        df.to_csv(self.DF_PREDICTION_OUTPUT, index=False)
+        log.info(f"Saved prediction result locally to {self.DF_PREDICTION_OUTPUT}")
+
     def insert_data(self, data):
         """
         TODO: Insert new data into specified dataframe
@@ -253,10 +263,17 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report at {report_file_path}! Error: {str(e)}")
 
-    def load_preprocessed_data(
-        self, file_name: str = "historical_preprocessed_data.csv"
-    ):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        file_name = (
+            "historical_preprocessed_data.csv"
+            if historical
+            else "preprocessed_data.csv"
+        )
+        file_path = os.path.join(self.DF_PREPROCESSED_INPUT, file_name)
+        return file_path
+
+    def load_preprocessed_data(self, historical: bool = True):
         try:
-            return pd.read_csv(os.path.join(self.DF_PREPROCESSED_INPUT, file_name))
+            return pd.read_csv(self.get_preprocessed_data_path(historical))
         except FileNotFoundError:
             log.error("Error: Could not find input file for preprocessed data.")
16 changes: 15 additions & 1 deletion src/database/leads/repository.py
@@ -82,6 +82,13 @@ def save_dataframe(self):
         """
         pass
 
+    @abstractmethod
+    def save_prediction(self, df):
+        """
+        Save the dataframe given in df to the configured prediction output location
+        """
+        pass
+
     @abstractmethod
     def insert_data(self, data):
         """
@@ -221,7 +228,14 @@ def save_classification_report(self, report, model_name: str):
         pass
 
     @abstractmethod
-    def load_preprocessed_data(self, file_name: str):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        """
+        Returns the path for a preprocessed data file (either historical or current)
+        """
+        pass
+
+    @abstractmethod
+    def load_preprocessed_data(self, historical: bool = True):
        """
        Load the preprocessed data from the given file
        """
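
Since both methods are abstract, every backend must supply a matching pair. A hypothetical stub satisfying just the refactored part of the contract could look like this (the remaining abstract methods are omitted for brevity, so this class would not instantiate as-is):

    import os

    import pandas as pd


    class StubRepository(Repository):
        PREPROCESSED_DIR = "/tmp/preprocessed_data_files"

        def get_preprocessed_data_path(self, historical: bool = True):
            file_name = (
                "historical_preprocessed_data.csv"
                if historical
                else "preprocessed_data.csv"
            )
            return os.path.join(self.PREPROCESSED_DIR, file_name)

        def load_preprocessed_data(self, historical: bool = True):
            return pd.read_csv(self.get_preprocessed_data_path(historical))

        def save_prediction(self, df):
            # Hypothetical output location, for illustration only.
            df.to_csv(os.path.join(self.PREPROCESSED_DIR, "prediction.csv"), index=False)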
29 changes: 21 additions & 8 deletions src/database/leads/s3_repository.py
@@ -43,6 +43,7 @@ class S3Repository(Repository):
     MODELS_BUCKET = "amos--models"
     DF_INPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
     DF_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
+    DF_PREDICTION_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/leads_predicted_size.csv"
     DF_PREPROCESSED_INPUT = f"s3://{FEATURES_BUCKET}/preprocessed_data_files/"
     REVIEWS = f"s3://{EVENTS_BUCKET}/reviews/"
     SNAPSHOTS = f"s3://{EVENTS_BUCKET}/snapshots/"
@@ -131,6 +132,16 @@ def save_dataframe(self):
         self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
         log.info(f"Successfully saved enriched leads to s3://{bucket}/{obj_key}")
 
+    def save_prediction(self, df):
+        """
+        Save the dataframe given in df to the configured prediction output location
+        """
+        bucket, obj_key = decode_s3_url(self.DF_PREDICTION_OUTPUT)
+        csv_buffer = StringIO()
+        df.to_csv(csv_buffer, index=False)
+        self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
+        log.info(f"Successfully saved prediction result to s3://{bucket}/{obj_key}")
+
     def _save_to_s3(self, data, bucket, key):
         s3.put_object(
             Bucket=bucket,
@@ -374,15 +385,17 @@ def save_classification_report(self, report, model_name: str):
         except Exception as e:
             log.error(f"Could not save report for '{model_name}' to S3: {str(e)}")
 
-    def load_preprocessed_data(
-        self, file_name: str = "historical_preprocessed_data.csv"
-    ):
+    def get_preprocessed_data_path(self, historical: bool = True):
+        file_name = (
+            "historical_preprocessed_data.csv"
+            if historical
+            else "preprocessed_data.csv"
+        )
         file_path = self.DF_PREPROCESSED_INPUT + file_name
         if not file_path.startswith("s3://"):
             log.error(
                 "S3 location has to be defined like this: s3://<BUCKET>/<OBJECT_KEY>"
             )
             return
+        return file_path
+
+    def load_preprocessed_data(self, historical: bool = True):
+        file_path = self.get_preprocessed_data_path(historical)
 
         source = None
         remote_dataset = None
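
save_prediction reuses the existing decode_s3_url helper to split DF_PREDICTION_OUTPUT into a bucket and an object key. The helper itself is not part of this diff; a sketch of its assumed behavior:

    from urllib.parse import urlparse


    def decode_s3_url(url):
        # Assumed behavior (illustrative, not the committed helper):
        # "s3://amos--data--events/leads/leads_predicted_size.csv"
        #   -> ("amos--data--events", "leads/leads_predicted_size.csv")
        parsed = urlparse(url)
        return parsed.netloc, parsed.path.lstrip("/")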
141 changes: 30 additions & 111 deletions src/demo/demos.py
@@ -224,10 +224,9 @@ def preprocessing_demo():
         historical_bool = True
     else:
         historical_bool = False
-    S3_bool = DATABASE_TYPE == "S3"
 
     preprocessor = Preprocessing(
-        filter_null_data=filter_bool, historical_bool=historical_bool, S3_bool=S3_bool
+        filter_null_data=filter_bool, historical_bool=historical_bool
     )
 
     preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
@@ -239,10 +238,7 @@
 def predict_MerchantSize_on_lead_data_demo():
     import os
     import sys
-    from io import BytesIO
 
-    import boto3
-    import joblib
     import pandas as pd
 
     log.info(
@@ -254,80 +250,49 @@ def predict_MerchantSize_on_lead_data_demo():
     current_dir = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
     parent_dir = os.path.join(current_dir, "..")
     sys.path.append(parent_dir)
+    from database import get_database
     from preprocessing import Preprocessing
 
+    db = get_database()
+
     log.info(f"Preprocessing the leads...")
-    preprocessor = Preprocessing(
-        filter_null_data=False, historical_bool=False, S3_bool=S3_bool
-    )
+    preprocessor = Preprocessing(filter_null_data=False, historical_bool=False)
     preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
     df = preprocessor.implement_preprocessing_pipeline()
     preprocessor.save_preprocessed_data()
 
     ############################## adapting the preprocessing files ###########################
     log.info(f"Adapting the leads' preprocessed data for the ML model...")
-    # load the data from the CSV files
-    historical_preprocessed_data = pd.read_csv(
-        "s3://amos--data--features/preprocessed_data_files/preprocessed_data.csv"
-    )
-    if S3_bool:
-        toBePredicted_preprocessed_data = pd.read_csv(
-            "s3://amos--data--events/leads/preprocessed_leads_data.csv"
-        )
-    else:
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("preprocessed_data_files/leads_preprocessed_data.csv")
-        leads_preprocessed_data_path = "/".join(path_components)
-        toBePredicted_preprocessed_data = pd.read_csv(leads_preprocessed_data_path)
+    historical_preprocessed_data = db.load_preprocessed_data(historical=True)
+    unlabeled_preprocessed_data = db.load_preprocessed_data(historical=False)
 
     historical_columns_order = historical_preprocessed_data.columns
 
     missing_columns = set(historical_columns_order) - set(
-        toBePredicted_preprocessed_data.columns
+        unlabeled_preprocessed_data.columns
     )
     for column in missing_columns:
-        toBePredicted_preprocessed_data[column] = 0
+        unlabeled_preprocessed_data[column] = 0
 
-    for column in toBePredicted_preprocessed_data.columns:
+    for column in unlabeled_preprocessed_data.columns:
         if column not in historical_columns_order:
-            toBePredicted_preprocessed_data = toBePredicted_preprocessed_data.drop(
+            unlabeled_preprocessed_data = unlabeled_preprocessed_data.drop(
                 column, axis=1
             )
 
     # reorder columns
-    toBePredicted_preprocessed_data = toBePredicted_preprocessed_data[
-        historical_columns_order
-    ]
-    if S3_bool:
-        toBePredicted_output_path_s3 = (
-            "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
-        )
-        toBePredicted_preprocessed_data.to_csv(
-            toBePredicted_output_path_s3,
-            index=False,
-        )
-        log.info(
-            f"Saving the adapted preprocessed data at {toBePredicted_output_path_s3}"
-        )
-    else:
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("toBePredicted_preprocessed_data_updated.csv")
-        local_preprocessed_data_path = "/".join(path_components)
-        toBePredicted_preprocessed_data.to_csv(
-            local_preprocessed_data_path, index=False
-        )
-        log.info(
-            f"Saving the adapted preprocessed data at {local_preprocessed_data_path}"
-        )
+    unlabeled_preprocessed_data = unlabeled_preprocessed_data[historical_columns_order]
+    unlabeled_preprocessed_data.to_csv(
+        preprocessor.preprocessed_data_output_path,
+        index=False,
+    )
+    log.info(
+        f"Saving the adapted preprocessed data at {preprocessor.preprocessed_data_output_path}"
+    )
 
     # check if columns in both dataframe are in same order and same number
-    assert list(toBePredicted_preprocessed_data.columns) == list(
+    assert list(unlabeled_preprocessed_data.columns) == list(
         historical_preprocessed_data.columns
     ), "Column names are different"

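The three alignment steps above (zero-fill missing columns, drop extras, reorder) could also be expressed as a single pandas reindex; an equivalent sketch, assuming zero-fill is the intended default for absent feature columns:

    # Add missing columns as 0, drop unknown ones, and reorder in one call.
    unlabeled_preprocessed_data = unlabeled_preprocessed_data.reindex(
        columns=historical_columns_order, fill_value=0
    )
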
@@ -343,57 +308,30 @@ def check_classification_task(string):
     model_name = get_string_input(
         "Provide model file name in data/models local directory\nInput example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
     )
-    # file_key = "models/lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model_updated.pkl"  # adjust according to the desired model
     model_name = model_name.replace(" ", "")
     model_name = model_name.strip()
     xgb_bool = False
-    if model_name[:3].lower() == "xgb":
+    if model_name.lower().startswith("xgb"):
         xgb_bool = True
 
-    file_key = f"models/" + model_name
-
     def check_classification_task(string):
-        match = re.search(r"\d+", string)
+        match = re.search(r"numclasses\((\d+)\)", string)
         if match:
-            last_number = int(match.group())
+            last_number = int(match.group(1))
             if last_number == 3:
                 return True
             else:
                 return False
 
-    classification_task_3 = check_classification_task(file_key)
+    classification_task_3 = check_classification_task(model_name)
 
     try:
-        if S3_bool:
-            # create an S3 client
-            s3 = boto3.client("s3")
-            # download the file from S3
-            response = s3.get_object(Bucket=bucket_name, Key=file_key)
-            model_content = response["Body"].read()
-            # load model
-            with BytesIO(model_content) as model_file:
-                model = joblib.load(model_file)
-            log.info(f"Loaded the model from S3 bucket successfully!")
-        else:
-            path_components = preprocessor.data_path.split(
-                "\\" if "\\" in preprocessor.data_path else "/"
-            )
-            path_components.pop()
-            path_components.append(file_key)
-            model_local_path = "/".join(path_components)
-            model = joblib.load(model_local_path)
-            log.info(f"Loaded the model from the local path successfully!")
+        model = db.load_ml_model(model_name)
+        log.info(f"Loaded the model {model_name}!")
     except:
         log.error("No model found with the given name!")
         return
 
-    if S3_bool:
-        data_path = (
-            "s3://amos--data--events/leads/toBePredicted_preprocessed_data_updated.csv"
-        )
-    else:
-        data_path = local_preprocessed_data_path
-
-    df = pd.read_csv(data_path)
+    df = pd.read_csv(preprocessor.preprocessed_data_output_path)
     input = df.drop("MerchantSizeByDPV", axis=1)
     if xgb_bool:
         input = xgb.DMatrix(input)
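
The tightened regex matters because the old pattern grabbed the first digit run anywhere in the file name, which for the suggested input is the epoch count rather than the class count; for example:

    import re

    name = "lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl"

    re.search(r"\d+", name).group()                   # old pattern -> "1" (epochs)
    re.search(r"numclasses\((\d+)\)", name).group(1)  # new pattern -> "5" (classes)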
@@ -405,29 +343,10 @@ def check_classification_task(string):
     size_mapping = {0: "XS", 1: "S", 2: "M", 3: "L", 4: "XL"}
     remapped_predictions = [size_mapping[prediction] for prediction in predictions]
 
-    if S3_bool:
-        enriched_data = pd.read_csv("s3://amos--data--events/leads/enriched.csv")
-    else:
-        enriched_data = pd.read_csv(preprocessor.data_path)
+    enriched_data = pd.read_csv(preprocessor.data_path)
 
     # first 5 columns: Last Name,First Name,Company / Account,Phone,Email,
     raw_data = enriched_data.iloc[:, :5]
     raw_data["PredictedMerchantSize"] = remapped_predictions
 
-    if S3_bool:
-        raw_data.to_csv(
-            "s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv",
-            index=True,
-        )
-        log.info(
-            f"Saved the predicted Merchant Size of the leads at s3://amos--data--events/leads/predicted_MerchantSize_of_leads.csv"
-        )
-    else:
-        path_components = preprocessor.data_path.split(
-            "\\" if "\\" in preprocessor.data_path else "/"
-        )
-        path_components.pop()
-        path_components.append("predicted_MerchantSize_of_leads.csv")
-        output_path = "/".join(path_components)
-        raw_data.to_csv(output_path, index=True)
-        log.info(f"Saved the predicted Merchant Size of the leads at {output_path}")
+    db.save_prediction(raw_data)
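
With the S3_bool branches removed, all demo I/O now flows through whichever repository get_database() returns. A condensed sketch of the resulting flow (illustrative; the stand-in prediction frame merely exercises save_prediction):

    import pandas as pd

    from database import get_database  # names from this commit

    db = get_database()  # LocalRepository or S3Repository, per configuration

    historical = db.load_preprocessed_data(historical=True)   # training columns
    unlabeled = db.load_preprocessed_data(historical=False)   # leads to predict

    # Align the unlabeled frame to the training columns (zero-fill extras).
    unlabeled = unlabeled.reindex(columns=historical.columns, fill_value=0)

    # Stand-in for real model output, just to exercise the new API:
    raw_data = pd.DataFrame({"PredictedMerchantSize": ["XS"] * len(unlabeled)})
    db.save_prediction(raw_data)  # one call hides the local-vs-S3 decision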