diff --git a/api/v2alpha1/go/cachekey/cache_key.pb.go b/api/v2alpha1/go/cachekey/cache_key.pb.go index eb29fd917c7b..1d6fa474c0ed 100644 --- a/api/v2alpha1/go/cachekey/cache_key.pb.go +++ b/api/v2alpha1/go/cachekey/cache_key.pb.go @@ -14,7 +14,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 +// protoc-gen-go v1.33.0 // protoc v3.17.3 // source: cache_key.proto @@ -42,7 +42,7 @@ type CacheKey struct { unknownFields protoimpl.UnknownFields InputArtifactNames map[string]*ArtifactNameList `protobuf:"bytes,1,rep,name=inputArtifactNames,proto3" json:"inputArtifactNames,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // Deprecated: Do not use. + // Deprecated: Marked as deprecated in cache_key.proto. InputParameters map[string]*pipelinespec.Value `protobuf:"bytes,2,rep,name=inputParameters,proto3" json:"inputParameters,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` OutputArtifactsSpec map[string]*pipelinespec.RuntimeArtifact `protobuf:"bytes,3,rep,name=outputArtifactsSpec,proto3" json:"outputArtifactsSpec,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` OutputParametersSpec map[string]string `protobuf:"bytes,4,rep,name=outputParametersSpec,proto3" json:"outputParametersSpec,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` @@ -89,7 +89,7 @@ func (x *CacheKey) GetInputArtifactNames() map[string]*ArtifactNameList { return nil } -// Deprecated: Do not use. +// Deprecated: Marked as deprecated in cache_key.proto. func (x *CacheKey) GetInputParameters() map[string]*pipelinespec.Value { if x != nil { return x.InputParameters diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index b4bca0cec5ff..2275b69af0df 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -2484,7 +2484,7 @@ type PlatformDeploymentConfig struct { func (x *PlatformDeploymentConfig) Reset() { *x = PlatformDeploymentConfig{} if protoimpl.UnsafeEnabled { - mi := &file_pipeline_spec_proto_msgTypes[29] + mi := &file_pipeline_spec_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2497,7 +2497,7 @@ func (x *PlatformDeploymentConfig) String() string { func (*PlatformDeploymentConfig) ProtoMessage() {} func (x *PlatformDeploymentConfig) ProtoReflect() protoreflect.Message { - mi := &file_pipeline_spec_proto_msgTypes[29] + mi := &file_pipeline_spec_proto_msgTypes[30] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2510,7 +2510,7 @@ func (x *PlatformDeploymentConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use PlatformDeploymentConfig.ProtoReflect.Descriptor instead. func (*PlatformDeploymentConfig) Descriptor() ([]byte, []int) { - return file_pipeline_spec_proto_rawDescGZIP(), []int{29} + return file_pipeline_spec_proto_rawDescGZIP(), []int{30} } func (x *PlatformDeploymentConfig) GetExecutors() map[string]*structpb.Struct { @@ -7163,6 +7163,18 @@ func file_pipeline_spec_proto_init() { } } file_pipeline_spec_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PipelineConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pipeline_spec_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PlatformDeploymentConfig); i { case 0: return &v.state diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index 075913f80520..1b261d64803d 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -1079,6 +1079,8 @@ message PipelineStateEnum { message PlatformSpec { // Platform key to full platform config map platforms = 1; + + } message SinglePlatformSpec { @@ -1095,6 +1097,11 @@ message SinglePlatformSpec { PipelineConfig pipelineConfig = 4; } +message PipelineConfig { + // Set TTL at pipeline-level + optional int32 pipelineTtl = 1; +} + message PlatformDeploymentConfig { // Map of executor label to executor-level config // Mirrors PipelineSpec.deployment_spec.executors structure diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index d14ddffdaeb7..482bfbcf5d97 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -23,6 +23,7 @@ import ( "regexp" "strings" + "github.com/golang/glog" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/kubeflow/pipelines/backend/src/apiserver/model" @@ -77,9 +78,17 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche } } + var pipeline_options argocompiler.Options + if t.platformSpec.PipelineConfig.Ttl != nil { + glog.Info("Found pipeline config") + pipeline_options = argocompiler.Options{ + TtlSeconds: *t.platformSpec.PipelineConfig.Ttl, + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, &pipeline_options) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, &tektoncompiler.Options{LauncherImage: Launcher}) } @@ -300,9 +309,17 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u } } + var pipeline_options *argocompiler.Options + if t.platformSpec.PipelineConfig.Ttl != nil { + glog.Info("Found pipeline config") + pipeline_options = &argocompiler.Options{ + TtlSeconds: *t.platformSpec.PipelineConfig.Ttl, + } + } + var obj interface{} if util.CurrentExecutionType() == util.ArgoWorkflow { - obj, err = argocompiler.Compile(job, kubernetesSpec, nil) + obj, err = argocompiler.Compile(job, kubernetesSpec, pipeline_options) } else if util.CurrentExecutionType() == util.TektonPipelineRun { obj, err = tektoncompiler.Compile(job, kubernetesSpec, nil) } @@ -344,7 +361,7 @@ func IsPlatformSpecWithKubernetesConfig(template []byte) bool { return false } _, ok := platformSpec.Platforms["kubernetes"] - return ok + return ok || platformSpec.PipelineConfig != nil } func (t *V2Spec) validatePipelineJobInputs(job *pipelinespec.PipelineJob) error { diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index 1f1c19ed3ec1..4c7aa8e560a1 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -40,8 +40,13 @@ type Options struct { // optional PipelineRoot string // TODO(Bobgy): add an option -- dev mode, ImagePullPolicy should only be Always in dev mode. + TtlSeconds int32 } +const ( + pipeline_default_ttlSeconds = int32(30) +) + func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.SinglePlatformSpec, opts *Options) (*wfapi.Workflow, error) { // clone jobArg, because we don't want to change it jobMsg := proto.Clone(jobArg) @@ -86,6 +91,11 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S } } + pipeline_ttlseconds := pipeline_default_ttlSeconds + if &opts.TtlSeconds != nil { + pipeline_ttlseconds = opts.TtlSeconds + } + // initialization wf := &wfapi.Workflow{ TypeMeta: k8smeta.TypeMeta{ @@ -117,6 +127,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, + TTLStrategy: &wfapi.TTLStrategy{ + SecondsAfterCompletion: &pipeline_ttlseconds, + }, }, } c := &workflowCompiler{ diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 3f1575005da5..44738fbbf5a3 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1953,7 +1953,10 @@ def create_pipeline_spec( if pipeline_config is not None: _merge_pipeline_config( pipelineConfig=pipeline_config, platformSpec=platform_spec) +<<<<<<< HEAD +======= +>>>>>>> cc9690e2b (Introduces PipelineConfig class with TTL configuration) for group in all_groups: build_spec_by_group( pipeline_spec=pipeline_spec, @@ -2076,6 +2079,11 @@ def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig, # {'pipelineConfig': { # '': pipelineConfig., # }}, platformSpec.platforms['kubernetes']) + + pipeline_config_json = json_format.ParseDict( + {'pipelineConfig': { + 'pipelineTtl': pipelineConfig.get_ttl(), + }}, platformSpec.platforms['kubernetes']) return platformSpec diff --git a/sdk/python/kfp/dsl/component_decorator.py b/sdk/python/kfp/dsl/component_decorator.py index f9392c14424a..9311e046004c 100644 --- a/sdk/python/kfp/dsl/component_decorator.py +++ b/sdk/python/kfp/dsl/component_decorator.py @@ -17,6 +17,7 @@ import warnings from kfp.dsl import component_factory +from kfp.dsl import pipeline_config def component(func: Optional[Callable] = None, diff --git a/sdk/python/kfp/dsl/pipeline_config.py b/sdk/python/kfp/dsl/pipeline_config.py index a4e90c28a012..cdc2ca25b32b 100644 --- a/sdk/python/kfp/dsl/pipeline_config.py +++ b/sdk/python/kfp/dsl/pipeline_config.py @@ -21,3 +21,8 @@ def __init__(self): pass # TODO add pipeline level configs + def set_ttl(self, ttl: int): + self.__ttl = ttl + + def get_ttl(self) -> int: + return int(self.__ttl) diff --git a/sdk/python/kfp/dsl/pipeline_context.py b/sdk/python/kfp/dsl/pipeline_context.py index 049aad3a748c..beb481c2fcf2 100644 --- a/sdk/python/kfp/dsl/pipeline_context.py +++ b/sdk/python/kfp/dsl/pipeline_context.py @@ -53,7 +53,11 @@ def my_pipeline(a: str, b: int): pipeline_root: The root directory from which to read input and output parameters and artifacts. display_name: A human-readable name for the pipeline. +<<<<<<< HEAD pipeline_config: Pipeline-level config options. +======= + pipeline_config: Pipeline-level configuration. +>>>>>>> cc9690e2b (Introduces PipelineConfig class with TTL configuration) """ if func is None: return functools.partial(