Skip to content

Commit

Permalink
Introduces PipelineConfig class with TTL configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo M. Oliveira <[email protected]>
  • Loading branch information
rimolive committed Oct 30, 2024
1 parent 6684b6d commit 079d482
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 9 deletions.
6 changes: 3 additions & 3 deletions api/v2alpha1/go/cachekey/cache_key.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,8 @@ message PipelineStateEnum {
message PlatformSpec {
// Platform key to full platform config
map<string, SinglePlatformSpec> platforms = 1;


}

message SinglePlatformSpec {
Expand All @@ -1095,6 +1097,11 @@ message SinglePlatformSpec {
PipelineConfig pipelineConfig = 4;
}

message PipelineConfig {
// Set TTL at pipeline-level
optional int32 pipelineTtl = 1;
}

message PlatformDeploymentConfig {
// Map of executor label to executor-level config
// Mirrors PipelineSpec.deployment_spec.executors structure
Expand Down
23 changes: 20 additions & 3 deletions backend/src/apiserver/template/v2_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"regexp"
"strings"

"github.com/golang/glog"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
Expand Down Expand Up @@ -77,9 +78,17 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
}
}

var pipeline_options argocompiler.Options
if t.platformSpec.PipelineConfig.Ttl != nil {

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 82 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
glog.Info("Found pipeline config")
pipeline_options = argocompiler.Options{
TtlSeconds: *t.platformSpec.PipelineConfig.Ttl,

Check failure on line 85 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 85 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, &pipeline_options)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher})
}
Expand Down Expand Up @@ -300,9 +309,17 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
}
}

var pipeline_options *argocompiler.Options
if t.platformSpec.PipelineConfig.Ttl != nil {

Check failure on line 313 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 313 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
glog.Info("Found pipeline config")
pipeline_options = &argocompiler.Options{
TtlSeconds: *t.platformSpec.PipelineConfig.Ttl,

Check failure on line 316 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 316 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

t.platformSpec.PipelineConfig undefined (type *pipelinespec.PlatformSpec has no field or method PipelineConfig)
}
}

var obj interface{}
if util.CurrentExecutionType() == util.ArgoWorkflow {
obj, err = argocompiler.Compile(job, kubernetesSpec, nil)
obj, err = argocompiler.Compile(job, kubernetesSpec, pipeline_options)
} else if util.CurrentExecutionType() == util.TektonPipelineRun {
obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil)
}
Expand Down Expand Up @@ -344,7 +361,7 @@ func IsPlatformSpecWithKubernetesConfig(template []byte) bool {
return false
}
_, ok := platformSpec.Platforms["kubernetes"]
return ok
return ok || platformSpec.PipelineConfig != nil

Check failure on line 364 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / run-go-unittests

platformSpec.PipelineConfig undefined (type pipelinespec.PlatformSpec has no field or method PipelineConfig)

Check failure on line 364 in backend/src/apiserver/template/v2_template.go

View workflow job for this annotation

GitHub Actions / backend-tests

platformSpec.PipelineConfig undefined (type pipelinespec.PlatformSpec has no field or method PipelineConfig)
}

func (t *V2Spec) validatePipelineJobInputs(job *pipelinespec.PipelineJob) error {
Expand Down
13 changes: 13 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ type Options struct {
// optional
PipelineRoot string
// TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode.
TtlSeconds int32
}

const (
pipeline_default_ttlSeconds = int32(30)
)

func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) {
// clone jobArg, because we don't want to change it
jobMsg := proto.Clone(jobArg)
Expand Down Expand Up @@ -86,6 +91,11 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
}
}

pipeline_ttlseconds := pipeline_default_ttlSeconds
if &opts.TtlSeconds != nil {
pipeline_ttlseconds = opts.TtlSeconds
}

// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
Expand Down Expand Up @@ -117,6 +127,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
TTLStrategy: &wfapi.TTLStrategy{
SecondsAfterCompletion: &pipeline_ttlseconds,
},
},
}
c := &workflowCompiler{
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,10 @@ def create_pipeline_spec(
if pipeline_config is not None:
_merge_pipeline_config(
pipelineConfig=pipeline_config, platformSpec=platform_spec)
<<<<<<< HEAD

=======
>>>>>>> cc9690e2b (Introduces PipelineConfig class with TTL configuration)
for group in all_groups:
build_spec_by_group(
pipeline_spec=pipeline_spec,
Expand Down Expand Up @@ -2076,6 +2079,11 @@ def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig,
# {'pipelineConfig': {
# '<some pipeline config option>': pipelineConfig.<get that value>,
# }}, platformSpec.platforms['kubernetes'])

pipeline_config_json = json_format.ParseDict(
{'pipelineConfig': {
'pipelineTtl': pipelineConfig.get_ttl(),
}}, platformSpec.platforms['kubernetes'])

return platformSpec

Expand Down
1 change: 1 addition & 0 deletions sdk/python/kfp/dsl/component_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import warnings

from kfp.dsl import component_factory
from kfp.dsl import pipeline_config


def component(func: Optional[Callable] = None,
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/kfp/dsl/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ def __init__(self):
pass

# TODO add pipeline level configs
def set_ttl(self, ttl: int):
self.__ttl = ttl

def get_ttl(self) -> int:
return int(self.__ttl)
4 changes: 4 additions & 0 deletions sdk/python/kfp/dsl/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ def my_pipeline(a: str, b: int):
pipeline_root: The root directory from which to read input and output
parameters and artifacts.
display_name: A human-readable name for the pipeline.
<<<<<<< HEAD
pipeline_config: Pipeline-level config options.
=======
pipeline_config: Pipeline-level configuration.
>>>>>>> cc9690e2b (Introduces PipelineConfig class with TTL configuration)
"""
if func is None:
return functools.partial(
Expand Down

0 comments on commit 079d482

Please sign in to comment.