From c8f114e6b867c450ec19bd64abf5a259420c2025 Mon Sep 17 00:00:00 2001 From: Da-Yi Wu Date: Tue, 12 Sep 2023 14:25:12 +0800 Subject: [PATCH 1/3] Add UpdateTagfunc Signed-off-by: Da-Yi Wu --- .../pkg/manager/impl/execution_manager.go | 6 ++- .../repositories/transformers/execution.go | 38 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index bd57d4fb14..68909fa806 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1340,12 +1340,16 @@ func (m *ExecutionManager) UpdateExecution(ctx context.Context, request admin.Ex logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err) return nil, err } - + if err = transformers.UpdateExecutionModelStateChangeDetails(executionModel, request.State, requestedAt, getUser(ctx)); err != nil { return nil, err } + if err = transformers.UpdateExecutionModelTag(executionModel, request.Tags); err != nil { + return nil, err + } + if err := m.db.ExecutionRepo().Update(ctx, *executionModel); err != nil { return nil, err } diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index abd77413e0..71cf3df809 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -292,6 +292,44 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st return nil } +//Update tag information of existing execution model. +func UpdateExecutionModelTag(executionModel *models.Execution, tagsUpdatedTo []string) error { + //tagUpdatedAt time.Time, tagUpdatedBy string) error { + + + + // First: I need to figure out why is executionclosure here.. Do I need it? + //var closure admin.ExecutionClosure + //err := proto.Unmarshal(executionModel.Closure, &closure) + //if err != nil { + // return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err) + //} + + // Update the closure with the same + //var tagUpdatedAtProto *timestamppb.Timestamp + // Default use the createdAt timestamp as the state change occurredAt time + //if stateUpdatedAtProto, err = ptypes.TimestampProto(stateUpdatedAt); err != nil { + // return err + //} + + // combine tags and tagsUpdatedTo to one []models.AdminTag and then write back to executionModel.Tags + for _, tag := range tagsUpdatedTo { + executionModel.Tags = append(executionModel.Tags, models.AdminTag{Name: tag}) + } + //I need to figure out where is tag is wrote? + //closure.StateChangeDetails = &admin.ExecutionStateChangeDetails{ + // Tags: tagsUpdatedTo, + // Principal: stateUpdatedBy, + // OccurredAt: stateUpdatedAtProto, + //} + //marshaledClosure, err := proto.Marshal(&closure) + //if err != nil { + // return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err) + //} + //executionModel.Closure = marshaledClosure + return nil +} + // The execution abort metadata is recorded but the phase is not actually updated *until* the abort event is propagated // by flytepropeller. The metadata is preemptively saved at the time of the abort. func SetExecutionAborting(execution *models.Execution, cause, principal string) error { From 46636e1621dcf663ee99f22b8608e738c1b9674a Mon Sep 17 00:00:00 2001 From: Da-Yi Wu Date: Wed, 20 Sep 2023 18:11:32 +0800 Subject: [PATCH 2/3] Modify UpdateExecutionModelTag Signed-off-by: Da-Yi Wu --- .../repositories/transformers/execution.go | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index 71cf3df809..1dca800460 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -292,30 +292,53 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st return nil } -//Update tag information of existing execution model. +// Update tag information of existing execution model. func UpdateExecutionModelTag(executionModel *models.Execution, tagsUpdatedTo []string) error { //tagUpdatedAt time.Time, tagUpdatedBy string) error { - - - // First: I need to figure out why is executionclosure here.. Do I need it? - //var closure admin.ExecutionClosure - //err := proto.Unmarshal(executionModel.Closure, &closure) - //if err != nil { - // return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to unmarshal execution closure: %v", err) - //} - + var spec admin.ExecutionSpec + var err error + if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil { + return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec") + } + // Update the closure with the same //var tagUpdatedAtProto *timestamppb.Timestamp // Default use the createdAt timestamp as the state change occurredAt time //if stateUpdatedAtProto, err = ptypes.TimestampProto(stateUpdatedAt); err != nil { // return err //} - + // combine tags and tagsUpdatedTo to one []models.AdminTag and then write back to executionModel.Tags + // logger out the tags name of executionModel.Tags + //logger.Infof(context.Background(), "Before UpdateTag is executed") + //for _, tag := range spec.Tags { + // logger.Infof(context.Background(), "tag name: %v", tag) + //} + //Append tagsUpdatedTo to spec.Tags, but use set to avoid duplicate tags + tagSet := sets.NewString() + for _, tag := range spec.Tags { + tagSet.Insert(tag) + } for _, tag := range tagsUpdatedTo { - executionModel.Tags = append(executionModel.Tags, models.AdminTag{Name: tag}) + tagSet.Insert(tag) + } + spec.Tags = tagSet.List() + //logger.Infof(context.Background(), "After tag name: %v", spec.Tags) + //spec.Tags = append(spec.Tags, tagsUpdatedTo...) + //Write into DB + marshaledSpec, err := proto.Marshal(&spec) + if err != nil { + return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err) } + executionModel.Spec = marshaledSpec + + // logger out the tags name of executionModel.Tags + + //logger.Infof(context.Background(), "After UpdateTag is executed") + //for _, tag := range executionModel.Tags { + // logger.Infof(context.Background(), "tag name: %v", tag.Name) + //} //I need to figure out where is tag is wrote? //closure.StateChangeDetails = &admin.ExecutionStateChangeDetails{ // Tags: tagsUpdatedTo, From 9b00bb9807034f053a2b3921e72ff7cd98aae21f Mon Sep 17 00:00:00 2001 From: Da-Yi Wu Date: Sat, 23 Sep 2023 15:47:40 +0800 Subject: [PATCH 3/3] Update the Tags into Execution.Tag Signed-off-by: Da-Yi Wu --- .../repositories/transformers/execution.go | 42 +++---------------- 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index 1dca800460..202c4e003e 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -103,7 +103,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e for i, tag := range input.RequestSpec.Tags { tags[i] = models.AdminTag{Name: tag} } - + executionModel := &models.Execution{ ExecutionKey: models.ExecutionKey{ Project: input.WorkflowExecutionID.Project, @@ -294,7 +294,6 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st // Update tag information of existing execution model. func UpdateExecutionModelTag(executionModel *models.Execution, tagsUpdatedTo []string) error { - //tagUpdatedAt time.Time, tagUpdatedBy string) error { var spec admin.ExecutionSpec var err error @@ -302,54 +301,25 @@ func UpdateExecutionModelTag(executionModel *models.Execution, tagsUpdatedTo []s return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec") } - // Update the closure with the same - //var tagUpdatedAtProto *timestamppb.Timestamp - // Default use the createdAt timestamp as the state change occurredAt time - //if stateUpdatedAtProto, err = ptypes.TimestampProto(stateUpdatedAt); err != nil { - // return err - //} - - // combine tags and tagsUpdatedTo to one []models.AdminTag and then write back to executionModel.Tags - // logger out the tags name of executionModel.Tags - //logger.Infof(context.Background(), "Before UpdateTag is executed") - //for _, tag := range spec.Tags { - // logger.Infof(context.Background(), "tag name: %v", tag) - //} - //Append tagsUpdatedTo to spec.Tags, but use set to avoid duplicate tags tagSet := sets.NewString() for _, tag := range spec.Tags { tagSet.Insert(tag) } for _, tag := range tagsUpdatedTo { + // if tag not in tagSet, append into executionModel.Tags + if !tagSet.Has(tag) { + executionModel.Tags = append(executionModel.Tags, models.AdminTag{Name: tag}) + } tagSet.Insert(tag) } spec.Tags = tagSet.List() - //logger.Infof(context.Background(), "After tag name: %v", spec.Tags) - //spec.Tags = append(spec.Tags, tagsUpdatedTo...) - //Write into DB + marshaledSpec, err := proto.Marshal(&spec) if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err) } executionModel.Spec = marshaledSpec - // logger out the tags name of executionModel.Tags - - //logger.Infof(context.Background(), "After UpdateTag is executed") - //for _, tag := range executionModel.Tags { - // logger.Infof(context.Background(), "tag name: %v", tag.Name) - //} - //I need to figure out where is tag is wrote? - //closure.StateChangeDetails = &admin.ExecutionStateChangeDetails{ - // Tags: tagsUpdatedTo, - // Principal: stateUpdatedBy, - // OccurredAt: stateUpdatedAtProto, - //} - //marshaledClosure, err := proto.Marshal(&closure) - //if err != nil { - // return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution closure: %v", err) - //} - //executionModel.Closure = marshaledClosure return nil }