diff --git a/enterprise/server/remote_execution/execution_server/BUILD b/enterprise/server/remote_execution/execution_server/BUILD index 3b492986e19..f025fac6aab 100644 --- a/enterprise/server/remote_execution/execution_server/BUILD +++ b/enterprise/server/remote_execution/execution_server/BUILD @@ -66,6 +66,7 @@ go_test( "//proto:remote_execution_go_proto", "//proto:resource_go_proto", "//proto:scheduler_go_proto", + "//proto:stored_invocation_go_proto", "//server/environment", "//server/interfaces", "//server/real_environment", @@ -80,6 +81,7 @@ go_test( "//server/util/prefix", "//server/util/proto", "//server/util/status", + "//server/util/testing/flags", "@com_github_go_redis_redis_v8//:redis", "@com_github_google_go_cmp//cmp", "@com_github_google_uuid//:uuid", diff --git a/enterprise/server/remote_execution/execution_server/execution_server.go b/enterprise/server/remote_execution/execution_server/execution_server.go index abc89bad8f7..b1d504f12a1 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server.go +++ b/enterprise/server/remote_execution/execution_server/execution_server.go @@ -327,19 +327,16 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin dbErr = status.NotFoundErrorf("Unable to update execution; no execution exists with id %s.", executionID) } } - - if stage == repb.ExecutionStage_COMPLETED { - if err := s.recordExecution(ctx, executionID, executeResponse.GetResult().GetExecutionMetadata()); err != nil { - log.CtxErrorf(ctx, "failed to record execution %q: %s", executionID, err) - } - } return dbErr } -func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata) error { +func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata, auxMeta *espb.ExecutionAuxiliaryMetadata, properties *platform.Properties) error { if s.env.GetExecutionCollector() == nil || !olapdbconfig.WriteExecutionsToOLAPDBEnabled() { return nil } + if s.env.GetDBHandle() == nil { + return status.FailedPreconditionError("database not configured") + } var executionPrimaryDB tables.Execution if err := s.env.GetDBHandle().NewQuery(ctx, "execution_server_lookup_execution").Raw( @@ -367,6 +364,28 @@ func (s *ExecutionServer) recordExecution(ctx context.Context, executionID strin executionProto.DiskBytesWritten = md.GetUsageStats().GetCgroupIoStats().GetWbytes() executionProto.DiskWriteOperations = md.GetUsageStats().GetCgroupIoStats().GetWios() executionProto.DiskReadOperations = md.GetUsageStats().GetCgroupIoStats().GetRios() + + executionProto.EffectiveIsolationType = auxMeta.GetIsolationType() + executionProto.RequestedIsolationType = platform.CoerceContainerType(properties.WorkloadIsolationType) + + executionProto.RequestedComputeUnits = properties.EstimatedComputeUnits + executionProto.RequestedMemoryBytes = properties.EstimatedMemoryBytes + executionProto.RequestedMilliCpu = properties.EstimatedMilliCPU + executionProto.RequestedFreeDiskBytes = properties.EstimatedFreeDiskBytes + + schedulingMeta := auxMeta.GetSchedulingMetadata() + executionProto.EstimatedFreeDiskBytes = schedulingMeta.GetTaskSize().GetEstimatedFreeDiskBytes() + executionProto.PreviousMeasuredMemoryBytes = schedulingMeta.GetMeasuredTaskSize().GetEstimatedMemoryBytes() + executionProto.PreviousMeasuredMilliCpu = schedulingMeta.GetMeasuredTaskSize().GetEstimatedMilliCpu() + executionProto.PreviousMeasuredFreeDiskBytes = 
schedulingMeta.GetMeasuredTaskSize().GetEstimatedFreeDiskBytes() + executionProto.PredictedMemoryBytes = schedulingMeta.GetPredictedTaskSize().GetEstimatedMemoryBytes() + executionProto.PredictedMilliCpu = schedulingMeta.GetPredictedTaskSize().GetEstimatedMilliCpu() + executionProto.PredictedFreeDiskBytes = schedulingMeta.GetPredictedTaskSize().GetEstimatedFreeDiskBytes() + + request := auxMeta.GetExecuteRequest() + executionProto.SkipCacheLookup = request.GetSkipCacheLookup() + executionProto.ExecutionPriority = request.GetExecutionPolicy().GetPriority() + inv, err := s.env.GetExecutionCollector().GetInvocation(ctx, link.GetInvocationId()) if err != nil { log.CtxErrorf(ctx, "failed to get invocation %q from ExecutionCollector: %s", link.GetInvocationId(), err) @@ -1007,6 +1026,17 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio return err } + response := operation.ExtractExecuteResponse(op) + trimmedResponse := response.CloneVT() + if trimmedResponse.GetResult().GetExecutionMetadata() != nil { + // Auxiliary metadata shouldn't be sent to bazel or saved in + // the action cache. + trimmedResponse.GetResult().GetExecutionMetadata().AuxiliaryMetadata = nil + if err := op.GetResponse().MarshalFrom(trimmedResponse); err != nil { + return status.InternalErrorf("Failed to marshall trimmed response: %s", err) + } + } + mu.Lock() lastOp = op taskID = op.GetName() @@ -1020,24 +1050,33 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio log.CtxDebugf(ctx, "PublishOperation: stage: %s", stage) - var response *repb.ExecuteResponse // Only set if stage == COMPLETE - if stage == repb.ExecutionStage_COMPLETED { - response = operation.ExtractExecuteResponse(op) - } - if response != nil { // The execution completed - arn, err := digest.ParseUploadResourceName(taskID) + var auxMeta *espb.ExecutionAuxiliaryMetadata + var properties *platform.Properties + if stage == repb.ExecutionStage_COMPLETED && response != nil { + auxMeta = new(espb.ExecutionAuxiliaryMetadata) + ok, err := rexec.AuxiliaryMetadata(response.GetResult().GetExecutionMetadata(), auxMeta) + if err != nil { + log.CtxWarningf(ctx, "Failed to parse ExecutionAuxiliaryMetadata: %s", err) + } else if !ok { + log.CtxWarningf(ctx, "Failed to find ExecutionAuxiliaryMetadata: %s", err) + } + actionRN, err := digest.ParseUploadResourceName(taskID) if err != nil { return status.WrapErrorf(err, "Failed to parse taskID") } - arn = digest.NewResourceName(arn.GetDigest(), arn.GetInstanceName(), rspb.CacheType_AC, arn.GetDigestFunction()) - action, cmd, err := s.fetchActionAndCommand(ctx, arn) + actionRN = digest.NewResourceName(actionRN.GetDigest(), actionRN.GetInstanceName(), rspb.CacheType_AC, actionRN.GetDigestFunction()) + action, cmd, err := s.fetchActionAndCommand(ctx, actionRN) if err != nil { return status.UnavailableErrorf("Failed to fetch action and command: %s", err) } - if err := s.cacheActionResult(ctx, arn, response, action); err != nil { + properties, err = platform.ParseProperties(&repb.ExecutionTask{Action: action, Command: cmd, PlatformOverrides: auxMeta.GetPlatformOverrides()}) + if err != nil { + log.CtxWarningf(ctx, "Failed to parse platform properties: %s", err) + } + if err := s.cacheActionResult(ctx, actionRN, trimmedResponse, action); err != nil { return status.UnavailableErrorf("Error uploading action result: %s", err.Error()) } - if err := s.markTaskComplete(ctx, arn, response, action, cmd); err != nil { + if err := s.markTaskComplete(ctx, actionRN, response, action, 
cmd, properties); err != nil { // Errors updating the router or recording usage are non-fatal. log.CtxErrorf(ctx, "Could not update post-completion metadata: %s", err) } @@ -1061,6 +1100,9 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio return status.WrapErrorf(err, "failed to update execution %q", taskID) } lastWrite = time.Now() + if err := s.recordExecution(ctx, taskID, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil { + log.CtxErrorf(ctx, "failed to record execution %q: %s", taskID, err) + } return nil }() if err != nil { @@ -1068,6 +1110,8 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio } if response != nil { + // TODO(vanja) should this be done when the executor got a + // cache hit? if err := s.cacheExecuteResponse(ctx, taskID, response); err != nil { log.CtxErrorf(ctx, "Failed to cache execute response: %s", err) } @@ -1112,7 +1156,7 @@ func (s *ExecutionServer) cacheActionResult(ctx context.Context, actionResourceN // markTaskComplete contains logic to be run when the task is complete but // before letting the client know that the task has completed. -func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceName *digest.ResourceName, executeResponse *repb.ExecuteResponse, action *repb.Action, cmd *repb.Command) error { +func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceName *digest.ResourceName, executeResponse *repb.ExecuteResponse, action *repb.Action, cmd *repb.Command, properties *platform.Properties) error { execErr := gstatus.ErrorProto(executeResponse.GetStatus()) router := s.env.GetTaskRouter() // Only update the router if a task was actually executed @@ -1127,20 +1171,22 @@ func (s *ExecutionServer) markTaskComplete(ctx context.Context, actionResourceNa } if sizer := s.env.GetTaskSizer(); sizer != nil && execErr == nil && executeResponse.GetResult().GetExitCode() == 0 { + // TODO(vanja) should this be done when the executor got a cache hit? md := executeResponse.GetResult().GetExecutionMetadata() if err := sizer.Update(ctx, cmd, md); err != nil { log.CtxWarningf(ctx, "Failed to update task size: %s", err) } } - if err := s.updateUsage(ctx, action, cmd, executeResponse); err != nil { + if err := s.updateUsage(ctx, executeResponse, properties); err != nil { + // TODO(vanja) should this be done when the executor got a cache hit? log.CtxWarningf(ctx, "Failed to update usage for ExecuteResponse %+v: %s", executeResponse, err) } return nil } -func (s *ExecutionServer) updateUsage(ctx context.Context, action *repb.Action, cmd *repb.Command, executeResponse *repb.ExecuteResponse) error { +func (s *ExecutionServer) updateUsage(ctx context.Context, executeResponse *repb.ExecuteResponse, plat *platform.Properties) error { ut := s.env.GetUsageTracker() if ut == nil { return nil @@ -1159,21 +1205,6 @@ func (s *ExecutionServer) updateUsage(ctx context.Context, action *repb.Action, return err } - // Fill out an ExecutionTask with enough info to be able to parse the - // effective platform. 
- task := &repb.ExecutionTask{Action: action, Command: cmd} - md := &espb.ExecutionAuxiliaryMetadata{} - ok, err := rexec.AuxiliaryMetadata(executeResponse.Result.GetExecutionMetadata(), md) - if err != nil { - log.CtxWarningf(ctx, "Failed to parse auxiliary metadata: %s", err) - } else if ok { - task.PlatformOverrides = md.GetPlatformOverrides() - } - plat, err := platform.ParseProperties(task) - if err != nil { - return err - } - pool, err := s.env.GetSchedulerService().GetPoolInfo(ctx, plat.OS, plat.Pool, plat.WorkflowID, plat.PoolType) if err != nil { return status.InternalErrorf("failed to determine executor pool: %s", err) diff --git a/enterprise/server/remote_execution/execution_server/execution_server_test.go b/enterprise/server/remote_execution/execution_server/execution_server_test.go index 43740459584..4f4cd2d040a 100644 --- a/enterprise/server/remote_execution/execution_server/execution_server_test.go +++ b/enterprise/server/remote_execution/execution_server/execution_server_test.go @@ -3,6 +3,7 @@ package execution_server_test import ( "context" "io" + "strings" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/buildbuddy-io/buildbuddy/server/util/prefix" "github.com/buildbuddy-io/buildbuddy/server/util/proto" "github.com/buildbuddy-io/buildbuddy/server/util/status" + "github.com/buildbuddy-io/buildbuddy/server/util/testing/flags" "github.com/go-redis/redis/v8" "github.com/google/go-cmp/cmp" "github.com/google/uuid" @@ -40,6 +42,7 @@ import ( repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution" rspb "github.com/buildbuddy-io/buildbuddy/proto/resource" scpb "github.com/buildbuddy-io/buildbuddy/proto/scheduler" + sipb "github.com/buildbuddy-io/buildbuddy/proto/stored_invocation" gstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" tspb "google.golang.org/protobuf/types/known/timestamppb" @@ -123,7 +126,7 @@ func TestDispatch(t *testing.T) { ctx, err := env.GetAuthenticator().(*testauth.TestAuthenticator).WithAuthenticatedUser(ctx, "US1") require.NoError(t, err) - arn := uploadEmptyAction(ctx, t, env, "" /*=instanceName*/, repb.DigestFunction_SHA256, false /*=doNotCache*/) + arn := uploadAction(ctx, t, env, "" /*=instanceName*/, repb.DigestFunction_SHA256, &repb.Action{}) ad := arn.GetDigest() // note: AttachUserPrefix is normally done by Execute(), which wraps @@ -297,6 +300,11 @@ func TestExecuteAndPublishOperation(t *testing.T) { expectedExecutionUsage: tables.UsageCounts{LinuxExecutionDurationUsec: durationUsec}, status: status.AbortedError("foo"), }, + { + name: "PublishMoreMetadata", + expectedExecutionUsage: tables.UsageCounts{LinuxExecutionDurationUsec: durationUsec}, + publishMoreMetadata: true, + }, } { t.Run(test.name, func(t *testing.T) { testExecuteAndPublishOperation(t, test) @@ -311,11 +319,50 @@ type publishTest struct { cachedResult, doNotCache bool status error exitCode int32 + publishMoreMetadata bool +} + +type fakeCollector struct { + interfaces.ExecutionCollector + invocationLinks []*sipb.StoredInvocationLink + executions []*repb.StoredExecution +} + +func (fc *fakeCollector) DeleteInvocationLinks(_ context.Context, _ string) error { + return nil +} + +func (fc *fakeCollector) AddInvocationLink(_ context.Context, link *sipb.StoredInvocationLink) error { + fc.invocationLinks = append(fc.invocationLinks, link) + return nil +} + +func (fc *fakeCollector) GetInvocationLinks(_ context.Context, executionID string) ([]*sipb.StoredInvocationLink, error) { + var res []*sipb.StoredInvocationLink + for _, link := 
range fc.invocationLinks { + if link.GetExecutionId() == executionID { + res = append(res, link) + } + } + return res, nil +} + +func (fc *fakeCollector) GetInvocation(_ context.Context, invocationID string) (*sipb.StoredInvocation, error) { + // return nil to always force AppendExecution calls. + return nil, nil +} + +func (fc *fakeCollector) AppendExecution(_ context.Context, _ string, execution *repb.StoredExecution) error { + fc.executions = append(fc.executions, execution) + return nil } func testExecuteAndPublishOperation(t *testing.T, test publishTest) { ctx := context.Background() + flags.Set(t, "app.enable_write_executions_to_olap_db", true) env, conn := setupEnv(t) + execCollector := new(fakeCollector) + env.SetExecutionCollector(execCollector) client := repb.NewExecutionClient(conn) const instanceName = "test-instance" @@ -323,11 +370,23 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { const digestFunction = repb.DigestFunction_SHA256 // Schedule execution - clientCtx := ctx + clientCtx, err := bazel_request.WithRequestMetadata(ctx, &repb.RequestMetadata{ + ToolInvocationId: invocationID, + }) + require.NoError(t, err) for k, v := range test.platformOverrides { clientCtx = metadata.AppendToOutgoingContext(clientCtx, "x-buildbuddy-platform."+k, v) } - arn := uploadEmptyAction(clientCtx, t, env, instanceName, digestFunction, test.doNotCache) + arn := uploadAction(clientCtx, t, env, instanceName, digestFunction, &repb.Action{ + DoNotCache: test.doNotCache, + Platform: &repb.Platform{Properties: []*repb.Platform_Property{ + {Name: "EstimatedComputeUnits", Value: "2.5"}, + {Name: "EstimatedFreeDiskBytes", Value: "1000"}, + {Name: "EstimatedCPU", Value: "1.5"}, + {Name: "EstimatedMemory", Value: "2000"}, + {Name: "workload-isolation-type", Value: "oci"}, + }}, + }) executionClient, err := client.Execute(clientCtx, &repb.ExecuteRequest{ InstanceName: arn.GetInstanceName(), ActionDigest: arn.GetDigest(), @@ -344,10 +403,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { // Simulate execution: set up a PublishOperation stream and publish an // ExecuteResponse to it. 
- executorCtx, err := bazel_request.WithRequestMetadata(ctx, &repb.RequestMetadata{ - ToolInvocationId: invocationID, - }) - executorCtx = metadata.AppendToOutgoingContext(executorCtx, "x-buildbuddy-client", "executor") + executorCtx := metadata.AppendToOutgoingContext(clientCtx, "x-buildbuddy-client", "executor") require.NoError(t, err) stream, err := client.PublishOperation(executorCtx) require.NoError(t, err) @@ -361,6 +417,26 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { &repb.Platform_Property{Name: k, Value: v}, ) } + if test.publishMoreMetadata { + aux.IsolationType = "firecracker" + aux.ExecuteRequest = &repb.ExecuteRequest{ + SkipCacheLookup: true, // This is only used for writing to clickhouse + ExecutionPolicy: &repb.ExecutionPolicy{Priority: 999}, + } + aux.SchedulingMetadata = &scpb.SchedulingMetadata{ + TaskSize: &scpb.TaskSize{EstimatedFreeDiskBytes: 1001}, + MeasuredTaskSize: &scpb.TaskSize{ + EstimatedMemoryBytes: 2001, + EstimatedMilliCpu: 2002, + EstimatedFreeDiskBytes: 2003, + }, + PredictedTaskSize: &scpb.TaskSize{ + EstimatedMemoryBytes: 3001, + EstimatedMilliCpu: 3002, + EstimatedFreeDiskBytes: 3003, + }, + } + } auxAny, err := anypb.New(aux) require.NoError(t, err) actionResult := &repb.ActionResult{ @@ -371,6 +447,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { WorkerStartTimestamp: tspb.New(workerStartTime), WorkerCompletedTimestamp: tspb.New(workerEndTime), AuxiliaryMetadata: []*anypb.Any{auxAny}, + DoNotCache: test.doNotCache, }, } expectedExecuteResponse := &repb.ExecuteResponse{ @@ -389,6 +466,9 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { _, err = stream.CloseAndRecv() require.NoError(t, err) + trimmedExecuteResponse := expectedExecuteResponse.CloneVT() + trimmedExecuteResponse.GetResult().GetExecutionMetadata().AuxiliaryMetadata = nil + // Wait for the execute response to be streamed back on our initial // /Execute stream. var executeResponse *repb.ExecuteResponse @@ -404,14 +484,14 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { } executeResponse = operation.ExtractExecuteResponse(op) } - assert.Empty(t, cmp.Diff(expectedExecuteResponse, executeResponse, protocmp.Transform())) + assert.Empty(t, cmp.Diff(trimmedExecuteResponse, executeResponse, protocmp.Transform())) // Check that the action cache contains the right entry, if any. 
arn.ToProto().CacheType = rspb.CacheType_AC cachedActionResult, err := cachetools.GetActionResult(ctx, env.GetActionCacheClient(), arn) if !test.doNotCache && test.exitCode == 0 && test.status == nil && !test.cachedResult { require.NoError(t, err) - assert.Empty(t, cmp.Diff(expectedExecuteResponse.GetResult(), cachedActionResult, protocmp.Transform())) + assert.Empty(t, cmp.Diff(trimmedExecuteResponse.GetResult(), cachedActionResult, protocmp.Transform())) } else { require.Equal(t, codes.NotFound, gstatus.Code(err), "Error should be NotFound, but is %v", err) } @@ -437,6 +517,46 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) { counts: test.expectedExecutionUsage, }, }, executionUsages) + + // Check that we recorded the executions + assert.Equal(t, 1, len(execCollector.executions)) + expectedExecution := &repb.StoredExecution{ + ExecutionId: taskID, + InvocationLinkType: 1, + InvocationUuid: strings.ReplaceAll(invocationID, "-", ""), + Stage: 4, + StatusCode: int32(gstatus.Code(test.status)), + ExitCode: test.exitCode, + DoNotCache: test.doNotCache, + CachedResult: test.cachedResult, + RequestedComputeUnits: 2.5, + RequestedFreeDiskBytes: 1000, + RequestedMemoryBytes: 2000, + RequestedMilliCpu: 1500, + RequestedIsolationType: "oci", + } + if test.publishMoreMetadata { + expectedExecution.ExecutionPriority = 999 + expectedExecution.SkipCacheLookup = true + expectedExecution.EffectiveIsolationType = "firecracker" + expectedExecution.EstimatedFreeDiskBytes = 1001 + expectedExecution.PreviousMeasuredMemoryBytes = 2001 + expectedExecution.PreviousMeasuredMilliCpu = 2002 + expectedExecution.PreviousMeasuredFreeDiskBytes = 2003 + expectedExecution.PredictedMemoryBytes = 3001 + expectedExecution.PredictedMilliCpu = 3002 + expectedExecution.PredictedFreeDiskBytes = 3003 + } + diff := cmp.Diff( + expectedExecution, + execCollector.executions[0], + protocmp.Transform(), + protocmp.IgnoreFields( + &repb.StoredExecution{}, + "created_at_usec", + "updated_at_usec", + )) + assert.Emptyf(t, diff, "Recorded execution didn't match the expected one: %s", expectedExecution) } func TestMarkFailed(t *testing.T) { @@ -488,11 +608,11 @@ func TestMarkFailed(t *testing.T) { } -func uploadEmptyAction(ctx context.Context, t *testing.T, env *real_environment.RealEnv, instanceName string, df repb.DigestFunction_Value, doNotCache bool) *digest.ResourceName { +func uploadAction(ctx context.Context, t *testing.T, env *real_environment.RealEnv, instanceName string, df repb.DigestFunction_Value, action *repb.Action) *digest.ResourceName { cmd := &repb.Command{Arguments: []string{"test"}} cd, err := cachetools.UploadProto(ctx, env.GetByteStreamClient(), instanceName, df, cmd) require.NoError(t, err) - action := &repb.Action{CommandDigest: cd, DoNotCache: doNotCache} + action.CommandDigest = cd ad, err := cachetools.UploadProto(ctx, env.GetByteStreamClient(), instanceName, df, action) require.NoError(t, err) return digest.NewResourceName(ad, instanceName, rspb.CacheType_CAS, df) diff --git a/enterprise/server/remote_execution/executor/executor.go b/enterprise/server/remote_execution/executor/executor.go index 990af824370..51ae69f9b3b 100644 --- a/enterprise/server/remote_execution/executor/executor.go +++ b/enterprise/server/remote_execution/executor/executor.go @@ -199,7 +199,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch defer stop() } - task := st.ExecutionTask + task := st.GetExecutionTask() req := task.GetExecuteRequest() taskID := task.GetExecutionId() 
adInstanceDigest := digest.NewResourceName(req.GetActionDigest(), req.GetInstanceName(), rspb.CacheType_AC, req.GetDigestFunction()) @@ -207,7 +207,20 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch task.ExecuteRequest.DigestFunction = digestFunction acClient := s.env.GetActionCacheClient() - stateChangeFn := operation.GetStateChangeFunc(stream, taskID, adInstanceDigest) + auxMetadata := &espb.ExecutionAuxiliaryMetadata{ + PlatformOverrides: task.GetPlatformOverrides(), + ExecuteRequest: task.GetExecuteRequest(), + SchedulingMetadata: st.GetSchedulingMetadata(), + } + opStateChangeFn := operation.GetStateChangeFunc(stream, taskID, adInstanceDigest) + stateChangeFn := operation.StateChangeFunc(func(stage repb.ExecutionStage_Value, execResponse *repb.ExecuteResponse) error { + if stage == repb.ExecutionStage_COMPLETED { + if err := appendAuxiliaryMetadata(execResponse.GetResult().GetExecutionMetadata(), auxMetadata); err != nil { + log.CtxWarningf(ctx, "Failed to append ExecutionAuxiliaryMetadata: %s", err) + } + } + return opStateChangeFn(stage, execResponse) + }) md := &repb.ExecutedActionMetadata{ Worker: s.hostID, QueuedTimestamp: task.QueuedTimestamp, @@ -218,15 +231,15 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch EstimatedTaskSize: st.GetSchedulingMetadata().GetTaskSize(), DoNotCache: task.GetAction().GetDoNotCache(), } - auxMetadata := &espb.ExecutionAuxiliaryMetadata{ - PlatformOverrides: task.PlatformOverrides, - } finishWithErrFn := func(finalErr error) (retry bool, err error) { if shouldRetry(task, finalErr) { return true, finalErr } resp := operation.ErrorResponse(finalErr) md.WorkerCompletedTimestamp = timestamppb.Now() + if err := appendAuxiliaryMetadata(md, auxMetadata); err != nil { + log.CtxWarningf(ctx, "Failed to append ExecutionAuxiliaryMetadata: %s", err) + } resp.Result = &repb.ActionResult{ ExecutionMetadata: md, } @@ -235,9 +248,6 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch } return false, finalErr } - if err := appendAuxiliaryMetadata(md, auxMetadata); err != nil { - return finishWithErrFn(status.InternalErrorf("append auxiliary metadata: %s", err)) - } stage := &stagedGauge{estimatedSize: md.EstimatedTaskSize} defer stage.End() @@ -268,6 +278,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch if err != nil { return finishWithErrFn(status.WrapErrorf(err, "error creating runner for command")) } + auxMetadata.IsolationType = r.GetIsolationType() actionMetrics.Isolation = r.GetIsolationType() finishedCleanly := false defer func() { diff --git a/enterprise/server/remote_execution/platform/platform.go b/enterprise/server/remote_execution/platform/platform.go index 48a78c398c2..c6e4f9daf01 100644 --- a/enterprise/server/remote_execution/platform/platform.go +++ b/enterprise/server/remote_execution/platform/platform.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "runtime" + "slices" "strconv" "strings" "time" @@ -136,11 +137,25 @@ const ( FirecrackerContainerType ContainerType = "firecracker" OCIContainerType ContainerType = "oci" SandboxContainerType ContainerType = "sandbox" + // If you add a container type, also add it to KnownContainerTypes // The app will mint a signed client identity token to workflows. workflowClientIdentityTokenLifetime = 12 * time.Hour ) +// KnownContainerTypes are all the types that are currently supported, or were +// previously supported. 
+var KnownContainerTypes []ContainerType = []ContainerType{BareContainerType, PodmanContainerType, DockerContainerType, FirecrackerContainerType, OCIContainerType, SandboxContainerType} + +// CoerceContainerType returns t if it is in KnownContainerTypes. Otherwise it +// returns "Unknown". +func CoerceContainerType(t string) string { + if slices.Contains(KnownContainerTypes, ContainerType(t)) { + return t + } + return "unknown" +} + func VFSEnabled() bool { return *enableVFS } diff --git a/enterprise/server/test/integration/remote_execution/remote_execution_test.go b/enterprise/server/test/integration/remote_execution/remote_execution_test.go index d532152d403..b4d46376da3 100644 --- a/enterprise/server/test/integration/remote_execution/remote_execution_test.go +++ b/enterprise/server/test/integration/remote_execution/remote_execution_test.go @@ -197,6 +197,9 @@ func TestSimpleCommand_Timeout_StdoutStderrStillVisible(t *testing.T) { assert.Equal(t, 1, int(taskCount-initialTaskCount), "unexpected number of tasks started") execRes, err := execution.GetCachedExecuteResponse(ctx, rbe.GetActionResultStorageClient(), res.ID) require.NoError(t, err) + // The ExecuteResponse will have auxiliary metadata, while the ActionResult will not. + assert.NotEmpty(t, execRes.GetResult().GetExecutionMetadata().GetAuxiliaryMetadata()) + execRes.GetResult().GetExecutionMetadata().AuxiliaryMetadata = nil assert.Empty( t, cmp.Diff(res.ActionResult, execRes.GetResult(), protocmp.Transform()), diff --git a/proto/execution_stats.proto b/proto/execution_stats.proto index 08216b0636b..9ee0fb9ea88 100644 --- a/proto/execution_stats.proto +++ b/proto/execution_stats.proto @@ -135,6 +135,9 @@ message Execution { message ExecutionAuxiliaryMetadata { // Platform overrides set via remote header. build.bazel.remote.execution.v2.Platform platform_overrides = 1; + string isolation_type = 2; + build.bazel.remote.execution.v2.ExecuteRequest execute_request = 3; + scheduler.SchedulingMetadata scheduling_metadata = 4; } message ExecutionLookup { diff --git a/proto/remote_execution.proto b/proto/remote_execution.proto index 1d81d266873..e272d5d9e75 100644 --- a/proto/remote_execution.proto +++ b/proto/remote_execution.proto @@ -2412,9 +2412,26 @@ message StoredExecution { int64 disk_read_operations = 36; int64 disk_write_operations = 37; - // Task Sizing + // Effective task sizing int64 estimated_memory_bytes = 17; int64 estimated_milli_cpu = 18; + int64 estimated_free_disk_bytes = 40; + + // Task size requested by the user. + double requested_compute_units = 53; + int64 requested_memory_bytes = 41; + int64 requested_milli_cpu = 42; + int64 requested_free_disk_bytes = 43; + + // Measured task size from previous executions. + int64 previous_measured_memory_bytes = 44; + int64 previous_measured_milli_cpu = 45; + int64 previous_measured_free_disk_bytes = 46; + + // Task size predicted by machine learning. 
+ int64 predicted_memory_bytes = 47; + int64 predicted_milli_cpu = 48; + int64 predicted_free_disk_bytes = 49; // ExecutedActionMetadata int64 queued_timestamp_usec = 19; @@ -2434,6 +2451,11 @@ message StoredExecution { bool cached_result = 38; bool do_not_cache = 39; + bool skip_cache_lookup = 50; + + int32 execution_priority = 51; + string requested_isolation_type = 52; + string effective_isolation_type = 54; string output_path = 31; string status_message = 32; diff --git a/server/util/clickhouse/clickhouse.go b/server/util/clickhouse/clickhouse.go index 0814929ea9a..867863265ac 100644 --- a/server/util/clickhouse/clickhouse.go +++ b/server/util/clickhouse/clickhouse.go @@ -219,6 +219,17 @@ func buildExecution(in *repb.StoredExecution, inv *sipb.StoredInvocation) *schem DiskWriteOperations: in.GetDiskWriteOperations(), EstimatedMemoryBytes: in.GetEstimatedMemoryBytes(), EstimatedMilliCPU: in.GetEstimatedMilliCpu(), + EstimatedFreeDiskBytes: in.GetEstimatedFreeDiskBytes(), + RequestedComputeUnits: in.GetRequestedComputeUnits(), + RequestedMemoryBytes: in.GetRequestedMemoryBytes(), + RequestedMilliCPU: in.GetRequestedMilliCpu(), + RequestedFreeDiskBytes: in.GetRequestedFreeDiskBytes(), + PreviousMeasuredMemoryBytes: in.GetPreviousMeasuredMemoryBytes(), + PreviousMeasuredMilliCPU: in.GetPreviousMeasuredMilliCpu(), + PreviousMeasuredFreeDiskBytes: in.GetPreviousMeasuredFreeDiskBytes(), + PredictedMemoryBytes: in.GetPredictedMemoryBytes(), + PredictedMilliCPU: in.GetPredictedMilliCpu(), + PredictedFreeDiskBytes: in.GetPredictedFreeDiskBytes(), QueuedTimestampUsec: in.GetQueuedTimestampUsec(), WorkerStartTimestampUsec: in.GetWorkerStartTimestampUsec(), WorkerCompletedTimestampUsec: in.GetWorkerCompletedTimestampUsec(), @@ -230,8 +241,11 @@ func buildExecution(in *repb.StoredExecution, inv *sipb.StoredInvocation) *schem OutputUploadCompletedTimestampUsec: in.GetOutputUploadCompletedTimestampUsec(), StatusCode: in.GetStatusCode(), ExitCode: in.GetExitCode(), - DoNotCache: in.GetDoNotCache(), CachedResult: in.GetCachedResult(), + DoNotCache: in.GetDoNotCache(), + SkipCacheLookup: in.GetSkipCacheLookup(), + RequestedIsolationType: in.GetRequestedIsolationType(), + EffectiveIsolationType: in.GetEffectiveIsolationType(), InvocationLinkType: int8(in.GetInvocationLinkType()), User: inv.GetUser(), Host: inv.GetHost(), diff --git a/server/util/clickhouse/schema/schema.go b/server/util/clickhouse/schema/schema.go index 3714e84d96f..43d34e89145 100644 --- a/server/util/clickhouse/schema/schema.go +++ b/server/util/clickhouse/schema/schema.go @@ -30,7 +30,7 @@ const ( // Making a new table? 
Please make sure you: // 1) Add your table in getAllTables() // 2) Add the table in clickhouse_test.go TestSchemaInSync -// 3) Make sure all the fields in the corresponding Table deinition in tables.go +// 3) Make sure all the fields in the corresponding Table definition in tables.go // are present in clickhouse Table definition or in ExcludedFields() type Table interface { TableName() string @@ -179,8 +179,19 @@ type Execution struct { DiskWriteOperations int64 // Task sizing - EstimatedMemoryBytes int64 - EstimatedMilliCPU int64 + EstimatedMemoryBytes int64 + EstimatedMilliCPU int64 + EstimatedFreeDiskBytes int64 + RequestedComputeUnits float64 + RequestedMemoryBytes int64 + RequestedMilliCPU int64 + RequestedFreeDiskBytes int64 + PreviousMeasuredMemoryBytes int64 + PreviousMeasuredMilliCPU int64 + PreviousMeasuredFreeDiskBytes int64 + PredictedMemoryBytes int64 + PredictedMilliCPU int64 + PredictedFreeDiskBytes int64 // ExecutedActionMetadata (in addition to Worker above) QueuedTimestampUsec int64 @@ -196,8 +207,13 @@ type Execution struct { StatusCode int32 ExitCode int32 - CachedResult bool - DoNotCache bool + CachedResult bool + DoNotCache bool + SkipCacheLookup bool + + ExecutionPriority int32 + RequestedIsolationType string + EffectiveIsolationType string `gorm:"type:LowCardinality(String)"` // This values comes from the executor // Long string fields OutputPath string @@ -256,6 +272,21 @@ func (e *Execution) AdditionalFields() []string { "DiskBytesWritten", "DiskReadOperations", "DiskWriteOperations", + "EstimatedFreeDiskBytes", + "RequestedComputeUnits", + "RequestedMemoryBytes", + "RequestedMilliCPU", + "RequestedFreeDiskBytes", + "PreviousMeasuredMemoryBytes", + "PreviousMeasuredMilliCPU", + "PreviousMeasuredFreeDiskBytes", + "PredictedMemoryBytes", + "PredictedMilliCPU", + "PredictedFreeDiskBytes", + "SkipCacheLookup", + "ExecutionPriority", + "RequestedIsolationType", + "EffectiveIsolationType", } }
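
Illustrative sketch (not part of the patch above): the change relies on ExecutionAuxiliaryMetadata being packed into ExecutedActionMetadata.auxiliary_metadata as a google.protobuf.Any by the executor (appendAuxiliaryMetadata) and unpacked again on the app (rexec.AuxiliaryMetadata). A minimal stand-alone version of that round trip follows; the espb import path is assumed to match the alias used in executor.go, and the inline loop stands in for rexec.AuxiliaryMetadata.

package main

import (
	"fmt"

	espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats"
	repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
	"google.golang.org/protobuf/types/known/anypb"
)

func main() {
	// Executor side: pack the auxiliary metadata into the executed action metadata.
	aux := &espb.ExecutionAuxiliaryMetadata{IsolationType: "oci"}
	auxAny, err := anypb.New(aux)
	if err != nil {
		panic(err)
	}
	md := &repb.ExecutedActionMetadata{AuxiliaryMetadata: []*anypb.Any{auxAny}}

	// App side: find and unpack the first Any holding an ExecutionAuxiliaryMetadata.
	out := &espb.ExecutionAuxiliaryMetadata{}
	for _, a := range md.GetAuxiliaryMetadata() {
		if a.MessageIs(out) {
			if err := a.UnmarshalTo(out); err != nil {
				panic(err)
			}
			break
		}
	}
	fmt.Println(out.GetIsolationType()) // prints "oci"
}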
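
Also illustrative: the action cache and the bazel-facing ExecuteResponse receive a trimmed copy with the auxiliary metadata cleared, while the cached execute response keeps it (which is why the integration test above clears AuxiliaryMetadata before diffing). A sketch of that trimming step, using proto.Clone instead of the generated CloneVT so it depends only on the standard protobuf runtime.

package main

import (
	"fmt"

	repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
	"google.golang.org/protobuf/proto"
)

// trimAuxiliaryMetadata returns a copy of resp without the executor-only
// auxiliary metadata: the shape that is written to the action cache and
// streamed back to the client.
func trimAuxiliaryMetadata(resp *repb.ExecuteResponse) *repb.ExecuteResponse {
	trimmed := proto.Clone(resp).(*repb.ExecuteResponse)
	if md := trimmed.GetResult().GetExecutionMetadata(); md != nil {
		md.AuxiliaryMetadata = nil
	}
	return trimmed
}

func main() {
	resp := &repb.ExecuteResponse{
		Result: &repb.ActionResult{ExecutionMetadata: &repb.ExecutedActionMetadata{}},
	}
	fmt.Println(trimAuxiliaryMetadata(resp).GetResult().GetExecutionMetadata().GetAuxiliaryMetadata()) // []
}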
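
Finally, a stand-alone sketch of the coercion that feeds the new requested_isolation_type column: the raw workload-isolation-type property is stored only when it names a known container type, and anything else is coerced to "unknown" to keep the column low-cardinality. The literal strings below are assumed expansions of the named ContainerType constants in platform.go.

package main

import (
	"fmt"
	"slices"
)

type ContainerType string

// Assumed values behind platform.KnownContainerTypes.
var knownContainerTypes = []ContainerType{"none", "podman", "docker", "firecracker", "oci", "sandbox"}

// coerceContainerType mirrors platform.CoerceContainerType: known types pass
// through unchanged, everything else maps to "unknown".
func coerceContainerType(t string) string {
	if slices.Contains(knownContainerTypes, ContainerType(t)) {
		return t
	}
	return "unknown"
}

func main() {
	fmt.Println(coerceContainerType("oci"))     // oci
	fmt.Println(coerceContainerType("bananas")) // unknown
}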