Implementing support to use VPC Network Peering (#219)
* predicting for only the users with traffic in the past 72h - purchase propensity

* running inference only for users events in the past 72h

* including 72h users for all models predictions

* considering null values in TabWorkflow models

* deleting unused pipfile

* upgrading lib versions

* implementing reporting preprocessing as a new pipeline

* adding more code documentation

* adding important information on the main README.md and DEVELOPMENT.md

* adding schedule run name and more code documentation

* implementing a new scheduler using the vertex ai sdk & adding user_id to procedures for consistency

* adding more code documentation

* adding code doc to the python custom component

* adding more code documentation

* fixing aggregated predictions query

* removing unnecessary resources from deployment

* Writing MDS guide

* adding the MDS developer and troubleshooting documentation

* fixing deployment for activation pipelines and gemini dataset

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* removing deprecated api

* fixing purchase propensity pipelines names

* adding extra condition for when there is not enough data for the window interval to be applied on backfill procedures

* adding more instructions for post deployment and fixing issues when GA4 export was configured for less than 10 days

* removing unnecessary comments

* adding the number of past days to process in the variables files

* adding comment about combining data from different ga4 export datasets to data store

* fixing small issues with feature engineering and ml pipelines

* fixing hyper parameter tuning for kmeans modeling

* fixing optuna parameters

* adding cloud shell image

* fixing the list of all possible users in the propensity training preparation tables

* additional guardrails for when there is not enough data

* adding more documentation

* adding more doc to feature store

* add feature store documentation

* adding ml pipelines docs

* adding ml pipelines docs

* adding more documentation

* adding user agent client info

* fixing scope of client info

* fix

* removing client_info from vertex components

* fixing versioning of tf submodules

* reconfiguring meta providers

* fixing issue 187

* chore(deps): upgrade terraform providers and modules version

* chore(deps): set the provider version

* chore: formatting

* fix: brand naming

* fix: typo

* fixing secrets issue

* implementing secrets region as tf variable

* implementing secrets region as tf variable

* last changes requested by lgrangeau

* documenting keys location better

* implementing vpc peering network

---------

Co-authored-by: Carlos Timoteo <[email protected]>
Co-authored-by: Laurent Grangeau <[email protected]>
3 people authored Oct 29, 2024
1 parent b4d0fa7 commit ec07683
Showing 8 changed files with 288 additions and 18 deletions.
160 changes: 160 additions & 0 deletions config/config.yaml.tftpl

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion infrastructure/terraform/modules/pipelines/main.tf
@@ -52,7 +52,9 @@ module "project_services" {
"artifactregistry.googleapis.com",
"aiplatform.googleapis.com",
"dataflow.googleapis.com",
"bigqueryconnection.googleapis.com"
"bigqueryconnection.googleapis.com",
"servicenetworking.googleapis.com",
"compute.googleapis.com"
]
}

@@ -160,3 +162,29 @@ resource "null_resource" "check_artifactregistry_api" {
module.project_services
]
}

# This resource executes gcloud commands to check whether the Service Networking API is enabled.
# Since enabling APIs can take a few seconds, we need to make the deployment wait until the API is enabled before resuming.
resource "null_resource" "check_servicenetworking_api" {
provisioner "local-exec" {
command = <<-EOT
COUNTER=0
MAX_TRIES=100
while ! gcloud services list --project=${module.project_services.project_id} | grep -i "servicenetworking.googleapis.com" && [ $COUNTER -lt $MAX_TRIES ]
do
sleep 6
printf "."
COUNTER=$((COUNTER + 1))
done
if [ $COUNTER -eq $MAX_TRIES ]; then
echo "service networking api is not enabled, terraform can not continue!"
exit 1
fi
sleep 20
EOT
}

depends_on = [
module.project_services
]
}
3 changes: 2 additions & 1 deletion infrastructure/terraform/modules/pipelines/pipelines.tf
@@ -68,7 +68,8 @@ resource "google_project_iam_member" "pipelines_sa_roles" {
"roles/artifactregistry.reader",
"roles/pubsub.publisher",
"roles/dataflow.developer",
"roles/bigquery.connectionUser"
"roles/bigquery.connectionUser",
"roles/compute.networkUser"
])
role = each.key
}
13 changes: 9 additions & 4 deletions pyproject.toml
@@ -23,7 +23,8 @@ packages = [{include = "python"}]

[tool.poetry.dependencies]
python = ">=3.8,<3.11"
google-cloud-aiplatform = "1.52.0"
#google-cloud-aiplatform = "1.52.0"
google-cloud-aiplatform = "1.70.0"
shapely = "<2.0.0"
google-cloud = "^0.34.0"
jinja2 = ">=3.0.1,<4.0.0"
@@ -37,11 +38,13 @@ google-cloud-bigquery = "2.30.0"
google-cloud-pipeline-components = "2.6.0"
google-auth = "^2.14.1"
google-cloud-storage = "^2.6.0"
kfp = "2.4.0"
## Fixing this error: https://stackoverflow.com/questions/76175487/sudden-importerror-cannot-import-name-appengine-from-requests-packages-urlli
kfp = "2.0.0-rc.2"
#kfp = "2.0.0-rc.2"
#kfp = {version = "2.0.0-b12", allow-prereleases = true}
#kfp = {version = "2.0.0-b16", allow-prereleases = true}
kfp-server-api = "2.0.0-rc.1"
kfp-server-api = "2.0.5"
#kfp-server-api = "2.0.0-rc.1"
#kfp-server-api = "2.0.0.a6"
#kfp-server-api = "2.0.0b1"
urllib3 = "1.26.18"
@@ -62,9 +65,11 @@ pyarrow = "15.0.2"
google-auth-oauthlib = "^1.2.1"
oauth2client = "^4.1.3"
google-cloud-core = "^2.4.1"
sympy="1.13.1"
google-cloud-resource-manager="1.13.0"

[tool.poetry.group.component_vertex.dependencies]
google-cloud-aiplatform = "1.52.0"
google-cloud-aiplatform = "1.70.0"
shapely = "<2.0.0"
toml = "0.10.2"

14 changes: 10 additions & 4 deletions python/base_component_image/pyproject.toml
@@ -2,25 +2,29 @@
name = "ma-components"
version = "1.0.0"
description = "contains components used in marketing analytics project. the need is to package the components and containerise so that they can be used from the python function based component"
authors = ["Christos Aniftos <[email protected]>"]
authors = ["Marketing Analytics Solutions Architects <[email protected]>"]
license = "Apache 2.0"
readme = "README.md"
packages = [{include = "ma_components"}]

[tool.poetry.dependencies]
python = ">=3.8,<3.11"
pip = "23.3"
kfp = "2.4.0"
## Fixing this error: https://stackoverflow.com/questions/76175487/sudden-importerror-cannot-import-name-appengine-from-requests-packages-urlli
kfp = "2.0.0-rc.2"
#kfp = "2.0.0-rc.2"
#kfp = {version = "2.0.0-b12", allow-prereleases = true}
#kfp = {version = "2.0.0-b16", allow-prereleases = true}
kfp-server-api = "2.0.0-rc.1"
kfp-server-api = "2.0.5"
#kfp-server-api = "2.0.0-rc.1"
#kfp-server-api = "2.0.0.a6"
#kfp-server-api = "2.0.0b1"
urllib3 = "1.26.18"
toml = "^0.10.2"
docker = "^6.0.1"
google-cloud-bigquery = "2.30.0"
google-cloud-aiplatform = "1.52.0"
#google-cloud-aiplatform = "1.52.0"
google-cloud-aiplatform = "1.70.0"
shapely = "<2.0.0"
google-cloud-pubsub = "2.15.0"
#google-cloud-pipeline-components = "1.0.33"
@@ -35,6 +39,8 @@ pyarrow = "15.0.2"
google-auth-oauthlib = "^1.2.1"
oauth2client = "^4.1.3"
google-cloud-core = "^2.4.1"
sympy="1.13.1"
google-cloud-resource-manager="1.13.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
81 changes: 74 additions & 7 deletions python/pipelines/pipeline_ops.py
@@ -17,6 +17,7 @@
from tracemalloc import start

import pip
from sympy import preview
from kfp import compiler
from google.cloud.aiplatform.pipeline_jobs import PipelineJob, _set_enable_caching_value
from google.cloud.aiplatform import TabularDataset, Artifact
@@ -625,6 +626,30 @@ def get_gcp_bearer_token() -> str:
return bearer_token


def _get_project_number(project_id) -> str:
"""
Retrieves the project number from a project id
Returns:
A string containing the project number
Raises:
Exception: If an error occurs while retrieving the resource manager project object.
"""
from google.cloud import resourcemanager_v3

# Create a resource manager client
client = resourcemanager_v3.ProjectsClient()

# Get the project number
project = client.get_project(name=f"projects/{project_id}").name
project_number = project.split('/')[-1]

logging.info(f"Project Number: {project_number}")

return project_number
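
# Hypothetical usage sketch (project id and network name are placeholders):
# the returned number is what schedule_pipeline below uses to build the full
# network resource name for the VPC peering, e.g.
#   _get_project_number("my-project")                 # -> "123456789012"
#   "projects/123456789012/global/networks/default"   # network passed to the schedule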


# Function to schedule the pipeline.
def schedule_pipeline(
project_id: str,
@@ -637,6 +662,8 @@ def schedule_pipeline(
max_concurrent_run_count: str,
start_time: str,
end_time: str,
subnetwork: str = "default",
use_private_service_access: bool = False,
pipeline_parameters: Dict[str, Any] = None,
pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
) -> dict:
@@ -654,6 +681,8 @@ def schedule_pipeline(
max_concurrent_run_count: The maximum number of concurrent pipeline runs.
start_time: The start time of the schedule.
end_time: The end time of the schedule.
subnetwork: The VPC subnetwork name to be used in VPC peering.
use_private_service_access: A flag to define whether to use the VPC private service access or not.
Returns:
A dictionary containing information about the scheduled pipeline.
@@ -663,6 +692,9 @@
"""

from google.cloud import aiplatform
from google.cloud.aiplatform.preview.pipelinejobschedule import (
pipeline_job_schedules as preview_pipeline_job_schedules,
)

# Substitute pipeline parameters with necessary substitutions
if pipeline_parameters_substitutions != None:
@@ -680,16 +712,51 @@
display_name=f"{pipeline_name}",
)

# Create the schedule with the pipeline job defined
pipeline_job_schedule = pipeline_job.create_schedule(
# https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJobSchedule
# Create a schedule for the pipeline job
pipeline_job_schedule = preview_pipeline_job_schedules.PipelineJobSchedule(
display_name=f"{pipeline_name}",
cron=cron,
max_concurrent_run_count=max_concurrent_run_count,
start_time=start_time,
end_time=end_time,
service_account=pipeline_sa,
pipeline_job=pipeline_job,
location=region
)

# Get the project number to use in the network identifier
project_number = _get_project_number(project_id)

# Create the schedule using the pipeline job schedule
# Using the VPC private service access or not, depending on the flag
if use_private_service_access:
pipeline_job_schedule.create(
cron_expression=cron,
max_concurrent_run_count=max_concurrent_run_count,
start_time=start_time,
end_time=end_time,
max_run_count=2,
service_account=pipeline_sa,
network=f"projects/{project_number}/global/networks/{subnetwork}",
create_request_timeout=None,
)
else:
pipeline_job_schedule.create(
cron_expression=cron,
max_concurrent_run_count=max_concurrent_run_count,
start_time=start_time,
end_time=end_time,
max_run_count=2,
service_account=pipeline_sa,
create_request_timeout=None,
)

# Old version - Create the schedule with the pipeline job defined
#pipeline_job_schedule = pipeline_job.create_schedule(
# display_name=f"{pipeline_name}",
# cron=cron,
# max_concurrent_run_count=max_concurrent_run_count,
# start_time=start_time,
# end_time=end_time,
# service_account=pipeline_sa,
#)

logging.info(f"Pipeline scheduled : {pipeline_name}")

return pipeline_job
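
A minimal sketch of how the updated schedule_pipeline might be invoked with the new VPC peering options; every identifier below is a placeholder, and arguments not visible in this diff (such as the compiled template path and pipeline parameters) are elided:

schedule_pipeline(
    project_id="my-project",
    region="us-central1",
    pipeline_name="propensity-training-pl",
    pipeline_sa="pipelines-sa@my-project.iam.gserviceaccount.com",
    cron="TZ=America/New_York 0 6 * * *",
    max_concurrent_run_count=1,
    start_time=None,
    end_time=None,
    subnetwork="default",              # name used to build the peered network URI
    use_private_service_access=True,   # route scheduled runs through the peered network
)

When use_private_service_access is True, the schedule is created with network="projects/<project-number>/global/networks/default"; otherwise the network argument is omitted and default networking applies.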
4 changes: 3 additions & 1 deletion python/pipelines/scheduler.py
@@ -138,7 +138,9 @@ def check_extention(file_path: str, type: str = '.yaml'):
cron=my_pipeline_vars['schedule']['cron'],
max_concurrent_run_count=my_pipeline_vars['schedule']['max_concurrent_run_count'],
start_time=my_pipeline_vars['schedule']['start_time'],
end_time=my_pipeline_vars['schedule']['end_time']
end_time=my_pipeline_vars['schedule']['end_time'],
subnetwork=my_pipeline_vars['schedule']['subnetwork'],
use_private_service_access=my_pipeline_vars['schedule']['use_private_service_access'],
)

if my_pipeline_vars['schedule']['state'] == 'PAUSED':
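
For this to work, the schedule block read from the generated configuration needs to carry the two new keys. A hypothetical shape of the parsed dictionary, with key names taken from the accesses above and placeholder values:

my_pipeline_vars['schedule'] = {
    'cron': 'TZ=America/New_York 0 6 * * *',
    'max_concurrent_run_count': 1,
    'start_time': None,
    'end_time': None,
    'subnetwork': 'default',
    'use_private_service_access': False,
    'state': 'PAUSED',
}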
1 change: 1 addition & 0 deletions scripts/common.sh
@@ -47,6 +47,7 @@ declare -a apis_array=("cloudresourcemanager.googleapis.com"
"bigquerydatatransfer.googleapis.com"
"dataform.googleapis.com"
"cloudkms.googleapis.com"
"servicenetworking.googleapis.com"
)

get_project_id() {
