Skip to content

Commit

Permalink
Save requested and effective timeouts to clickhouse (#8081)
Browse files Browse the repository at this point in the history
I expect each of these to add about 1% to the write throughput.
  • Loading branch information
vanja-p authored Dec 19, 2024
1 parent 0f1ef9a commit 1cad61d
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 4 deletions.
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/execution_server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ go_test(
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//testing/protocmp",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/durationpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,14 @@ func (s *ExecutionServer) updateExecution(ctx context.Context, executionID strin
return dbErr
}

func (s *ExecutionServer) recordExecution(ctx context.Context, executionID string, md *repb.ExecutedActionMetadata, auxMeta *espb.ExecutionAuxiliaryMetadata, properties *platform.Properties) error {
func (s *ExecutionServer) recordExecution(
ctx context.Context,
executionID string,
action *repb.Action,
md *repb.ExecutedActionMetadata,
auxMeta *espb.ExecutionAuxiliaryMetadata,
properties *platform.Properties) error {

if s.env.GetExecutionCollector() == nil || !olapdbconfig.WriteExecutionsToOLAPDBEnabled() {
return nil
}
Expand Down Expand Up @@ -369,6 +376,9 @@ func (s *ExecutionServer) recordExecution(ctx context.Context, executionID strin
executionProto.EffectiveIsolationType = auxMeta.GetIsolationType()
executionProto.RequestedIsolationType = platform.CoerceContainerType(properties.WorkloadIsolationType)

executionProto.EffectiveTimeoutUsec = auxMeta.GetTimeout().AsDuration().Microseconds()
executionProto.RequestedTimeoutUsec = action.GetTimeout().AsDuration().Microseconds()

executionProto.RequestedComputeUnits = properties.EstimatedComputeUnits
executionProto.RequestedMemoryBytes = properties.EstimatedMemoryBytes
executionProto.RequestedMilliCpu = properties.EstimatedMilliCPU
Expand Down Expand Up @@ -1053,6 +1063,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio

var auxMeta *espb.ExecutionAuxiliaryMetadata
var properties *platform.Properties
var action *repb.Action
if stage == repb.ExecutionStage_COMPLETED && response != nil {
auxMeta = new(espb.ExecutionAuxiliaryMetadata)
ok, err := rexec.AuxiliaryMetadata(response.GetResult().GetExecutionMetadata(), auxMeta)
Expand All @@ -1066,7 +1077,8 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return status.WrapErrorf(err, "Failed to parse taskID")
}
actionRN = digest.NewResourceName(actionRN.GetDigest(), actionRN.GetInstanceName(), rspb.CacheType_CAS, actionRN.GetDigestFunction())
action, cmd, err := s.fetchActionAndCommand(ctx, actionRN)
var cmd *repb.Command
action, cmd, err = s.fetchActionAndCommand(ctx, actionRN)
if err != nil {
return status.UnavailableErrorf("Failed to fetch action and command: %s", err)
}
Expand Down Expand Up @@ -1101,7 +1113,7 @@ func (s *ExecutionServer) PublishOperation(stream repb.Execution_PublishOperatio
return status.WrapErrorf(err, "failed to update execution %q", taskID)
}
lastWrite = time.Now()
if err := s.recordExecution(ctx, taskID, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil {
if err := s.recordExecution(ctx, taskID, action, response.GetResult().GetExecutionMetadata(), auxMeta, properties); err != nil {
log.CtxErrorf(ctx, "failed to record execution %q: %s", taskID, err)
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
sipb "github.com/buildbuddy-io/buildbuddy/proto/stored_invocation"
gstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -378,6 +379,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
clientCtx = metadata.AppendToOutgoingContext(clientCtx, "x-buildbuddy-platform."+k, v)
}
arn := uploadAction(clientCtx, t, env, instanceName, digestFunction, &repb.Action{
Timeout: &durationpb.Duration{Seconds: 10},
DoNotCache: test.doNotCache,
Platform: &repb.Platform{Properties: []*repb.Platform_Property{
{Name: "EstimatedComputeUnits", Value: "2.5"},
Expand Down Expand Up @@ -419,6 +421,7 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
}
if test.publishMoreMetadata {
aux.IsolationType = "firecracker"
aux.Timeout = &durationpb.Duration{Seconds: 11}
aux.ExecuteRequest = &repb.ExecuteRequest{
SkipCacheLookup: true, // This is only used for writing to clickhouse
ExecutionPolicy: &repb.ExecutionPolicy{Priority: 999},
Expand Down Expand Up @@ -534,18 +537,20 @@ func testExecuteAndPublishOperation(t *testing.T, test publishTest) {
RequestedMemoryBytes: 2000,
RequestedMilliCpu: 1500,
RequestedIsolationType: "oci",
RequestedTimeoutUsec: 10000000,
}
if test.publishMoreMetadata {
expectedExecution.ExecutionPriority = 999
expectedExecution.SkipCacheLookup = true
expectedExecution.EffectiveIsolationType = "firecracker"
expectedExecution.EstimatedFreeDiskBytes = 1001
expectedExecution.PreviousMeasuredMemoryBytes = 2001
expectedExecution.PreviousMeasuredMilliCpu = 2002
expectedExecution.PreviousMeasuredFreeDiskBytes = 2003
expectedExecution.PredictedMemoryBytes = 3001
expectedExecution.PredictedMilliCpu = 3002
expectedExecution.PredictedFreeDiskBytes = 3003
expectedExecution.EffectiveIsolationType = "firecracker"
expectedExecution.EffectiveTimeoutUsec = 11000000
}
diff := cmp.Diff(
expectedExecution,
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/executor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//server/util/tracing",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/durationpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
2 changes: 2 additions & 0 deletions enterprise/server/remote_execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/tracing"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats"
Expand Down Expand Up @@ -313,6 +314,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch
// These errors are failure-specific. Pass through unchanged.
return finishWithErrFn(err)
}
auxMetadata.Timeout = durationpb.New(execTimeouts.TerminateAfter)

now := time.Now()
terminateAt := now.Add(execTimeouts.TerminateAfter)
Expand Down
2 changes: 2 additions & 0 deletions proto/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ proto_library(
":remote_execution_proto",
":scheduler_proto",
":stat_filter_proto",
"@com_google_protobuf//:duration_proto",
"@com_google_protobuf//:timestamp_proto",
"@googleapis//google/longrunning:operations_proto",
"@googleapis//google/rpc:status_proto",
Expand Down Expand Up @@ -1961,6 +1962,7 @@ ts_proto_library(
deps = [
":acl_ts_proto",
":context_ts_proto",
":duration_ts_proto",
":google_longrunning_ts_proto",
":grpc_status_ts_proto",
":invocation_status_ts_proto",
Expand Down
8 changes: 8 additions & 0 deletions proto/execution_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/longrunning/operations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/rpc/status.proto";
import "proto/acl.proto";
import "proto/context.proto";
Expand Down Expand Up @@ -135,9 +136,16 @@ message Execution {
// Extra per-execution metadata. The execution server extracts it from the
// execution metadata of a completed ExecuteResponse and copies selected fields
// (e.g. isolation type and timeout) into the stored execution record.
message ExecutionAuxiliaryMetadata {
// Platform overrides set via remote header.
build.bazel.remote.execution.v2.Platform platform_overrides = 1;

// The effective isolation type. Usually either user requested or the
// default.
string isolation_type = 2;

// The ExecuteRequest associated with this execution.
// NOTE(review): presumably the source of the recorded execution priority and
// skip_cache_lookup values -- confirm against the execution server.
build.bazel.remote.execution.v2.ExecuteRequest execute_request = 3;
// Scheduling details for this execution; see scheduler.SchedulingMetadata.
scheduler.SchedulingMetadata scheduling_metadata = 4;

// The effective action timeout. Either user requested or the default.
// Set by the executor from the timeout after which it terminates the
// action, and recorded by the execution server as the effective timeout
// (in microseconds) next to the timeout requested on the Action.
google.protobuf.Duration timeout = 6;
}

message ExecutionLookup {
Expand Down
3 changes: 3 additions & 0 deletions proto/remote_execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2444,6 +2444,9 @@ message StoredExecution {
int64 output_upload_start_timestamp_usec = 26;
int64 output_upload_completed_timestamp_usec = 27;

int64 requested_timeout_usec = 55;
int64 effective_timeout_usec = 56;

int32 invocation_link_type = 28;

int32 status_code = 29;
Expand Down
2 changes: 2 additions & 0 deletions server/util/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ func buildExecution(in *repb.StoredExecution, inv *sipb.StoredInvocation) *schem
SkipCacheLookup: in.GetSkipCacheLookup(),
RequestedIsolationType: in.GetRequestedIsolationType(),
EffectiveIsolationType: in.GetEffectiveIsolationType(),
RequestedTimeoutUsec: in.GetRequestedTimeoutUsec(),
EffectiveTimeoutUsec: in.GetEffectiveTimeoutUsec(),
InvocationLinkType: int8(in.GetInvocationLinkType()),
User: inv.GetUser(),
Host: inv.GetHost(),
Expand Down
5 changes: 5 additions & 0 deletions server/util/clickhouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type Execution struct {
RequestedIsolationType string
EffectiveIsolationType string `gorm:"type:LowCardinality(String)"` // This value comes from the executor

RequestedTimeoutUsec int64
EffectiveTimeoutUsec int64

// Long string fields
OutputPath string
StatusMessage string
Expand Down Expand Up @@ -287,6 +290,8 @@ func (e *Execution) AdditionalFields() []string {
"ExecutionPriority",
"RequestedIsolationType",
"EffectiveIsolationType",
"RequestedTimeoutUsec",
"EffectiveTimeoutUsec",
}
}

Expand Down

0 comments on commit 1cad61d

Please sign in to comment.