Skip to content

Commit

Permalink
revert admin changes to make in another PR
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Jan 13, 2024
1 parent f338e7e commit 361134b
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 66 deletions.
12 changes: 9 additions & 3 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,21 +1724,27 @@ func (m *ExecutionManager) GetExecutionData(
return nil, err
}
}
inputs, err := util.GetInputs(ctx, m.storageClient, executionModel.InputsURI.String())
inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, executionModel.InputsURI.String())
if err != nil {
return nil, err
}
outputs, err := util.GetOutputs(ctx, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, util.ToExecutionClosureInterface(execution.Closure))
if err != nil {
return nil, err
}
response := &admin.WorkflowExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
}

if response.FullOutputs != nil {
m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
if response.Outputs.Bytes > 0 {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
} else if response.FullOutputs != nil {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs)))
}
return response, nil
Expand Down
104 changes: 104 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -3594,7 +3595,22 @@ func TestGetExecutionData(t *testing.T) {
}, nil
}
mockExecutionRemoteURL := dataMocks.NewMockRemoteURL()
mockExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func(
ctx context.Context, uri string) (admin.UrlBlob, error) {
if uri == outputURI {
return admin.UrlBlob{
Url: "outputs",
Bytes: 200,
}, nil
} else if strings.HasSuffix(uri, shared.Inputs) {
return admin.UrlBlob{
Url: "inputs",
Bytes: 200,
}, nil
}

return admin.UrlBlob{}, errors.New("unexpected input")
}
mockStorage := commonMocks.GetMockStorageClient()
fullInputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down Expand Up @@ -3629,6 +3645,14 @@ func TestGetExecutionData(t *testing.T) {
})
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.WorkflowExecutionGetDataResponse{
Outputs: &admin.UrlBlob{
Url: "outputs",
Bytes: 200,
},
Inputs: &admin.UrlBlob{
Url: "inputs",
Bytes: 200,
},
FullInputs: fullInputs,
FullOutputs: fullOutputs,
}, dataResponse))
Expand Down Expand Up @@ -3757,6 +3781,86 @@ func TestGetExecution_Legacy(t *testing.T) {
assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure))
}

func TestGetExecutionData_LegacyModel(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC)
closure := getLegacyClosure()
closure.OutputResult = &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: outputURI,
},
},
}
var closureBytes, _ = proto.Marshal(closure)

executionGetFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
return models.Execution{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
Spec: getLegacySpecBytes(),
Phase: phase,
Closure: closureBytes,
LaunchPlanID: uint(1),
WorkflowID: uint(2),
StartedAt: &startedAt,
}, nil
}
mockExecutionRemoteURL := dataMocks.NewMockRemoteURL()
mockExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func(
ctx context.Context, uri string) (admin.UrlBlob, error) {
if uri == outputURI {
return admin.UrlBlob{
Url: "outputs",
Bytes: 200,
}, nil
} else if strings.HasSuffix(uri, shared.Inputs) {
return admin.UrlBlob{
Url: "inputs",
Bytes: 200,
}, nil
}

return admin.UrlBlob{}, errors.New("unexpected input")
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
storageClient := getMockStorageForExecTest(context.Background())
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil))
dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{
Id: &executionIdentifier,
})
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.WorkflowExecutionGetDataResponse{
Outputs: &admin.UrlBlob{
Url: "outputs",
Bytes: 200,
},
Inputs: &admin.UrlBlob{
Url: "inputs",
Bytes: 200,
},
FullInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": testutils.MakeStringLiteral("foo-value-1"),
},
},
FullOutputs: &core.LiteralMap{},
}, dataResponse))
var inputs core.LiteralMap
err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs)
assert.Nil(t, err)
assert.True(t, proto.Equal(&inputs, closure.ComputedInputs))
}

func TestCreateExecution_LegacyClient(t *testing.T) {
repository := getMockRepositoryForExecTest()
setDefaultLpCallbackForExecTest(repository)
Expand Down
12 changes: 9 additions & 3 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,18 +495,21 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}

inputs, err := util.GetInputs(ctx, m.storageClient, nodeExecution.InputUri)
inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.InputUri)
if err != nil {
return nil, err
}

outputs, err := util.GetOutputs(ctx, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.Closure)
if err != nil {
return nil, err
}

response := &admin.NodeExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromNodeExecutionID(*request.Id, nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""),
Expand All @@ -533,7 +536,10 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
}
}

if response.FullOutputs != nil {
m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
if response.Outputs.Bytes > 0 {
m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
} else if response.FullOutputs != nil {
m.metrics.NodeExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs)))
}

Expand Down
13 changes: 13 additions & 0 deletions flyteadmin/pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,11 @@ func TestGetNodeExecutionData(t *testing.T) {
Url: "inputs",
Bytes: 100,
}, nil
} else if uri == util.OutputsFile {
return admin.UrlBlob{
Url: "outputs",
Bytes: 200,
}, nil
}

return admin.UrlBlob{}, errors.New("unexpected input")
Expand Down Expand Up @@ -1293,6 +1298,14 @@ func TestGetNodeExecutionData(t *testing.T) {
})
assert.NoError(t, err)
assert.True(t, proto.Equal(&admin.NodeExecutionGetDataResponse{
Inputs: &admin.UrlBlob{
Url: "inputs",
Bytes: 100,
},
Outputs: &admin.UrlBlob{
Url: "outputs",
Bytes: 200,
},
FullInputs: fullInputs,
FullOutputs: fullOutputs,
DynamicWorkflow: &admin.DynamicWorkflowNodeMetadata{
Expand Down
12 changes: 9 additions & 3 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,23 +310,29 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
return nil, err
}

inputs, err := util.GetInputs(ctx, m.storageClient, taskExecution.InputUri)
inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, taskExecution.InputUri)
if err != nil {
return nil, err
}
outputs, err := util.GetOutputs(ctx, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, taskExecution.Closure)
if err != nil {
return nil, err
}

response := &admin.TaskExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
FlyteUrls: common.FlyteURLsFromTaskExecutionID(*request.Id, false),
}

if response.FullOutputs != nil {
m.metrics.TaskExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
if response.Outputs.Bytes > 0 {
m.metrics.TaskExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
} else if response.FullOutputs != nil {
m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs)))
}
return response, nil
Expand Down
22 changes: 22 additions & 0 deletions flyteadmin/pkg/manager/impl/task_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,21 @@ func TestGetTaskExecutionData(t *testing.T) {
}, nil
})
mockTaskExecutionRemoteURL = dataMocks.NewMockRemoteURL()
mockTaskExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) {
if uri == "input-uri.pb" {
return admin.UrlBlob{
Url: "inputs",
Bytes: 100,
}, nil
} else if uri == "test-output.pb" {
return admin.UrlBlob{
Url: "outputs",
Bytes: 200,
}, nil
}

return admin.UrlBlob{}, errors.New("unexpected input")
}
mockStorage := commonMocks.GetMockStorageClient()
fullInputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down Expand Up @@ -933,6 +947,14 @@ func TestGetTaskExecutionData(t *testing.T) {
assert.Nil(t, err)
assert.True(t, getTaskCalled)
assert.True(t, proto.Equal(&admin.TaskExecutionGetDataResponse{
Inputs: &admin.UrlBlob{
Url: "inputs",
Bytes: 100,
},
Outputs: &admin.UrlBlob{
Url: "outputs",
Bytes: 200,
},
FullInputs: fullInputs,
FullOutputs: fullOutputs,
FlyteUrls: &admin.FlyteURLs{
Expand Down
Loading

0 comments on commit 361134b

Please sign in to comment.