Skip to content

Commit

Permalink
Add scheduling capability (#173)
Browse files Browse the repository at this point in the history
* Basic scheduling.

* Clean up old schedule.

* Update changelog.

* Add schedules section to config.

* Implement specyfing other schedule parameters.

* Delete old scheduler.

* Fix tests.

* Test schedule pipeline.

* Add scheduled run name to the test.

* Test remove old schedule.

* Fix test compile.

* Mock PipelineJobSchedule in schedule test.

* Test cli schedule command.

* Provide defaults for cron expression and timezone.

* Deleted commented out code.

* Put all the changes under a single [Unreleased] section in changelog.
  • Loading branch information
sfczekalski committed Aug 19, 2024
1 parent 0e934f9 commit 5ee3304
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 128 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Changelog

## [Unreleased] 2024-07-23
## [Unreleased] 2024-07-29

- Brought back the Vertex AI Pipelines scheduling capability
- Migrated to kfp 2
- Removed `image_pull_policy` parameter from configuration, as it only applies to Kubernetes backend and not Vertex AI,
and it's only available in `kfp-kubernetes` extension package
Expand Down
17 changes: 17 additions & 0 deletions docs/source/02_installation/02_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,23 @@ run_config:
# client_id: iam-client-id

dynamic_config_providers: []

# Schedules configuration
schedules:
default_schedule:
cron_expression: "0 * * * *"
timezone: Etc/UTC
# Optional. Timestamp after which the first run can be scheduled. If unspecified, it defaults to the schedule creation timestamp.
start_time: null
# Optional. Timestamp after which no more runs will be scheduled. If unspecified, then runs will be scheduled indefinitely.
end_time: null
# Optional. Whether new scheduled runs can be queued when max_concurrent_runs limit is reached.
allow_queueing: false
# Optional. Maximum run count of the schedule. If specified, The schedule will be completed when either started_run_count >= max_run_count or when end_time is reached. Must be positive and <= 2^63-1.
max_run_count: null
# Optional. Maximum number of runs that can be started concurrently for this PipelineJobSchedule.
max_concurrent_run_count: 1

```

## Dynamic configuration support
Expand Down
82 changes: 77 additions & 5 deletions kedro_vertexai/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from click import ClickException, Context, confirm

from .client import VertexAIPipelinesClient
from .config import PluginConfig, RunConfig
from .config import PluginConfig, RunConfig, ScheduleConfig
from .constants import VERTEXAI_RUN_ID_TAG
from .context_helper import ContextHelper
from .utils import (
Expand Down Expand Up @@ -206,6 +206,43 @@ def compile(ctx, image, pipeline, output) -> None:
help="Cron expression for recurring run",
required=False,
)
@click.option(
"-t",
"--timezone",
type=str,
help="Time zone of the crone expression.",
required=False,
)
@click.option(
"--start-time",
type=str,
help="Timestamp after which the first run can be scheduled.",
required=False,
)
@click.option(
"--end-time",
type=str,
help="Timestamp after which no more runs will be scheduled. ",
required=False,
)
@click.option(
"--allow-queueing",
type=bool,
help="Whether new scheduled runs can be queued when max_concurrent_runs limit is reached.",
required=False,
)
@click.option(
"--max-run-count",
type=int,
help="Maximum run count of the schedule.",
required=False,
)
@click.option(
"--max-concurrent-run-count",
type=int,
help="Maximum number of runs that can be started concurrently.",
required=False,
)
@click.option(
"--param",
"params",
Expand All @@ -218,12 +255,47 @@ def schedule(
ctx,
pipeline: str,
cron_expression: str,
params: list,
timezone: str,
start_time: str = None,
end_time: str = None,
allow_queueing: bool = None,
max_run_count: int = None,
max_concurrent_run_count: int = None,
params: list = [],
):
"""Schedules recurring execution of latest version of the pipeline"""
logger.warning(
"Scheduler functionality was temporarily disabled, "
"follow https://github.com/getindata/kedro-vertexai/issues/4 for updates"
context_helper = ctx.obj["context_helper"]
client: VertexAIPipelinesClient = context_helper.vertexai_client
config: RunConfig = context_helper.config.run_config

schedule_config: ScheduleConfig = config.schedules.get(
pipeline, config.schedules["default_schedule"]
)

schedule_config.cron_expression = (
cron_expression if cron_expression else schedule_config.cron_expression
)
schedule_config.timezone = timezone if timezone else schedule_config.timezone
schedule_config.start_time = (
start_time if start_time else schedule_config.start_time
)
schedule_config.end_time = end_time if end_time else schedule_config.end_time
schedule_config.allow_queueing = (
allow_queueing if allow_queueing else schedule_config.allow_queueing
)
schedule_config.max_run_count = (
max_run_count if max_run_count else schedule_config.max_run_count
)
schedule_config.max_concurrent_run_count = (
max_concurrent_run_count
if max_concurrent_run_count
else schedule_config.max_concurrent_run_count
)

client.schedule(
pipeline=pipeline,
schedule_config=schedule_config,
parameter_values=format_params(params),
)


Expand Down
94 changes: 54 additions & 40 deletions kedro_vertexai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
"""

import datetime as dt
import json
import logging
import os
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional

from google.cloud import aiplatform as aip
from google.cloud.aiplatform import PipelineJob
from google.cloud.scheduler_v1.services.cloud_scheduler import (
CloudSchedulerClient,
)
from kfp import compiler
from kfp.compiler import Compiler
from tabulate import tabulate

from .config import PluginConfig
from .config import PluginConfig, ScheduleConfig
from .generator import PipelineGenerator


Expand All @@ -30,7 +27,6 @@ class VertexAIPipelinesClient:
def __init__(self, config: PluginConfig, project_name, context):

aip.init(project=config.project_id, location=config.region)
self.cloud_scheduler_client = CloudSchedulerClient()
self.location = f"projects/{config.project_id}/locations/{config.region}"
self.run_config = config.run_config
self.run_name = self._generate_run_name(config)
Expand Down Expand Up @@ -106,61 +102,79 @@ def compile(
"""
token = os.getenv("MLFLOW_TRACKING_TOKEN", "")
pipeline_func = self.generator.generate_pipeline(pipeline, image, token)
compiler.Compiler().compile(
Compiler().compile(
pipeline_func=pipeline_func,
package_path=output,
)
self.log.info("Generated pipeline definition was saved to %s", str(output))

def _cleanup_old_schedule(self, pipeline_name):
"""
Removes old jobs scheduled for given pipeline name
def _cleanup_old_schedule(self, display_name: str):
"""Cleanup old schedules with a given display name.
Args:
display_name (str): Display name of the schedule.
"""
for job in self.cloud_scheduler_client.list_jobs(parent=self.location):
if "jobs/pipeline_pipeline" not in job.name:
continue

job_pipeline_name = json.loads(job.http_target.body)["pipelineSpec"][
"pipelineInfo"
]["name"]
if job_pipeline_name == pipeline_name:
self.log.info(
"Found existing schedule for the pipeline at %s, deleting...",
job.schedule,
)
self.cloud_scheduler_client.delete_job(name=job.name)
existing_schedules = aip.PipelineJobSchedule.list(
filter=f'display_name="{display_name}"'
)
self.log.info(
f"Found {len(existing_schedules)} existing schedules with display name {display_name}"
)

for schedule in existing_schedules:
schedule.delete()

self.log.info(
f"Cleaned up existing old schedules with display name {display_name}"
)

def schedule(
self,
pipeline,
cron_expression,
parameter_values=None,
image_pull_policy="IfNotPresent",
pipeline: str,
schedule_config: ScheduleConfig,
parameter_values: Optional[Dict[str, Any]] = None,
):
"""
Schedule pipeline to Vertex AI with given cron expression
:param pipeline:
:param cron_expression:
:param parameter_values:
:param image_pull_policy:
:param pipeline: Name of the Kedro pipeline to schedule.
:param schedule_config: Schedule config.
:param parameter_values: Kubeflow pipeline parameter values.
:return:
"""
self._cleanup_old_schedule(self.generator.get_pipeline_name())
self._cleanup_old_schedule(display_name=self.run_config.scheduled_run_name)

with NamedTemporaryFile(
mode="rt", prefix="kedro-vertexai", suffix=".json"
mode="rt", prefix="kedro-vertexai", suffix=".yaml"
) as spec_output:
self.compile(
pipeline,
self.run_config.image,
output=spec_output.name,
)
self.api_client.create_schedule_from_job_spec(
job_spec_path=spec_output.name,
time_zone="Etc/UTC",
schedule=cron_expression,

job = aip.PipelineJob(
display_name=self.run_name,
template_path=spec_output.name,
job_id=self.run_name,
pipeline_root=f"gs://{self.run_config.root}",
enable_caching=False,
parameter_values=parameter_values or {},
enable_caching=False,
)

cron_with_timezone = (
f"TZ={schedule_config.timezone} {schedule_config.cron_expression}"
)

job.create_schedule(
cron=cron_with_timezone,
display_name=self.run_config.scheduled_run_name,
start_time=schedule_config.start_time,
end_time=schedule_config.end_time,
allow_queueing=schedule_config.allow_queueing,
max_run_count=schedule_config.max_run_count,
max_concurrent_run_count=schedule_config.max_concurrent_run_count,
service_account=self.run_config.service_account,
network=self.run_config.network.vpc,
)

self.log.info("Pipeline scheduled to %s", cron_expression)
self.log.info("Pipeline scheduled to %s", cron_with_timezone)
30 changes: 30 additions & 0 deletions kedro_vertexai/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@
# mlflow:
# request_header_provider_params:
# key: value
# Schedules configuration
schedules:
default_schedule:
cron_expression: "0 * * * *"
timezone: Etc/UTC
start_time: none
end_time: none
allow_queueing: false
max_run_count: none
max_concurrent_run_count: 1
# training_pipeline:
# cron_expression: "0 0 * * *"
# timezone: America/New_York
# start_time: none
# end_time: none
# allow_queueing: false
# max_run_count: none
# max_concurrent_run_count: 1
"""


Expand Down Expand Up @@ -193,6 +212,16 @@ class MLFlowVertexAIConfig(BaseModel):
request_header_provider_params: Optional[Dict[str, str]]


class ScheduleConfig(BaseModel):
cron_expression: Optional[str] = "0 * * * *"
timezone: Optional[str] = "Etc/UTC"
start_time: Optional[str] = None
end_time: Optional[str] = None
allow_queueing: Optional[bool] = False
max_run_count: Optional[int] = None
max_concurrent_run_count: Optional[int] = 1


class RunConfig(BaseModel):
image: str
root: Optional[str]
Expand All @@ -209,6 +238,7 @@ class RunConfig(BaseModel):
node_selectors: Optional[Dict[str, Dict[str, str]]] = {}
dynamic_config_providers: Optional[List[DynamicConfigProviderConfig]] = []
mlflow: Optional[MLFlowVertexAIConfig] = None
schedules: Optional[Dict[str, ScheduleConfig]] = None

def resources_for(self, node: str, tags: Optional[set] = None):
default_config = self.resources["__default__"].dict()
Expand Down
Loading

0 comments on commit 5ee3304

Please sign in to comment.