Skip to content

Commit

Permalink
Add Tags in UpdateExecutionModel(flyteorg#4185)
Browse files Browse the repository at this point in the history
Signed-off-by: Da-Yi Wu <[email protected]>

Modify UpdateExecutionModelTag

Signed-off-by: Da-Yi Wu <[email protected]>

Update the Tags into Execution.Tag

Signed-off-by: Da-Yi Wu <[email protected]>

Update Execution Tag(flyteorg#4142)

Signed-off-by: Da-Yi Wu <[email protected]>

Add unit test for add tags

Signed-off-by: Da-Yi Wu <[email protected]>

Add unit test for add tag (flyteorg#4185)

Signed-off-by: Da-Yi Wu <[email protected]>

Add Tags

Signed-off-by: Da-Yi Wu <[email protected]>
  • Loading branch information
ericwudayi committed Oct 15, 2023
1 parent c6476cc commit d40d544
Show file tree
Hide file tree
Showing 20 changed files with 835 additions and 175 deletions.
4 changes: 4 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,10 @@ func (m *ExecutionManager) UpdateExecution(ctx context.Context, request admin.Ex
return nil, err
}

if err = transformers.UpdateExecutionModelAddTag(executionModel, request.AddTags); err != nil {
return nil, err
}

if err := m.db.ExecutionRepo().Update(ctx, *executionModel); err != nil {
return nil, err
}
Expand Down
32 changes: 32 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2912,6 +2912,38 @@ func TestUpdateExecution(t *testing.T) {
assert.True(t, updateExecFuncCalled)
})

t.Run("add tag", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
updateExecFuncCalled := false
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
// add tags to the execution, list of string
tags := []string{"tag1", "tag2"}
// read execModel.Tags
execTags := execModel.Tags
// Extract string of execTags
var execTagsString []string
for _, tag := range execTags {
execTagsString = append(execTagsString, tag.Name)
}

// Compare the tags
assert.Equal(t, tags, execTagsString)
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
updateResponse, err := execManager.UpdateExecution(context.Background(), admin.ExecutionUpdateRequest{
Id: &executionIdentifier,
AddTags: []string{"tag1", "tag2"},
}, time.Now())
assert.NoError(t, err)
assert.NotNil(t, updateResponse)
assert.True(t, updateExecFuncCalled)
})

t.Run("update error", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
Expand Down
40 changes: 40 additions & 0 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,46 @@ func UpdateExecutionModelStateChangeDetails(executionModel *models.Execution, st
return nil
}

// Update tag information of existing execution model.
func UpdateExecutionModelAddTag(executionModel *models.Execution, addTags []string) error {

var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal spec")
}

//if spec.Tags is not list, create a new list an assign to spec.Tags
if spec.Tags == nil {
spec.Tags = make([]string, 0)
}

tagSet := sets.NewString()
for _, tag := range spec.Tags {
tagSet.Insert(tag)
}
for _, tag := range addTags {
// if len(tag) == 0, skip
if len(tag) == 0 {
continue
}
// if tag not in tagSet, append into executionModel.Tags
if !tagSet.Has(tag) {
executionModel.Tags = append(executionModel.Tags, models.AdminTag{Name: tag})
}
tagSet.Insert(tag)
}
spec.Tags = tagSet.List()

marshaledSpec, err := proto.Marshal(&spec)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "Failed to marshal execution spec: %v", err)
}
executionModel.Spec = marshaledSpec

return nil
}

// The execution abort metadata is recorded but the phase is not actually updated *until* the abort event is propagated
// by flytepropeller. The metadata is preemptively saved at the time of the abort.
func SetExecutionAborting(execution *models.Execution, cause, principal string) error {
Expand Down
67 changes: 67 additions & 0 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,73 @@ func TestUpdateExecutionModelStateChangeDetails(t *testing.T) {
})
}

func TestUpdateExecutionModelAddTags(t *testing.T) {
t.Run("empty closure", func(t *testing.T) {
execModel := &models.Execution{}
tags := []string{"tags1", "tags2"}

err := UpdateExecutionModelAddTag(execModel, tags)
assert.Nil(t, err)
execTags := execModel.Tags
// Extract string of execTags
var execTagsString []string
for _, tag := range execTags {
execTagsString = append(execTagsString, tag.Name)
}
assert.Equal(t, tags, execTagsString)

var spec admin.ExecutionSpec
err = proto.Unmarshal(execModel.Spec, &spec)
assert.Nil(t, err)
assert.NotNil(t, spec)
assert.Equal(t, tags, spec.Tags)
})

t.Run("empty add tags", func(t *testing.T) {
execModel := &models.Execution{}
execModel.Tags = []models.AdminTag{
{Name: "tags1"},
{Name: "tags2"},
}
refTags := []string{"tags1", "tags2"}
tags := []string{""}

err := UpdateExecutionModelAddTag(execModel, tags)
assert.Nil(t, err)
var execTagsString []string
for _, tag := range execModel.Tags {
execTagsString = append(execTagsString, tag.Name)
}
assert.Nil(t, err)
assert.Equal(t, refTags, execTagsString)
})
t.Run("duplicate tags", func(t *testing.T) {
spec := testutils.GetExecutionRequest().Spec
spec.Tags = []string{"tags1", "tags2"}
specBytes, _ := proto.Marshal(spec)

execModel := &models.Execution{
Spec: specBytes,
}

execModel.Tags = []models.AdminTag{
{Name: "tags1"},
{Name: "tags2"},
}

tags := []string{"tags2", "tags3"}
err := UpdateExecutionModelAddTag(execModel, tags)
assert.Nil(t, err)

var execTagsString []string
for _, tag := range execModel.Tags {
execTagsString = append(execTagsString, tag.Name)
}
assert.Nil(t, err)
assert.Equal(t, []string{"tags1", "tags2", "tags3"}, execTagsString)
})
}

func TestTrimErrorMessage(t *testing.T) {
errMsg := "[1/1] currentAttempt done. Last Error: USER:: │\n│ ❱ 760 │ │ │ │ return __callback(*args, **kwargs) │\n││"
trimmedErrMessage := TrimErrorMessage(errMsg)
Expand Down
2 changes: 1 addition & 1 deletion flyteidl/boilerplate/flyte/golang_test_targets/go-gen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go generate ./...
if [ -n "$DELTA_CHECK" ]; then
DIRTY=$(git status --porcelain)
if [ -n "$DIRTY" ]; then
echo "FAILED: Go code updated without committing generated code."
echo "FAILED: Go code updated without commiting generated code."
echo "Ensure make generate has run and all changes are committed."
DIFF=$(git diff)
echo "diff detected: $DIFF"
Expand Down
118 changes: 96 additions & 22 deletions flyteidl/gen/pb-cpp/flyteidl/admin/execution.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d40d544

Please sign in to comment.