diff --git a/integrations/mage_ai/functions.py b/integrations/mage_ai/functions.py new file mode 100644 index 00000000..63b1411a --- /dev/null +++ b/integrations/mage_ai/functions.py @@ -0,0 +1,20 @@ +import numpy as np + +# Define a function to compute Haversine distance between consecutive coordinates +def haversine(long, lat): + """Compute Haversine distance between each consecutive coordinate in (long, lat).""" + + # Shift the longitude and latitude columns to get consecutive values + long_shifted = long.shift() + lat_shifted = lat.shift() + + # Calculate the differences in longitude and latitude + long_diff = long_shifted - long + lat_diff = lat_shifted - lat + + # Haversine formula to compute distance + a = np.sin(lat_diff/2.0)**2 + b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2 + c = 2*np.arcsin(np.sqrt(a + b)) + + return c \ No newline at end of file diff --git a/integrations/mage_ai/mage_tutorial/.gitignore b/integrations/mage_ai/mage_tutorial/.gitignore new file mode 100755 index 00000000..8b3e82f6 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/.gitignore @@ -0,0 +1,14 @@ +.DS_Store +.file_versions +.gitkeep +.log +.logs/ +.mage_temp_profiles +.preferences.yaml +.variables/ +__pycache__/ +docker-compose.override.yml +logs/ +mage-ai.db +mage_data/ +secrets/ diff --git a/integrations/mage_ai/mage_tutorial/.ssh_tunnel/aws_emr.json b/integrations/mage_ai/mage_tutorial/.ssh_tunnel/aws_emr.json new file mode 100644 index 00000000..9e26dfee --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/.ssh_tunnel/aws_emr.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/integrations/mage_ai/mage_tutorial/custom/transactions_feature_view.py b/integrations/mage_ai/mage_tutorial/custom/transactions_feature_view.py new file mode 100644 index 00000000..9bb1da66 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/custom/transactions_feature_view.py @@ -0,0 +1,66 @@ +import hopsworks +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'custom' not in globals(): + from mage_ai.data_preparation.decorators import custom +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@custom +def transform_custom(*args, **kwargs): + """ + args: The output from any upstream parent blocks (if applicable) + + Returns: + Anything (e.g. data frame, dictionary, array, int, str, etc.) + """ + TEST_SIZE = 0.2 + # Specify the window length as "4h" + window_len = "4h" + + # Specify your data exporting logic here + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + fs = project.get_feature_store() + + trans_fg = fs.get_feature_group( + name="transactions", + version=1, + ) + + window_aggs_fg = fs.get_feature_group( + name=f"transactions_{window_len}_aggs", + version=1, + ) + + # Select features for training data. + query = trans_fg.select(["fraud_label", "category", "amount", "age_at_transaction", "days_until_card_expires", "loc_delta"])\ + .join(window_aggs_fg.select_except(["cc_num"])) + + # Load transformation functions. + label_encoder = fs.get_transformation_function(name="label_encoder") + + # Map features to transformations. 
+ transformation_functions = { + "category": label_encoder, + } + + # Get or create the 'transactions_view' feature view + feature_view = fs.get_or_create_feature_view( + name='transactions_view', + version=1, + query=query, + labels=["fraud_label"], + transformation_functions=transformation_functions, + ) + + print('✅ Done') + + return feature_view + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. + """ + assert output is not None, 'The output is undefined' diff --git a/integrations/mage_ai/mage_tutorial/data_exporters/deployment_inference.py b/integrations/mage_ai/mage_tutorial/data_exporters/deployment_inference.py new file mode 100644 index 00000000..5bcecbac --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_exporters/deployment_inference.py @@ -0,0 +1,38 @@ +import hopsworks +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def inference(data, *args, **kwargs): + """ + Deployment inference. + + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + """ + # Specify your data exporting logic here + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + # Get Hopsworks Model Serving + ms = project.get_model_serving() + + # Get the deployment object + deployment = ms.get_deployment("fraud") + + # Start the deployment and wait for it to be running, with a maximum waiting time of 480 seconds + deployment.start(await_running=480) + + # Make predictions using the deployed model + predictions = deployment.predict( + inputs=[4700702588013561], + ) + print(f'⛳️ Prediction: {predictions}') + + deployment.stop() + + print('🔮 Deployment is stopped!') diff --git a/integrations/mage_ai/mage_tutorial/data_exporters/model_building.py b/integrations/mage_ai/mage_tutorial/data_exporters/model_building.py new file mode 100644 index 00000000..fd0eec0e --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_exporters/model_building.py @@ -0,0 +1,151 @@ +import hopsworks +import xgboost as xgb +import pandas as pd +import os +from sklearn.metrics import confusion_matrix +from sklearn.metrics import f1_score +from matplotlib import pyplot +import seaborn as sns +import joblib +from hsml.schema import Schema +from hsml.model_schema import ModelSchema +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +def prepare_training_data(X_train, X_test, y_train, y_test): + # Sort the training features DataFrame 'X_train' based on the 'datetime' column + X_train = X_train.sort_values("datetime") + + # Reindex the target variable 'y_train' to match the sorted order of 'X_train' index + y_train = y_train.reindex(X_train.index) + + # Sort the test features DataFrame 'X_test' based on the 'datetime' column + X_test = X_test.sort_values("datetime") + + # Reindex the target variable 'y_test' to match the sorted order of 'X_test' index + y_test = y_test.reindex(X_test.index) + + # Drop the 'datetime' column from the training features DataFrame 'X_train' + X_train.drop(["datetime"], axis=1, inplace=True) + + # Drop the 'datetime' column from the test features DataFrame 'X_test' + X_test.drop(["datetime"], axis=1, inplace=True) + + return X_train, X_test, y_train, y_test + + +@data_exporter +def 
train_model(data, *args, **kwargs): + """ + Train an XGBoost classifier for fraud detection and save it in the Hopsworks Model Registry. + + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + """ + TEST_SIZE = 0.2 + + # Specify your data exporting logic here + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + fs = project.get_feature_store() + + # Get the 'transactions_view' feature view + feature_view = fs.get_feature_view( + name='transactions_view', + version=1, + ) + + X_train, X_test, y_train, y_test = feature_view.train_test_split( + description='transactions fraud training dataset', + test_size=TEST_SIZE, + ) + + X_train, X_test, y_train, y_test = prepare_training_data( + X_train, + X_test, + y_train, + y_test, + ) + X_train.to_csv(f'X_train.csv') + + # Create an XGBoost classifier + model = xgb.XGBClassifier() + + # Fit XGBoost classifier to the training data + model.fit(X_train, y_train) + + # Predict the training data using the trained classifier + y_pred_train = model.predict(X_train) + + # Predict the test data using the trained classifier + y_pred_test = model.predict(X_test) + + # Compute f1 score + metrics = { + "f1_score": f1_score(y_test, y_pred_test, average='macro') + } + + # Calculate and print the confusion matrix for the test predictions + results = confusion_matrix(y_test, y_pred_test) + print(results) + + # Create a DataFrame for the confusion matrix results + df_cm = pd.DataFrame( + results, + ['True Normal', 'True Fraud'], + ['Pred Normal', 'Pred Fraud'], + ) + + # Create a heatmap using seaborn with annotations + cm = sns.heatmap(df_cm, annot=True) + + # Get the figure and display it + fig = cm.get_figure() + + # Create a Schema for the input features using the values of X_train + input_schema = Schema(X_train.values) + + # Create a Schema for the output using y_train + output_schema = Schema(y_train) + + # Create a ModelSchema using the defined input and output schemas + model_schema = ModelSchema( + input_schema=input_schema, + output_schema=output_schema, + ) + + # Convert the model schema to a dictionary for inspection + model_schema.to_dict() + + # Specify the directory name for saving the model and related artifacts + model_dir = "quickstart_fraud_model" + + # Check if the directory already exists; if not, create it + if not os.path.isdir(model_dir): + os.mkdir(model_dir) + + # Save the trained XGBoost classifier to a joblib file in the specified directory + joblib.dump(model, model_dir + '/xgboost_model.pkl') + + # Save the confusion matrix heatmap figure to an image file in the specified directory + fig.savefig(model_dir + "/confusion_matrix.png") + + # Get the model registry + mr = project.get_model_registry() + + # Create a Python model named "fraud" in the model registry + fraud_model = mr.python.create_model( + name="fraud", + metrics=metrics, # Specify the metrics used to evaluate the model + model_schema=model_schema, # Use the previously defined model schema + input_example=[4700702588013561], # Provide an input example for testing deployments + description="Quickstart Fraud Predictor", # Add a description for the model + ) + + # Save the model to the specified directory + fraud_model.save(model_dir) \ No newline at end of file diff --git a/integrations/mage_ai/mage_tutorial/data_exporters/model_deployment.py b/integrations/mage_ai/mage_tutorial/data_exporters/model_deployment.py new file mode 100644 index 00000000..5be15fc1 --- 
/dev/null +++ b/integrations/mage_ai/mage_tutorial/data_exporters/model_deployment.py @@ -0,0 +1,58 @@ +import hopsworks +import os +import time +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def deploy_model(data, *args, **kwargs): + """ + Deploys the trained XGBoost classifier. + + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + """ + # Specify your data exporting logic here + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + fs = project.get_feature_store() + # Get the model registry + mr = project.get_model_registry() + + # Get model object + fraud_model = mr.get_model( + name="fraud", + version=1, + ) + print('Model is here!') + + # Get the dataset API from the project + dataset_api = project.get_dataset_api() + + # Upload the predictor script ("predictor_script.py") to the "Models" directory, allowing overwrite + uploaded_file_path = dataset_api.upload( + "predictor_script.py", + "Models", + overwrite=True, + ) + + # Construct the full path to the uploaded predictor script + predictor_script_path = os.path.join( + "/Projects", + project.name, + uploaded_file_path, + ) + + # Deploy the fraud model + deployment = fraud_model.deploy( + name="fraud", # Specify the deployment name + script_file=predictor_script_path, # Provide the path to the predictor script + ) + + print("Deployment is warming up...") + time.sleep(45) diff --git a/integrations/mage_ai/mage_tutorial/data_exporters/transactions_feature_view.py b/integrations/mage_ai/mage_tutorial/data_exporters/transactions_feature_view.py new file mode 100644 index 00000000..a8c61b6e --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_exporters/transactions_feature_view.py @@ -0,0 +1,55 @@ +import hopsworks +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def create_feature_view(data, *args, **kwargs): + """ + Selects features for the training dataset and creates the Feature View. + + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + """ + # Specify the window length as "4h" + window_len = "4h" + + # Specify your data exporting logic here + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + fs = project.get_feature_store() + + trans_fg = fs.get_feature_group( + name="transactions", + version=1, + ) + + window_aggs_fg = fs.get_feature_group( + name=f"transactions_{window_len}_aggs", + version=1, + ) + + # Select features for training data. + query = trans_fg.select(["fraud_label", "category", "amount", "age_at_transaction", "days_until_card_expires", "loc_delta"])\ + .join(window_aggs_fg.select_except(["cc_num"])) + + # Load transformation functions. + label_encoder = fs.get_transformation_function(name="label_encoder") + + # Map features to transformations. 
+ transformation_functions = { + "category": label_encoder, + } + + # Get or create the 'transactions_view' feature view + feature_view = fs.get_or_create_feature_view( + name='transactions_view', + version=1, + query=query, + labels=["fraud_label"], + transformation_functions=transformation_functions, + ) diff --git a/integrations/mage_ai/mage_tutorial/data_exporters/transactions_fg.py b/integrations/mage_ai/mage_tutorial/data_exporters/transactions_fg.py new file mode 100644 index 00000000..63d9486f --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_exporters/transactions_fg.py @@ -0,0 +1,52 @@ +import hopsworks +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def transaction_fg_creation(data, *args, **kwargs): + """ + Creates the transaction Feature Group. + + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + """ + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + fs = project.get_feature_store() + + # Get or create the 'transactions' feature group + trans_fg = fs.get_or_create_feature_group( + name="transactions", + version=1, + description="Transaction data", + primary_key=["cc_num"], + event_time="datetime", + online_enabled=True, + ) + # Insert data into feature group + trans_fg.insert(data) + + # Update feature descriptions + feature_descriptions = [ + {"name": "tid", "description": "Transaction id"}, + {"name": "datetime", "description": "Transaction time"}, + {"name": "cc_num", "description": "Number of the credit card performing the transaction"}, + {"name": "category", "description": "Expense category"}, + {"name": "amount", "description": "Dollar amount of the transaction"}, + {"name": "latitude", "description": "Transaction location latitude"}, + {"name": "longitude", "description": "Transaction location longitude"}, + {"name": "city", "description": "City in which the transaction was made"}, + {"name": "country", "description": "Country in which the transaction was made"}, + {"name": "fraud_label", "description": "Whether the transaction was fraudulent or not"}, + {"name": "age_at_transaction", "description": "Age of the card holder when the transaction was made"}, + {"name": "days_until_card_expires", "description": "Card validity days left when the transaction was made"}, + {"name": "loc_delta", "description": "Haversine distance between this transaction location and the previous transaction location from the same card"}, + ] + + for desc in feature_descriptions: + trans_fg.update_feature_description(desc["name"], desc["description"]) diff --git a/integrations/mage_ai/mage_tutorial/data_exporters/window_aggs_fg.py b/integrations/mage_ai/mage_tutorial/data_exporters/window_aggs_fg.py new file mode 100644 index 00000000..caa6ae5a --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_exporters/window_aggs_fg.py @@ -0,0 +1,56 @@ +import hopsworks +from mage_ai.data_preparation.shared.secrets import get_secret_value +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def window_aggs_fg_creation(data, *args, **kwargs): + """ + Creates the transaction window aggregations Feature Group. 
+ + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + """ + # Specify the window length as "4h" + window_len = "4h" + + # Specify your data exporting logic here + project = hopsworks.login( + api_key_value=get_secret_value('HOPSWORKS_API_KEY'), + ) + + fs = project.get_feature_store() + + # Get or create the 'transactions' feature group with aggregations using specified window len + window_aggs_fg = fs.get_or_create_feature_group( + name=f"transactions_{window_len}_aggs", + version=1, + description=f"Aggregate transaction data over {window_len} windows.", + primary_key=["cc_num"], + event_time="datetime", + online_enabled=True, + ) + + # Insert data into feature group + window_aggs_fg.insert( + data, + write_options={"wait_for_job": True}, + ) + + # Update feature descriptions + feature_descriptions = [ + {"name": "datetime", "description": "Transaction time"}, + {"name": "cc_num", "description": "Number of the credit card performing the transaction"}, + {"name": "loc_delta_mavg", "description": "Moving average of location difference between consecutive transactions from the same card"}, + {"name": "trans_freq", "description": "Moving average of transaction frequency from the same card"}, + {"name": "trans_volume_mavg", "description": "Moving average of transaction volume from the same card"}, + {"name": "trans_volume_mstd", "description": "Moving standard deviation of transaction volume from the same card"}, + ] + + for desc in feature_descriptions: + window_aggs_fg.update_feature_description(desc["name"], desc["description"]) + + + diff --git a/integrations/mage_ai/mage_tutorial/data_loaders/credit_cards.py b/integrations/mage_ai/mage_tutorial/data_loaders/credit_cards.py new file mode 100644 index 00000000..fdabc584 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_loaders/credit_cards.py @@ -0,0 +1,22 @@ +import io +import pandas as pd +import requests +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_credit_data(*args, **kwargs): + # Specify the URL for the data + url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/" + # Read the 'credit_cards.csv' file + credit_cards_df = pd.read_csv(url + "credit_cards.csv") + + return credit_cards_df + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git a/integrations/mage_ai/mage_tutorial/data_loaders/profiles.py b/integrations/mage_ai/mage_tutorial/data_loaders/profiles.py new file mode 100644 index 00000000..31be68c9 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_loaders/profiles.py @@ -0,0 +1,27 @@ +import io +import pandas as pd +import requests +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_profiles_data(*args, **kwargs): + # Specify the URL for the data + url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/" + + # Read the 'profiles.csv' file + # Parse the 'birthdate' column as dates + profiles_df = pd.read_csv( + url + "profiles.csv", + parse_dates=["birthdate"], + ) + + return profiles_df + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git 
a/integrations/mage_ai/mage_tutorial/data_loaders/transactions.py b/integrations/mage_ai/mage_tutorial/data_loaders/transactions.py new file mode 100644 index 00000000..4569aad7 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/data_loaders/transactions.py @@ -0,0 +1,27 @@ +import io +import pandas as pd +import requests +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_transactions_data(*args, **kwargs): + # Specify the URL for the data + url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/" + + # Read the 'transactions.csv' file + # Parse the 'datetime' column as dates + trans_df = pd.read_csv( + url + "transactions.csv", + parse_dates=["datetime"], + ) + + return trans_df + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git a/integrations/mage_ai/mage_tutorial/dbt/profiles.yml b/integrations/mage_ai/mage_tutorial/dbt/profiles.yml new file mode 100755 index 00000000..90599f89 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/dbt/profiles.yml @@ -0,0 +1,9 @@ +# https://docs.getdbt.com/docs/core/connect-data-platform/profiles.yml + +base: + outputs: + + dev: + type: duckdb + + target: dev diff --git a/integrations/mage_ai/mage_tutorial/io_config.yaml b/integrations/mage_ai/mage_tutorial/io_config.yaml new file mode 100755 index 00000000..06f9d3ba --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/io_config.yaml @@ -0,0 +1,112 @@ +version: 0.1.1 +default: + # Default profile created for data IO access. + # Add your credentials for the source you use, and delete the rest. + # AWS + AWS_ACCESS_KEY_ID: "{{ env_var('AWS_ACCESS_KEY_ID') }}" + AWS_SECRET_ACCESS_KEY: "{{ env_var('AWS_SECRET_ACCESS_KEY') }}" + AWS_SESSION_TOKEN: session_token (Used to generate Redshift credentials) + AWS_REGION: region + # Azure + AZURE_CLIENT_ID: "{{ env_var('AZURE_CLIENT_ID') }}" + AZURE_CLIENT_SECRET: "{{ env_var('AZURE_CLIENT_SECRET') }}" + AZURE_STORAGE_ACCOUNT_NAME: "{{ env_var('AZURE_STORAGE_ACCOUNT_NAME') }}" + AZURE_TENANT_ID: "{{ env_var('AZURE_TENANT_ID') }}" + # Clickhouse + CLICKHOUSE_DATABASE: default + CLICKHOUSE_HOST: host.docker.internal + CLICKHOUSE_INTERFACE: http + CLICKHOUSE_PASSWORD: null + CLICKHOUSE_PORT: 8123 + CLICKHOUSE_USERNAME: null + # Druid + DRUID_HOST: hostname + DRUID_PASSWORD: password + DRUID_PATH: /druid/v2/sql/ + DRUID_PORT: 8082 + DRUID_SCHEME: http + DRUID_USER: user + # DuckDB + DUCKDB_DATABASE: database + DUCKDB_SCHEMA: main + # Google + GOOGLE_SERVICE_ACC_KEY: + type: service_account + project_id: project-id + private_key_id: key-id + private_key: "-----BEGIN PRIVATE KEY-----\nyour_private_key\n-----END_PRIVATE_KEY" + client_email: your_service_account_email + auth_uri: "https://accounts.google.com/o/oauth2/auth" + token_uri: "https://accounts.google.com/o/oauth2/token" + auth_provider_x509_cert_url: "https://www.googleapis.com/oauth2/v1/certs" + client_x509_cert_url: "https://www.googleapis.com/robot/v1/metadata/x509/your_service_account_email" + GOOGLE_SERVICE_ACC_KEY_FILEPATH: "/path/to/your/service/account/key.json" + GOOGLE_LOCATION: US # Optional + # MongoDB + # Specify either the connection string or the (host, password, user, port) to connect to MongoDB. 
+ MONGODB_CONNECTION_STRING: "mongodb://{username}:{password}@{host}:{port}/" + MONGODB_HOST: host + MONGODB_PORT: 27017 + MONGODB_USER: user + MONGODB_PASSWORD: password + MONGODB_DATABASE: database + MONGODB_COLLECTION: collection + # MSSQL + MSSQL_DATABASE: database + MSSQL_SCHEMA: schema + MSSQL_DRIVER: "ODBC Driver 18 for SQL Server" + MSSQL_HOST: host + MSSQL_PASSWORD: password + MSSQL_PORT: 1433 + MSSQL_USER: SA + # MySQL + MYSQL_DATABASE: database + MYSQL_HOST: host + MYSQL_PASSWORD: password + MYSQL_PORT: 3306 + MYSQL_USER: root + # PostgresSQL + POSTGRES_CONNECT_TIMEOUT: 10 + POSTGRES_DBNAME: postgres + POSTGRES_SCHEMA: public # Optional + POSTGRES_USER: username + POSTGRES_PASSWORD: password + POSTGRES_HOST: hostname + POSTGRES_PORT: 5432 + # Redshift + REDSHIFT_SCHEMA: public # Optional + REDSHIFT_DBNAME: redshift_db_name + REDSHIFT_HOST: redshift_cluster_id.identifier.region.redshift.amazonaws.com + REDSHIFT_PORT: 5439 + REDSHIFT_TEMP_CRED_USER: temp_username + REDSHIFT_TEMP_CRED_PASSWORD: temp_password + REDSHIFT_DBUSER: redshift_db_user + REDSHIFT_CLUSTER_ID: redshift_cluster_id + REDSHIFT_IAM_PROFILE: default + # Snowflake + SNOWFLAKE_USER: username + SNOWFLAKE_PASSWORD: password + SNOWFLAKE_ACCOUNT: account_id.region + SNOWFLAKE_DEFAULT_WH: null # Optional default warehouse + SNOWFLAKE_DEFAULT_DB: null # Optional default database + SNOWFLAKE_DEFAULT_SCHEMA: null # Optional default schema + SNOWFLAKE_PRIVATE_KEY_PASSPHRASE: null # Optional private key passphrase + SNOWFLAKE_PRIVATE_KEY_PATH: null # Optional private key path + SNOWFLAKE_ROLE: null # Optional role name + SNOWFLAKE_TIMEOUT: null # Optional timeout in seconds + # Trino + trino: + catalog: postgresql # Change this to the catalog of your choice + host: 127.0.0.1 + http_headers: + X-Something: 'mage=power' + http_scheme: http + password: mage1337 # Optional + port: 8080 + schema: core_data + session_properties: # Optional + acc01.optimize_locality_enabled: false + optimize_hash_generation: true + source: trino-cli # Optional + user: admin + verify: /path/to/your/ca.crt # Optional diff --git a/integrations/mage_ai/mage_tutorial/metadata.yaml b/integrations/mage_ai/mage_tutorial/metadata.yaml new file mode 100755 index 00000000..dcf42397 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/metadata.yaml @@ -0,0 +1,50 @@ +project_type: standalone + +variables_dir: ~/.mage_data +# remote_variables_dir: s3://bucket/path_prefix + +variables_retention_period: '90d' + +emr_config: + # You can customize the EMR cluster instance size with the two parameters + master_instance_type: 'r5.4xlarge' + slave_instance_type: 'r5.4xlarge' + + # Configure security groups for EMR cluster instances. + # The default managed security groups are ElasticMapReduce-master and ElasticMapReduce-slave + # master_security_group: 'sg-xxxxxxxxxxxx' + # slave_security_group: 'sg-yyyyyyyyyyyy' + + # If you want to ssh tunnel into EMR cluster, ec2_key_name must be configured. + # You can create a key pair in page https://console.aws.amazon.com/ec2#KeyPairs and download the key file. 
+ # ec2_key_name: '[ec2_key_pair_name]' + +spark_config: + # Application name + app_name: 'my spark app' + # Master URL to connect to + # e.g., spark_master: 'spark://host:port', or spark_master: 'yarn' + spark_master: 'local' + # Executor environment variables + # e.g., executor_env: {'PYTHONPATH': '/home/path'} + executor_env: {} + # Jar files to be uploaded to the cluster and added to the classpath + # e.g., spark_jars: ['/home/path/example1.jar'] + spark_jars: [] + # Path where Spark is installed on worker nodes, + # e.g. spark_home: '/usr/lib/spark' + spark_home: + # List of key-value pairs to be set in SparkConf + # e.g., others: {'spark.executor.memory': '4g', 'spark.executor.cores': '2'} + others: {} + +notification_config: + alert_on: + - trigger_failure + - trigger_passed_sla + slack_config: + webhook_url: "{{ env_var('MAGE_SLACK_WEBHOOK_URL') }}" + teams_config: + webhook_url: "{{ env_var('MAGE_TEAMS_WEBHOOK_URL') }}" +project_uuid: 68ac84292a2e42d6b6ebf01a52cc2dc9 +help_improve_mage: false diff --git a/integrations/mage_ai/mage_tutorial/pipelines/hopsworks_mage_quickstart/metadata.yaml b/integrations/mage_ai/mage_tutorial/pipelines/hopsworks_mage_quickstart/metadata.yaml new file mode 100755 index 00000000..7f3c34f6 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/pipelines/hopsworks_mage_quickstart/metadata.yaml @@ -0,0 +1,213 @@ +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - transactions_fe + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: credit_cards + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: credit_cards +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - transactions_fe + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: profiles + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: profiles +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - transactions_fe + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: transactions + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: transactions +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - window_aggs + - transactions_fg + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: transactions_fe + retry_config: null + status: executed + timeout: null + type: transformer + upstream_blocks: + - transactions + - profiles + - credit_cards + uuid: transactions_fe +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - window_aggs_fg + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: window_aggs + retry_config: null + status: executed + timeout: null + type: transformer + upstream_blocks: + - transactions_fe + uuid: window_aggs +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - transactions_feature_view + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: transactions_fg + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - transactions_fe + 
uuid: transactions_fg +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - transactions_feature_view + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: window_aggs_fg + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - window_aggs + uuid: window_aggs_fg +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - model_building + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: transactions_feature_view + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - window_aggs_fg + - transactions_fg + uuid: transactions_feature_view +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - model_deployment + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: model_building + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - transactions_feature_view + uuid: model_building +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - deployment_inference + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: model_deployment + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - model_building + uuid: model_deployment +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: deployment_inference + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - model_deployment + uuid: deployment_inference +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2023-12-28 11:57:39.358016+00:00' +data_integration: null +description: null +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: QuickStart +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +updated_at: '2024-01-03 16:31:35' +uuid: hopsworks_mage_quickstart +variables_dir: /Users/maxzhytnikov/.mage_data/mage_tutorial +widgets: [] diff --git a/integrations/mage_ai/mage_tutorial/transformers/transactions_fe.py b/integrations/mage_ai/mage_tutorial/transformers/transactions_fe.py new file mode 100644 index 00000000..76ae65da --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/transformers/transactions_fe.py @@ -0,0 +1,73 @@ +import numpy as np +import pandas as pd +from math import radians +from functions import haversine +if 'transformer' not in globals(): + from mage_ai.data_preparation.decorators import transformer +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@transformer +def transform(trans_df, profiles_df, credit_cards_df, *args, **kwargs): + """ + Feature Engineering for the transaction dataframe. + + Args: + trans_df (pd.DataFrame): The DataFrame containing transaction data. + profiles_df (pd.DataFrame): The DataFrame containing profiles data. + credit_cards_df (pd.DataFrame): The DataFrame containing credit card data. 
+ *args: The output from any additional upstream blocks (if applicable). + **kwargs: Additional keyword arguments. + + Returns: + pd.DataFrame: The transformed DataFrame with additional columns. + """ + # Merge the 'trans_df' DataFrame with the 'profiles_df' DataFrame based on the 'cc_num' column + age_df = trans_df.merge( + profiles_df, + on="cc_num", + how="left", + ) + + # Compute the age at the time of each transaction and store it in the 'age_at_transaction' column + trans_df["age_at_transaction"] = ( + age_df["datetime"] - age_df["birthdate"] + ) / np.timedelta64(1, "Y") + + # Merge the 'trans_df' DataFrame with the 'credit_cards_df' DataFrame based on the 'cc_num' column + card_expiry_df = trans_df.merge( + credit_cards_df, + on="cc_num", + how="left", + ) + + # Convert the 'expires' column to datetime format + card_expiry_df["expires"] = pd.to_datetime( + card_expiry_df["expires"], + format="%m/%y", + ) + + # Compute the days until the card expires and store it in the 'days_until_card_expires' column + trans_df["days_until_card_expires"] = ( + card_expiry_df["expires"] - card_expiry_df["datetime"] + ) / np.timedelta64(1, "D") + + # Sort the 'trans_df' DataFrame based on the 'datetime' column in ascending order + trans_df.sort_values("datetime", inplace=True) + + # Convert the 'longitude' and 'latitude' columns to radians + trans_df[["longitude", "latitude"]] = trans_df[["longitude", "latitude"]].applymap(radians) + + # Apply the haversine function to compute the 'loc_delta' column + trans_df["loc_delta"] = trans_df.groupby("cc_num")\ + .apply(lambda x : haversine(x["longitude"], x["latitude"]))\ + .reset_index(level=0, drop=True)\ + .fillna(0) + + return trans_df + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git a/integrations/mage_ai/mage_tutorial/transformers/window_aggs.py b/integrations/mage_ai/mage_tutorial/transformers/window_aggs.py new file mode 100644 index 00000000..007b1566 --- /dev/null +++ b/integrations/mage_ai/mage_tutorial/transformers/window_aggs.py @@ -0,0 +1,75 @@ +import pandas as pd +if 'transformer' not in globals(): + from mage_ai.data_preparation.decorators import transformer +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@transformer +def transform(trans_df, *args, **kwargs): + """ + Compute the dataframe with window aggregations. + + Args: + trans_df (pd.DataFrame): The DataFrame containing transaction data. + *args: The output from any additional upstream blocks (if applicable). + **kwargs: Additional keyword arguments. + + Returns: + pd.DataFrame: The transformed DataFrame with aggregated window features. + """ + # Specify the window length as "4h" + window_len = "4h" + + # Define a rolling window groupby on 'cc_num' with a specified window length on the 'datetime' column + cc_group = trans_df[["cc_num", "amount", "datetime"]].groupby("cc_num").rolling( + window_len, + on="datetime", + ) + + # Moving average of transaction volume. + df_4h_mavg = pd.DataFrame(cc_group.mean()) + df_4h_mavg.columns = ["trans_volume_mavg", "datetime"] + df_4h_mavg = df_4h_mavg.reset_index(level=["cc_num"]) + df_4h_mavg = df_4h_mavg.drop(columns=["cc_num", "datetime"]) + df_4h_mavg = df_4h_mavg.sort_index() + + # Moving standard deviation of transaction volume. 
+ df_4h_std = pd.DataFrame(cc_group.std()) + df_4h_std.columns = ["trans_volume_mstd", "datetime"] + df_4h_std = df_4h_std.reset_index(level=["cc_num"]) + df_4h_std = df_4h_std.drop(columns=["cc_num", "datetime"]) + df_4h_std = df_4h_std.fillna(0) + df_4h_std = df_4h_std.sort_index() + window_aggs_df = df_4h_std.merge(df_4h_mavg, left_index=True, right_index=True) + + # Moving count of transactions (transaction frequency). + df_4h_count = pd.DataFrame(cc_group.count()) + df_4h_count.columns = ["trans_freq", "datetime"] + df_4h_count = df_4h_count.reset_index(level=["cc_num"]) + df_4h_count = df_4h_count.drop(columns=["cc_num", "datetime"]) + df_4h_count = df_4h_count.sort_index() + window_aggs_df = window_aggs_df.merge(df_4h_count, left_index=True, right_index=True) + + # Moving average of location difference between consecutive transactions. + cc_group_loc_delta = trans_df[["cc_num", "loc_delta", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime").mean() + df_4h_loc_delta_mavg = pd.DataFrame(cc_group_loc_delta) + df_4h_loc_delta_mavg.columns = ["loc_delta_mavg", "datetime"] + df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.reset_index(level=["cc_num"]) + df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.drop(columns=["cc_num", "datetime"]) + df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.sort_index() + window_aggs_df = window_aggs_df.merge(df_4h_loc_delta_mavg, left_index=True, right_index=True) + + # Merge 'trans_df' with selected columns for the final result + window_aggs_df = window_aggs_df.merge( + trans_df[["cc_num", "datetime"]].sort_index(), + left_index=True, + right_index=True, + ) + + return window_aggs_df + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git a/integrations/mage_ai/predictor_script.py b/integrations/mage_ai/predictor_script.py new file mode 100644 index 00000000..606ae42a --- /dev/null +++ b/integrations/mage_ai/predictor_script.py @@ -0,0 +1,30 @@ +import os +import numpy as np +import hsfs +import joblib + + +class Predict(object): + + def __init__(self): + """ Initializes the serving state, reads a trained model""" + # Get feature store handle + fs_conn = hsfs.connection() + self.fs = fs_conn.get_feature_store() + + # Get feature view + self.fv = self.fs.get_feature_view("transactions_view", 1) + + # Initialize serving + self.fv.init_serving(1) + + # Load the trained model + self.model = joblib.load(os.environ["ARTIFACT_FILES_PATH"] + "/xgboost_model.pkl") + print("Initialization Complete") + + def predict(self, inputs): + """ Serves a prediction request using a trained model""" + feature_vector = self.fv.get_feature_vector({"cc_num": inputs[0][0]}) + feature_vector = feature_vector[:-1] + + return self.model.predict(np.asarray(feature_vector).reshape(1, -1)).tolist() # Numpy Arrays are not JSON serializable diff --git a/integrations/mage_ai/requirements.txt b/integrations/mage_ai/requirements.txt new file mode 100644 index 00000000..602afd9a --- /dev/null +++ b/integrations/mage_ai/requirements.txt @@ -0,0 +1,4 @@ +hopsworks +xgboost +matplotlib +seaborn \ No newline at end of file
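
For reference, the "fraud" deployment created by this pipeline can also be exercised outside of Mage. The following is a minimal standalone sketch, assuming the quickstart pipeline above has already run, the "fraud" deployment exists, and a Hopsworks API key is available to hopsworks.login() (e.g. via the HOPSWORKS_API_KEY environment variable):

import hopsworks

# Log in to Hopsworks; outside of Mage the API key comes from the environment or an interactive prompt
project = hopsworks.login()

# Get the Model Serving handle and the "fraud" deployment created by the model_deployment block
ms = project.get_model_serving()
deployment = ms.get_deployment("fraud")

# Start the deployment (waiting up to 480 seconds), send the same example input used in the pipeline, then stop it
deployment.start(await_running=480)
print(deployment.predict(inputs=[4700702588013561]))
deployment.stop()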