From 2bc6e1d491fedc1117559534b7e0baa09acfce8e Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 17 Jan 2024 10:04:29 -0800 Subject: [PATCH] make execution manager changes Signed-off-by: Yee Hing Tong --- .../pkg/manager/impl/execution_manager.go | 80 +++++++--------- .../manager/impl/execution_manager_test.go | 91 +++++++++++++++++++ 2 files changed, 126 insertions(+), 45 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index dc36a308c4..37dcf2e6d4 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -742,6 +742,7 @@ func (m *ExecutionManager) getStringFromInput(ctx context.Context, inputBinding strVal = p.GetStringValue() case *core.Primitive_Datetime: t := time.Unix(p.GetDatetime().Seconds, int64(p.GetDatetime().Nanos)) + t = t.In(time.UTC) strVal = t.Format("2006-01-02") case *core.Primitive_StringValue: strVal = p.GetStringValue() @@ -776,46 +777,6 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar if query.GetUri() != "" { // If a query string, then just pass it through, nothing to fill in. return query, nil - } else if query.GetArtifactTag() != nil { - t := query.GetArtifactTag() - ak := t.GetArtifactKey() - if ak == nil { - return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "tag doesn't have key") - } - var project, domain string - if ak.GetProject() == "" { - project = contextutils.Value(ctx, contextutils.ProjectKey) - } else { - project = ak.GetProject() - } - if ak.GetDomain() == "" { - domain = contextutils.Value(ctx, contextutils.DomainKey) - } else { - domain = ak.GetDomain() - } - strValue, err := m.getLabelValue(ctx, t.GetValue(), inputs) - if err != nil { - logger.Errorf(ctx, "Failed to template input string [%s] [%v]", t.GetValue(), err) - return query, err - } - - return core.ArtifactQuery{ - Identifier: &core.ArtifactQuery_ArtifactTag{ - ArtifactTag: &core.ArtifactTag{ - ArtifactKey: &core.ArtifactKey{ - Project: project, - Domain: domain, - Name: ak.GetName(), - }, - Value: &core.LabelValue{ - Value: &core.LabelValue_StaticValue{ - StaticValue: strValue, - }, - }, - }, - }, - }, nil - } else if query.GetArtifactId() != nil { artifactID := query.GetArtifactId() ak := artifactID.GetArtifactKey() @@ -836,7 +797,7 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar var partitions map[string]*core.LabelValue - if artifactID.GetPartitions() != nil && artifactID.GetPartitions().GetValue() != nil { + if artifactID.GetPartitions().GetValue() != nil { partitions = make(map[string]*core.LabelValue, len(artifactID.GetPartitions().Value)) for k, v := range artifactID.GetPartitions().GetValue() { newValue, err := m.getLabelValue(ctx, v, inputs) @@ -847,6 +808,36 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar partitions[k] = &core.LabelValue{Value: &core.LabelValue_StaticValue{StaticValue: newValue}} } } + + var timePartition *core.TimePartition + if artifactID.GetTimePartition().GetValue() != nil { + if artifactID.GetTimePartition().Value.GetTimeValue() != nil { + // If the time value is set, then just pass it through, nothing to fill in. + timePartition = artifactID.GetTimePartition() + } else if artifactID.GetTimePartition().Value.GetInputBinding() != nil { + // Evaluate the time partition input binding + lit, ok := inputs[artifactID.GetTimePartition().Value.GetInputBinding().GetVar()] + if !ok { + return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "time partition input binding var [%s] not found in inputs %v", artifactID.GetTimePartition().Value.GetInputBinding().GetVar(), inputs) + } + + if lit.GetScalar().GetPrimitive().GetDatetime() == nil { + return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, + "time partition binding to input var [%s] failing because %v is not a datetime", + artifactID.GetTimePartition().Value.GetInputBinding().GetVar(), lit) + } + timePartition = &core.TimePartition{ + Value: &core.LabelValue{ + Value: &core.LabelValue_TimeValue{ + TimeValue: lit.GetScalar().GetPrimitive().GetDatetime(), + }, + }, + } + } else { + return query, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "time partition value cannot be empty when evaluating query: %v", query) + } + } + return core.ArtifactQuery{ Identifier: &core.ArtifactQuery_ArtifactId{ ArtifactId: &core.ArtifactID{ @@ -855,11 +846,10 @@ func (m *ExecutionManager) fillInTemplateArgs(ctx context.Context, query core.Ar Domain: domain, Name: ak.GetName(), }, - Dimensions: &core.ArtifactID_Partitions{ - Partitions: &core.Partitions{ - Value: partitions, - }, + Partitions: &core.Partitions{ + Value: partitions, }, + TimePartition: timePartition, }, }, }, nil diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index e1f59cdf43..72c700008f 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -5719,5 +5719,96 @@ func TestAddStateFilter(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "state <> ?", expression.Query) }) +} + +func TestQueryTemplate(t *testing.T) { + ctx := context.Background() + + aTime := time.Date( + 2063, 4, 5, 00, 00, 00, 0, time.UTC) + + rawInputs := map[string]interface{}{ + "aStr": "hello world", + "anInt": 1, + "aFloat": 1.3, + "aTime": aTime, + } + + otherInputs, err := coreutils.MakeLiteralMap(rawInputs) + assert.NoError(t, err) + + m := ExecutionManager{} + + ak := &core.ArtifactKey{ + Project: "project", + Domain: "domain", + Name: "testname", + } + t.Run("test all present, nothing to fill in", func(t *testing.T) { + pMap := map[string]*core.LabelValue{ + "partition1": {Value: &core.LabelValue_StaticValue{StaticValue: "my value"}}, + "partition2": {Value: &core.LabelValue_StaticValue{StaticValue: "my value 2"}}, + } + p := &core.Partitions{Value: pMap} + + q := core.ArtifactQuery{ + Identifier: &core.ArtifactQuery_ArtifactId{ + ArtifactId: &core.ArtifactID{ + ArtifactKey: ak, + Partitions: p, + TimePartition: nil, + }, + }, + } + + filledQuery, err := m.fillInTemplateArgs(ctx, q, otherInputs.Literals) + assert.NoError(t, err) + assert.True(t, proto.Equal(&q, &filledQuery)) + }) + + t.Run("template date-times, both in explicit tp and not", func(t *testing.T) { + pMap := map[string]*core.LabelValue{ + "partition1": {Value: &core.LabelValue_InputBinding{InputBinding: &core.InputBindingData{Var: "aTime"}}}, + "partition2": {Value: &core.LabelValue_StaticValue{StaticValue: "my value 2"}}, + } + p := &core.Partitions{Value: pMap} + + q := core.ArtifactQuery{ + Identifier: &core.ArtifactQuery_ArtifactId{ + ArtifactId: &core.ArtifactID{ + ArtifactKey: ak, + Partitions: p, + TimePartition: &core.TimePartition{Value: &core.LabelValue{Value: &core.LabelValue_InputBinding{InputBinding: &core.InputBindingData{Var: "aTime"}}}}, + }, + }, + } + + filledQuery, err := m.fillInTemplateArgs(ctx, q, otherInputs.Literals) + assert.NoError(t, err) + staticTime := filledQuery.GetArtifactId().Partitions.Value["partition1"].GetStaticValue() + assert.Equal(t, "2063-04-05", staticTime) + assert.Equal(t, int64(2942956800), filledQuery.GetArtifactId().TimePartition.Value.GetTimeValue().Seconds) + }) + + t.Run("something missing", func(t *testing.T) { + pMap := map[string]*core.LabelValue{ + "partition1": {Value: &core.LabelValue_StaticValue{StaticValue: "my value"}}, + "partition2": {Value: &core.LabelValue_StaticValue{StaticValue: "my value 2"}}, + } + p := &core.Partitions{Value: pMap} + + q := core.ArtifactQuery{ + Identifier: &core.ArtifactQuery_ArtifactId{ + ArtifactId: &core.ArtifactID{ + ArtifactKey: ak, + Partitions: p, + TimePartition: &core.TimePartition{Value: &core.LabelValue{Value: &core.LabelValue_InputBinding{InputBinding: &core.InputBindingData{Var: "wrong var"}}}}, + }, + }, + } + + _, err := m.fillInTemplateArgs(ctx, q, otherInputs.Literals) + assert.Error(t, err) + }) }