From 54c055ed2f94a55672c9892df76c2816718cc556 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 13:09:25 -0700 Subject: [PATCH] Plugin changes for plumbing k8s events into TaskExecutionEvent (#406) * Plugin changes for k8s events Signed-off-by: Andrew Dye * make generate Signed-off-by: Andrew Dye * Comment for SendObjectEvents Signed-off-by: Andrew Dye * make generate Signed-off-by: Andrew Dye --------- Signed-off-by: Andrew Dye --- .../go/tasks/pluginmachinery/core/phase.go | 10 +++++++++- .../pluginmachinery/flytek8s/config/config.go | 6 +++++- .../flytek8s/config/k8spluginconfig_flags.go | 1 + .../flytek8s/config/k8spluginconfig_flags_test.go | 14 ++++++++++++++ 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase.go b/flyteplugins/go/tasks/pluginmachinery/core/phase.go index 93fd3067d1..dbd1f699bd 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/phase.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/phase.go @@ -4,8 +4,9 @@ import ( "fmt" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" structpb "github.com/golang/protobuf/ptypes/struct" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) const DefaultPhaseVersion = uint32(0) @@ -83,6 +84,11 @@ type ExternalResource struct { Phase Phase } +type ReasonInfo struct { + Reason string + OccurredAt *time.Time +} + type TaskInfo struct { // log information for the task execution Logs []*core.TaskLog @@ -96,6 +102,8 @@ type TaskInfo struct { CustomInfo *structpb.Struct // A collection of information about external resources launched by this task ExternalResources []*ExternalResource + // Additional reasons for this case. Note, these are not included in the phase state. + AdditionalReasons []ReasonInfo } func (t *TaskInfo) String() string { diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index daaae6e681..56a57ffdeb 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -10,9 +10,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" - config2 "github.com/flyteorg/flytestdlib/config" v1 "k8s.io/api/core/v1" + config2 "github.com/flyteorg/flytestdlib/config" + "github.com/flyteorg/flyteplugins/go/tasks/config" ) @@ -167,6 +168,9 @@ type K8sPluginConfig struct { // DefaultPodTemplateResync defines the frequency at which the k8s informer resyncs the default // pod template resources. DefaultPodTemplateResync config2.Duration `json:"default-pod-template-resync" pflag:",Frequency of resyncing default pod templates"` + + // SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events). + SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."` } // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go index 7e339dbbe9..7a3f1c951e 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go @@ -66,5 +66,6 @@ func (cfg K8sPluginConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "delete-resource-on-finalize"), defaultK8sConfig.DeleteResourceOnFinalize, "Instructs the system to delete the resource upon successful execution of a k8s pod rather than have the k8s garbage collector clean it up. This ensures that no resources are kept around (potentially consuming cluster resources). This, however, will cause k8s log links to expire as soon as the resource is finalized.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-name"), defaultK8sConfig.DefaultPodTemplateName, "Name of the PodTemplate to use as the base for all k8s pods created by FlytePropeller.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-resync"), defaultK8sConfig.DefaultPodTemplateResync.String(), "Frequency of resyncing default pod templates") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "send-object-events"), defaultK8sConfig.SendObjectEvents, "If true, will send k8s object events in TaskExecutionEvent updates.") return cmdFlags } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go index c10e9c70fb..4d5918a3b5 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go @@ -323,4 +323,18 @@ func TestK8sPluginConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_send-object-events", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("send-object-events", testValue) + if vBool, err := cmdFlags.GetBool("send-object-events"); err == nil { + testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vBool), &actual.SendObjectEvents) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) }