From c331161f55477c1a8f238797a8045693945798dd Mon Sep 17 00:00:00 2001
From: Vadim Berezniker
Date: Wed, 7 Aug 2024 15:19:36 -0700
Subject: [PATCH] Add support for "teeing" shared executor pool work to a separate pool. (#7165)

---
 .../remote_execution/execution_server/BUILD  |   3 +
 .../execution_server/execution_server.go     | 119 ++++++++++++++----
 .../scheduler_server/scheduler_server.go     |   5 +-
 server/interfaces/interfaces.go              |   3 +
 4 files changed, 105 insertions(+), 25 deletions(-)

diff --git a/enterprise/server/remote_execution/execution_server/BUILD b/enterprise/server/remote_execution/execution_server/BUILD
index fffb8874fad..1ded0b0d76b 100644
--- a/enterprise/server/remote_execution/execution_server/BUILD
+++ b/enterprise/server/remote_execution/execution_server/BUILD
@@ -32,6 +32,7 @@ go_library(
         "//server/util/background",
         "//server/util/bazel_request",
         "//server/util/db",
+        "//server/util/flag",
         "//server/util/log",
         "//server/util/perms",
         "//server/util/prefix",
@@ -42,8 +43,10 @@ go_library(
         "@com_github_go_redis_redis_v8//:redis",
         "@com_github_prometheus_client_golang//prometheus",
        "@org_golang_google_genproto//googleapis/longrunning",
+        "@org_golang_google_grpc//metadata",
         "@org_golang_google_grpc//status",
         "@org_golang_google_protobuf//types/known/timestamppb",
+        "@org_golang_x_time//rate",
     ],
 )
diff --git a/enterprise/server/remote_execution/execution_server/execution_server.go b/enterprise/server/remote_execution/execution_server/execution_server.go
index 1d60e885652..bd1fd7f6e49 100644
--- a/enterprise/server/remote_execution/execution_server/execution_server.go
+++ b/enterprise/server/remote_execution/execution_server/execution_server.go
@@ -3,7 +3,6 @@ package execution_server
 import (
     "context"
     "encoding/base64"
-    "flag"
     "fmt"
     "io"
     "path/filepath"
@@ -30,6 +29,7 @@ import (
     "github.com/buildbuddy-io/buildbuddy/server/util/background"
     "github.com/buildbuddy-io/buildbuddy/server/util/bazel_request"
     "github.com/buildbuddy-io/buildbuddy/server/util/db"
+    "github.com/buildbuddy-io/buildbuddy/server/util/flag"
     "github.com/buildbuddy-io/buildbuddy/server/util/log"
     "github.com/buildbuddy-io/buildbuddy/server/util/perms"
     "github.com/buildbuddy-io/buildbuddy/server/util/prefix"
@@ -39,7 +39,9 @@ import (
     "github.com/buildbuddy-io/buildbuddy/server/util/usageutil"
     "github.com/go-redis/redis/v8"
     "github.com/prometheus/client_golang/prometheus"
+    "golang.org/x/time/rate"
     "google.golang.org/genproto/googleapis/longrunning"
+    "google.golang.org/grpc/metadata"
     "google.golang.org/protobuf/types/known/timestamppb"

     espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats"
@@ -58,10 +60,19 @@ const (
     // be discarded after this time. There may be multiple waiters for a single
     // action so we cannot discard the channel immediately.
     completedPubSubChanExpiration = 15 * time.Minute
+
+    // When teeing executor work to experiment executors, use this instance name
+    // to identify teed tasks and avoid populating ActionResults under the
+    // real instance name.
+    teeInstanceName = "tee20240805"
+    // When teeing executor work, the experiment executors are expected to be
+    // registered under this name.
+    teePoolName = "tee"
 )

 var (
     enableRedisAvailabilityMonitoring = flag.Bool("remote_execution.enable_redis_availability_monitoring", false, "If enabled, the execution server will detect if Redis has lost state and will ask Bazel to retry executions.")
+    sharedExecutorPoolTeeRate         = flag.Float64("remote_execution.shared_executor_pool_tee_rate", 0, "If non-zero, work for the default shared executor pool will be teed to a separate experiment pool at this rate.", flag.Internal)
 )

 func fillExecutionFromActionMetadata(md *repb.ExecutedActionMetadata, execution *tables.Execution) {
@@ -124,6 +135,7 @@ type ExecutionServer struct {
     rdb                               redis.UniversalClient
     streamPubSub                      *pubsub.StreamPubSub
     enableRedisAvailabilityMonitoring bool
+    teeLimiter                        *rate.Limiter
 }

 func Register(env *real_environment.RealEnv) error {
@@ -148,12 +160,17 @@ func NewExecutionServer(env environment.Env) (*ExecutionServer, error) {
     if env.GetRemoteExecutionRedisClient() == nil || env.GetRemoteExecutionRedisPubSubClient() == nil {
         return nil, status.FailedPreconditionErrorf("Redis is required for remote execution")
     }
+    var teeLimiter *rate.Limiter
+    if *sharedExecutorPoolTeeRate > 0 {
+        teeLimiter = rate.NewLimiter(rate.Limit(*sharedExecutorPoolTeeRate), 1)
+    }
     return &ExecutionServer{
         env:                               env,
         cache:                             cache,
         rdb:                               env.GetRemoteExecutionRedisClient(),
         streamPubSub:                      pubsub.NewStreamPubSub(env.GetRemoteExecutionRedisPubSubClient()),
         enableRedisAvailabilityMonitoring: remote_execution_config.RemoteExecutionEnabled() && *enableRedisAvailabilityMonitoring,
+        teeLimiter:                        teeLimiter,
     }, nil
 }

@@ -421,43 +438,90 @@ func (s *ExecutionServer) Execute(req *repb.ExecuteRequest, stream repb.Executio
     return s.execute(req, stream)
 }

+func (s *ExecutionServer) teeExecution(ctx context.Context, originalExecutionID string, req *repb.ExecuteRequest) error {
+    if s.teeLimiter == nil {
+        return nil
+    }
+
+    if !s.teeLimiter.Allow() {
+        return nil
+    }
+
+    ctx, cancel := background.ExtendContextForFinalization(ctx, 5*time.Second)
+    // Tee the work in the background so that the original request is not impacted.
+    go func() {
+        defer cancel()
+
+        log.CtxInfof(ctx, "Teeing execution corresponding to original execution %q", originalExecutionID)
+        teeReq := proto.Clone(req).(*repb.ExecuteRequest)
+        teeReq.InstanceName = teeInstanceName
+
+        md, ok := metadata.FromIncomingContext(ctx)
+        if !ok {
+            return
+        }
+        md["x-buildbuddy-platform.pool"] = []string{teePoolName}
+        ctx = metadata.NewIncomingContext(ctx, md)
+
+        id, _, err := s.dispatch(ctx, teeReq, &dispatchOpts{teedRequest: true})
+        if err != nil {
+            log.CtxWarningf(ctx, "Could not tee execution %q: %s", originalExecutionID, err)
+            return
+        }
+        log.CtxInfof(ctx, "Teed execution %q for original execution %q", id, originalExecutionID)
+    }()
+    return nil
+}
+
 func (s *ExecutionServer) Dispatch(ctx context.Context, req *repb.ExecuteRequest) (string, error) {
-    return s.dispatch(ctx, req, true /*=recordActionMergingState*/)
+    id, pool, err := s.dispatch(ctx, req, &dispatchOpts{recordActionMergingState: true})
+    if err == nil && pool.IsShared && pool.Name == "" {
+        if err := s.teeExecution(ctx, id, req); err != nil {
+            log.CtxWarningf(ctx, "Could not tee execution: %s", err)
+        }
+    }
+    return id, err
 }

 func (s *ExecutionServer) dispatchHedge(ctx context.Context, req *repb.ExecuteRequest) (string, error) {
-    return s.dispatch(ctx, req, false /*=recordActionMergingState*/)
+    id, _, err := s.dispatch(ctx, req, &dispatchOpts{recordActionMergingState: false})
+    return id, err
 }

-func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest, recordActionMergingState bool) (string, error) {
+type dispatchOpts struct {
+    recordActionMergingState bool
+    teedRequest              bool
+}
+
+func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest, opts *dispatchOpts) (string, *interfaces.PoolInfo, error) {
     ctx, span := tracing.StartSpan(ctx)
     defer span.End()

     scheduler := s.env.GetSchedulerService()
     if scheduler == nil {
-        return "", status.FailedPreconditionErrorf("No scheduler service configured")
+        return "", nil, status.FailedPreconditionErrorf("No scheduler service configured")
     }
     sizer := s.env.GetTaskSizer()
     if sizer == nil {
-        return "", status.FailedPreconditionError("No task sizer configured")
+        return "", nil, status.FailedPreconditionError("No task sizer configured")
     }
     adInstanceDigest := digest.NewResourceName(req.GetActionDigest(), req.GetInstanceName(), rspb.CacheType_CAS, req.GetDigestFunction())
     action := &repb.Action{}
     if err := cachetools.ReadProtoFromCAS(ctx, s.cache, adInstanceDigest, action); err != nil {
         log.CtxWarningf(ctx, "Error fetching action: %s", err.Error())
-        return "", err
+        return "", nil, err
     }
     cmdInstanceDigest := digest.NewResourceName(action.GetCommandDigest(), req.GetInstanceName(), rspb.CacheType_CAS, req.GetDigestFunction())
     command := &repb.Command{}
     if err := cachetools.ReadProtoFromCAS(ctx, s.cache, cmdInstanceDigest, command); err != nil {
         log.CtxWarningf(ctx, "Error fetching command: %s", err.Error())
-        return "", err
+        return "", nil, err
     }
     r := digest.NewResourceName(req.GetActionDigest(), req.GetInstanceName(), rspb.CacheType_CAS, req.GetDigestFunction())
     executionID, err := r.UploadString()
     if err != nil {
-        return "", err
+        return "", nil, err
     }

     tracing.AddStringAttributeToCurrentSpan(ctx, "task_id", executionID)
@@ -476,10 +540,14 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
     }

     if err := s.insertExecution(ctx, executionID, invocationID, generateCommandSnippet(command), repb.ExecutionStage_UNKNOWN); err != nil {
-        return "", err
+        return "", nil, err
     }
-    if err := s.insertInvocationLink(ctx, executionID, invocationID, sipb.StoredInvocationLink_NEW); err != nil {
-        return "", err
+
+    // Don't associate teed requests with the original invocation.
+    if !opts.teedRequest {
+        if err := s.insertInvocationLink(ctx, executionID, invocationID, sipb.StoredInvocationLink_NEW); err != nil {
+            return "", nil, err
+        }
     }

     executionTask := &repb.ExecutionTask{
@@ -507,22 +575,22 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
     props, err := platform.ParseProperties(executionTask)
     if err != nil {
-        return "", err
+        return "", nil, err
     }

     // Add in secrets for any action explicitly requesting secrets, and all workflows.
     secretService := s.env.GetSecretService()
     if props.IncludeSecrets {
         if secretService == nil {
-            return "", status.FailedPreconditionError("Secrets requested but secret service not available")
+            return "", nil, status.FailedPreconditionError("Secrets requested but secret service not available")
         }
         envVars, err := secretService.GetSecretEnvVars(ctx, taskGroupID)
         if err != nil {
-            return "", err
+            return "", nil, err
         }
         envVars, err = gcplink.ExchangeRefreshTokenForAuthToken(ctx, envVars, platform.IsCICommand(command))
         if err != nil {
-            return "", err
+            return "", nil, err
         }
         executionTask.Command.EnvironmentVariables = append(executionTask.Command.EnvironmentVariables, envVars...)
     }
@@ -531,7 +599,7 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
     serializedTask, err := proto.Marshal(executionTask)
     if err != nil {
         // Should never happen.
-        return "", status.InternalErrorf("Error marshalling execution task %q: %s", executionID, err)
+        return "", nil, status.InternalErrorf("Error marshalling execution task %q: %s", executionID, err)
     }

     taskSize := tasksize.Estimate(executionTask)
@@ -543,14 +611,14 @@
     pool, err := s.env.GetSchedulerService().GetPoolInfo(ctx, props.OS, props.Pool, props.WorkflowID, props.UseSelfHostedExecutors)
     if err != nil {
-        return "", err
+        return "", nil, err
     }

     metrics.RemoteExecutionRequests.With(prometheus.Labels{metrics.GroupID: taskGroupID, metrics.OS: props.OS, metrics.Arch: props.Arch}).Inc()

     if s.enableRedisAvailabilityMonitoring {
         if err := s.streamPubSub.CreateMonitoredChannel(ctx, redisKeyForMonitoredTaskStatusStream(executionID)); err != nil {
-            return "", err
+            return "", nil, err
         }
     }
@@ -570,7 +638,7 @@
         SerializedTask: serializedTask,
     }

-    if recordActionMergingState {
+    if opts.recordActionMergingState {
         if err := action_merger.RecordQueuedExecution(ctx, s.rdb, executionID, r); err != nil {
             log.CtxWarningf(ctx, "could not record queued pending execution %q: %s", executionID, err)
         }
@@ -579,13 +647,13 @@
     if _, err := scheduler.ScheduleTask(ctx, scheduleReq); err != nil {
         ctx, cancel := background.ExtendContextForFinalization(ctx, 10*time.Second)
         defer cancel()
-        if recordActionMergingState {
+        if opts.recordActionMergingState {
             _ = action_merger.DeletePendingExecution(ctx, s.rdb, executionID)
         }
-        return "", status.UnavailableErrorf("Error scheduling execution task %q: %s", executionID, err)
+        return "", nil, status.UnavailableErrorf("Error scheduling execution task %q: %s", executionID, err)
     }

-    return executionID, nil
+    return executionID, pool, nil
 }

 func (s *ExecutionServer) execute(req *repb.ExecuteRequest, stream streamLike) error {
@@ -1024,6 +1092,11 @@ func (s *ExecutionServer) markTaskComplete(ctx context.Context, taskID string, e
         router.MarkComplete(ctx, cmd, actionResourceName.GetInstanceName(), executorHostID)
     }

+    // Skip sizer and usage updates for teed work.
+    if actionResourceName.GetInstanceName() == teeInstanceName {
+        return nil
+    }
+
     if sizer := s.env.GetTaskSizer(); sizer != nil {
         md := executeResponse.GetResult().GetExecutionMetadata()
         if err := sizer.Update(ctx, cmd, md); err != nil {
diff --git a/enterprise/server/scheduling/scheduler_server/scheduler_server.go b/enterprise/server/scheduling/scheduler_server/scheduler_server.go
index 5fe4fce9125..e2063c4e4ff 100644
--- a/enterprise/server/scheduling/scheduler_server/scheduler_server.go
+++ b/enterprise/server/scheduling/scheduler_server/scheduler_server.go
@@ -1000,8 +1000,9 @@ func (s *SchedulerServer) GetPoolInfo(ctx context.Context, os, requestedPool, wo
     }

     sharedPool := &interfaces.PoolInfo{
-        GroupID: *sharedExecutorPoolGroupID,
-        Name:    sharedPoolName,
+        GroupID:  *sharedExecutorPoolGroupID,
+        IsShared: true,
+        Name:     sharedPoolName,
     }

     // Linux workflows use shared executors unless self_hosted is set.
diff --git a/server/interfaces/interfaces.go b/server/interfaces/interfaces.go
index 8cdaf800e46..e8f6c08df4e 100644
--- a/server/interfaces/interfaces.go
+++ b/server/interfaces/interfaces.go
@@ -846,6 +846,9 @@ type PoolInfo struct {

     // IsSelfHosted is whether the pool consists of self-hosted executors.
     IsSelfHosted bool
+
+    // True if the GroupID corresponds to the shared executor group ID.
+    IsShared bool
 }

 type ExecutionService interface {
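
The core mechanism in this patch is a rate-limited, fire-and-forget re-dispatch: when a task is headed for the default shared executor pool, it may additionally be cloned and scheduled onto the experiment pool (registered as "tee") under a separate remote instance name, so the teed copy never blocks the original request and its results never land under the real instance name. The standalone Go sketch below illustrates that pattern under simplifying assumptions; it is not BuildBuddy code, and executeRequest, dispatch, and the rate/burst values are illustrative stand-ins for the real types and configuration.

// tee_sketch.go: a minimal, self-contained sketch of the teeing pattern.
// Not BuildBuddy code; executeRequest and dispatch are stand-ins.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

const (
	teeInstanceName = "tee20240805" // keeps teed results separate from real ones
	teePoolName     = "tee"         // experiment executors register under this pool
)

// executeRequest is a stand-in for the real ExecuteRequest proto.
type executeRequest struct {
	InstanceName string
	Pool         string
	ActionDigest string
}

type server struct {
	teeLimiter *rate.Limiter // nil when teeing is disabled
}

// dispatch is a stand-in for ExecutionServer.dispatch: schedule a task and
// return its execution ID.
func (s *server) dispatch(ctx context.Context, req *executeRequest) (string, error) {
	return fmt.Sprintf("%s/%s/%s", req.Pool, req.InstanceName, req.ActionDigest), nil
}

// teeExecution clones req and re-dispatches it to the experiment pool,
// subject to the configured rate limit, without blocking the caller.
func (s *server) teeExecution(ctx context.Context, originalID string, req *executeRequest) {
	if s.teeLimiter == nil || !s.teeLimiter.Allow() {
		return
	}
	// Retarget a copy of the request at the experiment pool and instance name.
	teeReq := *req
	teeReq.InstanceName = teeInstanceName
	teeReq.Pool = teePoolName

	// Detach from the caller's cancelation but keep a bounded deadline, loosely
	// mirroring background.ExtendContextForFinalization (requires Go 1.21+).
	bgCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
	go func() {
		defer cancel()
		if id, err := s.dispatch(bgCtx, &teeReq); err == nil {
			fmt.Printf("teed %s for original %s\n", id, originalID)
		}
	}()
}

func main() {
	// Tee at most ~1 task/second with a burst of 1 (the patch exposes this as
	// the remote_execution.shared_executor_pool_tee_rate flag).
	s := &server{teeLimiter: rate.NewLimiter(rate.Limit(1), 1)}
	ctx := context.Background()
	for i := 0; i < 3; i++ {
		req := &executeRequest{ActionDigest: fmt.Sprintf("action-%d", i)}
		id, _ := s.dispatch(ctx, req)
		s.teeExecution(ctx, id, req)
	}
	time.Sleep(100 * time.Millisecond) // give the background tee a chance to log
}

In the patch itself, the gate is s.teeLimiter.Allow() driven by the remote_execution.shared_executor_pool_tee_rate flag, the clone is a proto.Clone of the ExecuteRequest with InstanceName set to teeInstanceName, and the pool override travels in the x-buildbuddy-platform.pool gRPC metadata key rather than a struct field; teed tasks also skip invocation links, task sizer updates, and usage accounting.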