Skip to content

Commit

Permalink
[FSTORE-1141] Mage-AI Tutorial (#226)
Browse files Browse the repository at this point in the history
* Mage Tutorial for fraud use case
  • Loading branch information
Maxxx-zh authored Feb 9, 2024
1 parent a386b1a commit d30e7c8
Show file tree
Hide file tree
Showing 21 changed files with 1,153 additions and 0 deletions.
20 changes: 20 additions & 0 deletions integrations/mage_ai/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import numpy as np

# Define a function to compute Haversine distance between consecutive coordinates
def haversine(long, lat):
"""Compute Haversine distance between each consecutive coordinate in (long, lat)."""

# Shift the longitude and latitude columns to get consecutive values
long_shifted = long.shift()
lat_shifted = lat.shift()

# Calculate the differences in longitude and latitude
long_diff = long_shifted - long
lat_diff = lat_shifted - lat

# Haversine formula to compute distance
a = np.sin(lat_diff/2.0)**2
b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
c = 2*np.arcsin(np.sqrt(a + b))

return c
14 changes: 14 additions & 0 deletions integrations/mage_ai/mage_tutorial/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.DS_Store
.file_versions
.gitkeep
.log
.logs/
.mage_temp_profiles
.preferences.yaml
.variables/
__pycache__/
docker-compose.override.yml
logs/
mage-ai.db
mage_data/
secrets/
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import hopsworks
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'custom' not in globals():
from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test


@custom
def transform_custom(*args, **kwargs):
"""
args: The output from any upstream parent blocks (if applicable)
Returns:
Anything (e.g. data frame, dictionary, array, int, str, etc.)
"""
TEST_SIZE = 0.2
# Specify the window length as "4h"
window_len = "4h"

# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

fs = project.get_feature_store()

trans_fg = fs.get_feature_group(
name="transactions",
version=1,
)

window_aggs_fg = fs.get_feature_group(
name=f"transactions_{window_len}_aggs",
version=1,
)

# Select features for training data.
query = trans_fg.select(["fraud_label", "category", "amount", "age_at_transaction", "days_until_card_expires", "loc_delta"])\
.join(window_aggs_fg.select_except(["cc_num"]))

# Load transformation functions.
label_encoder = fs.get_transformation_function(name="label_encoder")

# Map features to transformations.
transformation_functions = {
"category": label_encoder,
}

# Get or create the 'transactions_view' feature view
feature_view = fs.get_or_create_feature_view(
name='transactions_view',
version=1,
query=query,
labels=["fraud_label"],
transformation_functions=transformation_functions,
)

return print('✅ Done')

@test
def test_output(output, *args) -> None:
"""
Template code for testing the output of the block.
"""
assert output is not None, 'The output is undefined'
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import hopsworks
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def inference(data, *args, **kwargs):
"""
Deployment inference.
Args:
data: The output from the upstream parent block
args: The output from any additional upstream blocks (if applicable)
"""
# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

# get Hopsworks Model Serving
ms = project.get_model_serving()

# get deployment object
deployment = ms.get_deployment("fraud")

# Start the deployment and wait for it to be running, with a maximum waiting time of 480 seconds
deployment.start(await_running=480)

# Make predictions using the deployed model
predictions = deployment.predict(
inputs=[4700702588013561],
)
print(f'⛳️ Prediction: {predictions}')

deployment.stop()

print('🔮 Deployment is stopped!')
151 changes: 151 additions & 0 deletions integrations/mage_ai/mage_tutorial/data_exporters/model_building.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import hopsworks
import xgboost as xgb
import pandas as pd
import os
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
from matplotlib import pyplot
import seaborn as sns
import joblib
from hsml.schema import Schema
from hsml.model_schema import ModelSchema
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


def prepare_training_data(X_train, X_test, y_train, y_test):
# Sort the training features DataFrame 'X_train' based on the 'datetime' column
X_train = X_train.sort_values("datetime")

# Reindex the target variable 'y_train' to match the sorted order of 'X_train' index
y_train = y_train.reindex(X_train.index)

# Sort the test features DataFrame 'X_test' based on the 'datetime' column
X_test = X_test.sort_values("datetime")

# Reindex the target variable 'y_test' to match the sorted order of 'X_test' index
y_test = y_test.reindex(X_test.index)

# Drop the 'datetime' column from the training features DataFrame 'X_train'
X_train.drop(["datetime"], axis=1, inplace=True)

# Drop the 'datetime' column from the test features DataFrame 'X_test'
X_test.drop(["datetime"], axis=1, inplace=True)

return X_train, X_test, y_train, y_test


@data_exporter
def train_model(data, *args, **kwargs):
"""
Train an XGBoost classifier for fraud detection and save it in the Hopsworks Model Registry.
Args:
data: The output from the upstream parent block
args: The output from any additional upstream blocks (if applicable)
"""
TEST_SIZE = 0.2

# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

fs = project.get_feature_store()

# Get the 'transactions_view' feature view
feature_view = fs.get_feature_view(
name='transactions_view',
version=1,
)

X_train, X_test, y_train, y_test = feature_view.train_test_split(
description='transactions fraud training dataset',
test_size=TEST_SIZE,
)

X_train, X_test, y_train, y_test = prepare_training_data(
X_train,
X_test,
y_train,
y_test,
)
X_train.to_csv(f'X_train.csv')

# Create an XGBoost classifier
model = xgb.XGBClassifier()

# Fit XGBoost classifier to the training data
model.fit(X_train, y_train)

# Predict the training data using the trained classifier
y_pred_train = model.predict(X_train)

# Predict the test data using the trained classifier
y_pred_test = model.predict(X_test)

# Compute f1 score
metrics = {
"f1_score": f1_score(y_test, y_pred_test, average='macro')
}

# Calculate and print the confusion matrix for the test predictions
results = confusion_matrix(y_test, y_pred_test)
print(results)

# Create a DataFrame for the confusion matrix results
df_cm = pd.DataFrame(
results,
['True Normal', 'True Fraud'],
['Pred Normal', 'Pred Fraud'],
)

# Create a heatmap using seaborn with annotations
cm = sns.heatmap(df_cm, annot=True)

# Get the figure and display it
fig = cm.get_figure()

# Create a Schema for the input features using the values of X_train
input_schema = Schema(X_train.values)

# Create a Schema for the output using y_train
output_schema = Schema(y_train)

# Create a ModelSchema using the defined input and output schemas
model_schema = ModelSchema(
input_schema=input_schema,
output_schema=output_schema,
)

# Convert the model schema to a dictionary for inspection
model_schema.to_dict()

# Specify the directory name for saving the model and related artifacts
model_dir = "quickstart_fraud_model"

# Check if the directory already exists; if not, create it
if not os.path.isdir(model_dir):
os.mkdir(model_dir)

# Save the trained XGBoost classifier to a joblib file in the specified directory
joblib.dump(model, model_dir + '/xgboost_model.pkl')

# Save the confusion matrix heatmap figure to an image file in the specified directory
fig.savefig(model_dir + "/confusion_matrix.png")

# Get the model registry
mr = project.get_model_registry()

# Create a Python model named "fraud" in the model registry
fraud_model = mr.python.create_model(
name="fraud",
metrics=metrics, # Specify the metrics used to evaluate the model
model_schema=model_schema, # Use the previously defined model schema
input_example=[4700702588013561], # Provide an input example for testing deployments
description="Quickstart Fraud Predictor", # Add a description for the model
)

# Save the model to the specified directory
fraud_model.save(model_dir)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import hopsworks
import os
import time
from mage_ai.data_preparation.shared.secrets import get_secret_value
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def deploy_model(data, *args, **kwargs):
"""
Deploys the trained XGBoost classifier.
Args:
data: The output from the upstream parent block
args: The output from any additional upstream blocks (if applicable)
"""
# Specify your data exporting logic here
project = hopsworks.login(
api_key_value=get_secret_value('HOPSWORKS_API_KEY'),
)

fs = project.get_feature_store()
# Get the model registry
mr = project.get_model_registry()

# Get model object
fraud_model = mr.get_model(
name="fraud",
version=1,
)
print('Model is here!')

# Get the dataset API from the project
dataset_api = project.get_dataset_api()

# Specify the file to upload ("predict_example.py") to the "Models" directory, and allow overwriting
uploaded_file_path = dataset_api.upload(
"predictor_script.py",
"Models",
overwrite=True,
)

# Construct the full path to the uploaded predictor script
predictor_script_path = os.path.join(
"/Projects",
project.name,
uploaded_file_path,
)

# Deploy the fraud model
deployment = fraud_model.deploy(
name="fraud", # Specify the deployment name
script_file=predictor_script_path, # Provide the path to the predictor script
)

print("Deployment is warming up...")
time.sleep(45)
Loading

0 comments on commit d30e7c8

Please sign in to comment.