Skip to content

Commit

Permalink
Add support for "teeing" shared executor pool work to a separate pool. (
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimberezniker authored Aug 7, 2024
1 parent e535d48 commit c331161
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 25 deletions.
3 changes: 3 additions & 0 deletions enterprise/server/remote_execution/execution_server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//server/util/background",
"//server/util/bazel_request",
"//server/util/db",
"//server/util/flag",
"//server/util/log",
"//server/util/perms",
"//server/util/prefix",
Expand All @@ -42,8 +43,10 @@ go_library(
"@com_github_go_redis_redis_v8//:redis",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_genproto//googleapis/longrunning",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/timestamppb",
"@org_golang_x_time//rate",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package execution_server
import (
"context"
"encoding/base64"
"flag"
"fmt"
"io"
"path/filepath"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/background"
"github.com/buildbuddy-io/buildbuddy/server/util/bazel_request"
"github.com/buildbuddy-io/buildbuddy/server/util/db"
"github.com/buildbuddy-io/buildbuddy/server/util/flag"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/perms"
"github.com/buildbuddy-io/buildbuddy/server/util/prefix"
Expand All @@ -39,7 +39,9 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/usageutil"
"github.com/go-redis/redis/v8"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"

espb "github.com/buildbuddy-io/buildbuddy/proto/execution_stats"
Expand All @@ -58,10 +60,19 @@ const (
// be discarded after this time. There may be multiple waiters for a single
// action so we cannot discard the channel immediately.
completedPubSubChanExpiration = 15 * time.Minute

// When teeing executor work to experiment executors, use this instance name
// to identify teed tasks and avoid populating ActionResults under the
// real instance name.
teeInstanceName = "tee20240805"
// When teeing executor work, the experiment executors are expected to be
// registered under this name.
teePoolName = "tee"
)

var (
enableRedisAvailabilityMonitoring = flag.Bool("remote_execution.enable_redis_availability_monitoring", false, "If enabled, the execution server will detect if Redis has lost state and will ask Bazel to retry executions.")
sharedExecutorPoolTeeRate = flag.Float64("remote_execution.shared_executor_pool_tee_rate", 0, "If non-zero, work for the default shared executor pool will be teed to a separate experiment pool at this rate.", flag.Internal)
)

func fillExecutionFromActionMetadata(md *repb.ExecutedActionMetadata, execution *tables.Execution) {
Expand Down Expand Up @@ -124,6 +135,7 @@ type ExecutionServer struct {
rdb redis.UniversalClient
streamPubSub *pubsub.StreamPubSub
enableRedisAvailabilityMonitoring bool
teeLimiter *rate.Limiter
}

func Register(env *real_environment.RealEnv) error {
Expand All @@ -148,12 +160,17 @@ func NewExecutionServer(env environment.Env) (*ExecutionServer, error) {
if env.GetRemoteExecutionRedisClient() == nil || env.GetRemoteExecutionRedisPubSubClient() == nil {
return nil, status.FailedPreconditionErrorf("Redis is required for remote execution")
}
var teeLimiter *rate.Limiter
if *sharedExecutorPoolTeeRate > 0 {
teeLimiter = rate.NewLimiter(rate.Limit(*sharedExecutorPoolTeeRate), 1)
}
return &ExecutionServer{
env: env,
cache: cache,
rdb: env.GetRemoteExecutionRedisClient(),
streamPubSub: pubsub.NewStreamPubSub(env.GetRemoteExecutionRedisPubSubClient()),
enableRedisAvailabilityMonitoring: remote_execution_config.RemoteExecutionEnabled() && *enableRedisAvailabilityMonitoring,
teeLimiter: teeLimiter,
}, nil
}

Expand Down Expand Up @@ -421,43 +438,90 @@ func (s *ExecutionServer) Execute(req *repb.ExecuteRequest, stream repb.Executio
return s.execute(req, stream)
}

func (s *ExecutionServer) teeExecution(ctx context.Context, originalExecutionID string, req *repb.ExecuteRequest) error {
if s.teeLimiter == nil {
return nil
}

if !s.teeLimiter.Allow() {
return nil
}

ctx, cancel := background.ExtendContextForFinalization(ctx, 5*time.Second)
// Tee the work in the background so that the original request is not impacted.
go func() {
defer cancel()

log.CtxInfof(ctx, "Teeing execution corresponding to original execution %q", originalExecutionID)
teeReq := proto.Clone(req).(*repb.ExecuteRequest)
teeReq.InstanceName = teeInstanceName

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return
}
md["x-buildbuddy-platform.pool"] = []string{teePoolName}
ctx = metadata.NewIncomingContext(ctx, md)

id, _, err := s.dispatch(ctx, teeReq, &dispatchOpts{teedRequest: true})
if err != nil {
log.CtxWarningf(ctx, "Could not tee execution %q: %s", originalExecutionID, err)
return
}
log.CtxInfof(ctx, "Teed execution %q for original execution %q", id, originalExecutionID)
}()
return nil
}

func (s *ExecutionServer) Dispatch(ctx context.Context, req *repb.ExecuteRequest) (string, error) {
return s.dispatch(ctx, req, true /*=recordActionMergingState*/)
id, pool, err := s.dispatch(ctx, req, &dispatchOpts{recordActionMergingState: true})
if err == nil && pool.IsShared && pool.Name == "" {
if err := s.teeExecution(ctx, id, req); err != nil {
log.CtxWarningf(ctx, "Could not tee execution: %s", err)
}
}
return id, err
}

func (s *ExecutionServer) dispatchHedge(ctx context.Context, req *repb.ExecuteRequest) (string, error) {
return s.dispatch(ctx, req, false /*=recordActionMergingState*/)
id, _, err := s.dispatch(ctx, req, &dispatchOpts{recordActionMergingState: false})
return id, err
}

func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest, recordActionMergingState bool) (string, error) {
type dispatchOpts struct {
recordActionMergingState bool
teedRequest bool
}

func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest, opts *dispatchOpts) (string, *interfaces.PoolInfo, error) {
ctx, span := tracing.StartSpan(ctx)
defer span.End()

scheduler := s.env.GetSchedulerService()
if scheduler == nil {
return "", status.FailedPreconditionErrorf("No scheduler service configured")
return "", nil, status.FailedPreconditionErrorf("No scheduler service configured")
}
sizer := s.env.GetTaskSizer()
if sizer == nil {
return "", status.FailedPreconditionError("No task sizer configured")
return "", nil, status.FailedPreconditionError("No task sizer configured")
}
adInstanceDigest := digest.NewResourceName(req.GetActionDigest(), req.GetInstanceName(), rspb.CacheType_CAS, req.GetDigestFunction())
action := &repb.Action{}
if err := cachetools.ReadProtoFromCAS(ctx, s.cache, adInstanceDigest, action); err != nil {
log.CtxWarningf(ctx, "Error fetching action: %s", err.Error())
return "", err
return "", nil, err
}
cmdInstanceDigest := digest.NewResourceName(action.GetCommandDigest(), req.GetInstanceName(), rspb.CacheType_CAS, req.GetDigestFunction())
command := &repb.Command{}
if err := cachetools.ReadProtoFromCAS(ctx, s.cache, cmdInstanceDigest, command); err != nil {
log.CtxWarningf(ctx, "Error fetching command: %s", err.Error())
return "", err
return "", nil, err
}

r := digest.NewResourceName(req.GetActionDigest(), req.GetInstanceName(), rspb.CacheType_CAS, req.GetDigestFunction())
executionID, err := r.UploadString()
if err != nil {
return "", err
return "", nil, err
}

tracing.AddStringAttributeToCurrentSpan(ctx, "task_id", executionID)
Expand All @@ -476,10 +540,14 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
}

if err := s.insertExecution(ctx, executionID, invocationID, generateCommandSnippet(command), repb.ExecutionStage_UNKNOWN); err != nil {
return "", err
return "", nil, err
}
if err := s.insertInvocationLink(ctx, executionID, invocationID, sipb.StoredInvocationLink_NEW); err != nil {
return "", err

// Don't associate teed requests with the original invocation.
if !opts.teedRequest {
if err := s.insertInvocationLink(ctx, executionID, invocationID, sipb.StoredInvocationLink_NEW); err != nil {
return "", nil, err
}
}

executionTask := &repb.ExecutionTask{
Expand Down Expand Up @@ -507,22 +575,22 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest

props, err := platform.ParseProperties(executionTask)
if err != nil {
return "", err
return "", nil, err
}

// Add in secrets for any action explicitly requesting secrets, and all workflows.
secretService := s.env.GetSecretService()
if props.IncludeSecrets {
if secretService == nil {
return "", status.FailedPreconditionError("Secrets requested but secret service not available")
return "", nil, status.FailedPreconditionError("Secrets requested but secret service not available")
}
envVars, err := secretService.GetSecretEnvVars(ctx, taskGroupID)
if err != nil {
return "", err
return "", nil, err
}
envVars, err = gcplink.ExchangeRefreshTokenForAuthToken(ctx, envVars, platform.IsCICommand(command))
if err != nil {
return "", err
return "", nil, err
}
executionTask.Command.EnvironmentVariables = append(executionTask.Command.EnvironmentVariables, envVars...)
}
Expand All @@ -531,7 +599,7 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
serializedTask, err := proto.Marshal(executionTask)
if err != nil {
// Should never happen.
return "", status.InternalErrorf("Error marshalling execution task %q: %s", executionID, err)
return "", nil, status.InternalErrorf("Error marshalling execution task %q: %s", executionID, err)
}

taskSize := tasksize.Estimate(executionTask)
Expand All @@ -543,14 +611,14 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest

pool, err := s.env.GetSchedulerService().GetPoolInfo(ctx, props.OS, props.Pool, props.WorkflowID, props.UseSelfHostedExecutors)
if err != nil {
return "", err
return "", nil, err
}

metrics.RemoteExecutionRequests.With(prometheus.Labels{metrics.GroupID: taskGroupID, metrics.OS: props.OS, metrics.Arch: props.Arch}).Inc()

if s.enableRedisAvailabilityMonitoring {
if err := s.streamPubSub.CreateMonitoredChannel(ctx, redisKeyForMonitoredTaskStatusStream(executionID)); err != nil {
return "", err
return "", nil, err
}
}

Expand All @@ -570,7 +638,7 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
SerializedTask: serializedTask,
}

if recordActionMergingState {
if opts.recordActionMergingState {
if err := action_merger.RecordQueuedExecution(ctx, s.rdb, executionID, r); err != nil {
log.CtxWarningf(ctx, "could not record queued pending execution %q: %s", executionID, err)
}
Expand All @@ -579,13 +647,13 @@ func (s *ExecutionServer) dispatch(ctx context.Context, req *repb.ExecuteRequest
if _, err := scheduler.ScheduleTask(ctx, scheduleReq); err != nil {
ctx, cancel := background.ExtendContextForFinalization(ctx, 10*time.Second)
defer cancel()
if recordActionMergingState {
if opts.recordActionMergingState {
_ = action_merger.DeletePendingExecution(ctx, s.rdb, executionID)
}
return "", status.UnavailableErrorf("Error scheduling execution task %q: %s", executionID, err)
return "", nil, status.UnavailableErrorf("Error scheduling execution task %q: %s", executionID, err)
}

return executionID, nil
return executionID, pool, nil
}

func (s *ExecutionServer) execute(req *repb.ExecuteRequest, stream streamLike) error {
Expand Down Expand Up @@ -1024,6 +1092,11 @@ func (s *ExecutionServer) markTaskComplete(ctx context.Context, taskID string, e
router.MarkComplete(ctx, cmd, actionResourceName.GetInstanceName(), executorHostID)
}

// Skip sizer and usage updates for teed work.
if actionResourceName.GetInstanceName() == teeInstanceName {
return nil
}

if sizer := s.env.GetTaskSizer(); sizer != nil {
md := executeResponse.GetResult().GetExecutionMetadata()
if err := sizer.Update(ctx, cmd, md); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,8 +1000,9 @@ func (s *SchedulerServer) GetPoolInfo(ctx context.Context, os, requestedPool, wo
}

sharedPool := &interfaces.PoolInfo{
GroupID: *sharedExecutorPoolGroupID,
Name: sharedPoolName,
GroupID: *sharedExecutorPoolGroupID,
IsShared: true,
Name: sharedPoolName,
}

// Linux workflows use shared executors unless self_hosted is set.
Expand Down
3 changes: 3 additions & 0 deletions server/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,9 @@ type PoolInfo struct {

// IsSelfHosted is whether the pool consists of self-hosted executors.
IsSelfHosted bool

// True if the GroupID corresponds to the shared executor group ID.
IsShared bool
}

type ExecutionService interface {
Expand Down

0 comments on commit c331161

Please sign in to comment.