From 2c582d381ac37f0145dc4876a90f8b71bcb9d5c6 Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Mon, 18 Nov 2024 18:59:06 +0100 Subject: [PATCH] Persistent Worker: support warm-up script (with optional timeout) and report more metrics (#812) * Persistent Worker: support "warmup_script:" and report more metrics * $ buf generate * Support warm-up script timeout * Correctly handle zero timeout * Add "warmup-script" span --- .../executor/instance/abstract/abstract.go | 16 +++- .../persistentworker/isolation/tart/tart.go | 87 ++++++++++++++++++- .../persistentworker/isolation/vetu/vetu.go | 2 + internal/worker/standby.go | 23 +++-- internal/worker/task.go | 18 ++++ internal/worker/worker.go | 35 ++++++-- pkg/api/cirrus_ci_service.pb.go | 2 +- pkg/parser/node/accessor_test.go | 2 +- pkg/parser/node/node.go | 3 + pkg/parser/node/option.go | 9 ++ pkg/parser/node/yaml.go | 49 ++++++++--- pkg/parser/node/yaml_test.go | 2 +- 12 files changed, 219 insertions(+), 29 deletions(-) create mode 100644 pkg/parser/node/option.go diff --git a/internal/executor/instance/abstract/abstract.go b/internal/executor/instance/abstract/abstract.go index ffa2a31d..fb906f5d 100644 --- a/internal/executor/instance/abstract/abstract.go +++ b/internal/executor/instance/abstract/abstract.go @@ -2,9 +2,11 @@ package abstract import ( "context" + "errors" "github.com/cirruslabs/cirrus-cli/internal/executor/instance/runconfig" "github.com/cirruslabs/echelon" "go.opentelemetry.io/otel/attribute" + "time" ) type Instance interface { @@ -14,7 +16,19 @@ type Instance interface { Attributes() []attribute.KeyValue } +var ( + ErrWarmupScriptFailed = errors.New("warm-up script failed") + ErrWarmupTimeout = errors.New("warm-up script timed out") +) + type WarmableInstance interface { // Warmup can be optionally called in case of a persistent worker is configured to be warm - Warmup(ctx context.Context, ident string, env map[string]string, logger *echelon.Logger) error + Warmup( + ctx context.Context, + ident string, + env map[string]string, + warmupScript string, + warmupTimeout time.Duration, + logger *echelon.Logger, + ) error } diff --git a/internal/executor/instance/persistentworker/isolation/tart/tart.go b/internal/executor/instance/persistentworker/isolation/tart/tart.go index c8c9181c..c04aa94d 100644 --- a/internal/executor/instance/persistentworker/isolation/tart/tart.go +++ b/internal/executor/instance/persistentworker/isolation/tart/tart.go @@ -1,9 +1,11 @@ package tart import ( + "bufio" "context" "errors" "fmt" + "github.com/cirruslabs/cirrus-cli/internal/executor/instance/abstract" "github.com/cirruslabs/cirrus-cli/internal/executor/instance/persistentworker/projectdirsyncer" "github.com/cirruslabs/cirrus-cli/internal/executor/instance/persistentworker/remoteagent" "github.com/cirruslabs/cirrus-cli/internal/executor/instance/runconfig" @@ -17,6 +19,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "golang.org/x/crypto/ssh" + "io" "os" "path" "strings" @@ -100,6 +103,8 @@ func (tart *Tart) Warmup( ctx context.Context, ident string, additionalEnvironment map[string]string, + warmupScript string, + warmupTimeout time.Duration, logger *echelon.Logger, ) error { err := tart.bootVM(ctx, ident, additionalEnvironment, "", false, logger) @@ -116,8 +121,86 @@ func (tart *Tart) Warmup( if err != nil { return err } - _ = sshClient.Close() - return err + defer func() { _ = sshClient.Close() }() + + if warmupScript == "" { + return nil + } + + logger.Infof("running warm-up script...") + + ctx, prepareInstanceSpan := tracer.Start(ctx, "warmup-script") + defer prepareInstanceSpan.End() + + // Work around x/crypto/ssh not being context.Context-friendly (e.g. https://github.com/golang/go/issues/20288) + var monitorCtx context.Context + var monitorCancel context.CancelFunc + if warmupTimeout != 0 { + monitorCtx, monitorCancel = context.WithTimeoutCause(ctx, warmupTimeout, abstract.ErrWarmupTimeout) + } else { + monitorCtx, monitorCancel = context.WithCancel(ctx) + } + go func() { + <-monitorCtx.Done() + _ = sshClient.Close() + }() + defer monitorCancel() + + sshSess, err := sshClient.NewSession() + if err != nil { + return fmt.Errorf("%w: failed to create new SSH session: %v", abstract.ErrWarmupScriptFailed, err) + } + + // Log output from the virtual machine + stdout, err := sshSess.StdoutPipe() + if err != nil { + return fmt.Errorf("%w: failed to open SSH session stdout pipe: %v", abstract.ErrWarmupScriptFailed, err) + } + stderr, err := sshSess.StderrPipe() + if err != nil { + return fmt.Errorf("%w: failed to open SSH session stderr pipe: %v", abstract.ErrWarmupScriptFailed, err) + } + go func() { + output := io.MultiReader(stdout, stderr) + + scanner := bufio.NewScanner(output) + + for scanner.Scan() { + logger.Debugf("VM: %s", scanner.Text()) + } + }() + + stdinBuf, err := sshSess.StdinPipe() + if err != nil { + return fmt.Errorf("%w: failed to open SSH session stdin pipe: %v", abstract.ErrWarmupScriptFailed, err) + } + + if err := sshSess.Shell(); err != nil { + return fmt.Errorf("%w: failed to invoke SSH shell on the VM: %v", abstract.ErrWarmupScriptFailed, err) + } + + _, err = stdinBuf.Write([]byte(warmupScript + "\nexit\n")) + if err != nil { + return fmt.Errorf("%w: failed to write the warm-up script to the shell: %v", + abstract.ErrWarmupScriptFailed, err) + } + + if err := sshSess.Wait(); err != nil { + // Work around x/crypto/ssh not being context.Context-friendly (e.g. https://github.com/golang/go/issues/20288) + if err := monitorCtx.Err(); err != nil { + if errors.Is(context.Cause(monitorCtx), abstract.ErrWarmupTimeout) { + logger.Warnf("%v, ignoring...", context.Cause(monitorCtx)) + + return nil + } + + return err + } + + return fmt.Errorf("%w: failed to execute the warm-up script: %v", abstract.ErrWarmupScriptFailed, err) + } + + return nil } func PrePull(ctx context.Context, image string, logger *echelon.Logger) error { diff --git a/internal/executor/instance/persistentworker/isolation/vetu/vetu.go b/internal/executor/instance/persistentworker/isolation/vetu/vetu.go index bca2e5da..8c9836ca 100644 --- a/internal/executor/instance/persistentworker/isolation/vetu/vetu.go +++ b/internal/executor/instance/persistentworker/isolation/vetu/vetu.go @@ -94,6 +94,8 @@ func (vetu *Vetu) Warmup( ctx context.Context, ident string, env map[string]string, + _ string, + _ time.Duration, logger *echelon.Logger, ) error { return vetu.bootVM(ctx, ident, env, false, logger) diff --git a/internal/worker/standby.go b/internal/worker/standby.go index 5e4a4a8e..2ed96a63 100644 --- a/internal/worker/standby.go +++ b/internal/worker/standby.go @@ -11,18 +11,25 @@ import ( "github.com/cirruslabs/cirrus-cli/pkg/parser/parserkit" "gopkg.in/yaml.v3" "strconv" + "time" ) type StandbyConfig struct { - Isolation *api.Isolation - Resources map[string]float64 + Isolation *api.Isolation `yaml:"isolation"` + Resources map[string]float64 `yaml:"resources"` + Warmup Warmup `yaml:"warmup"` +} + +type Warmup struct { + Script string `yaml:"script"` + Timeout time.Duration `yaml:"timeout"` } var ErrIsolationMissing = errors.New("isolation configuration is required for standby") var ErrUnsupportedIsolation = errors.New("only Tart and Vetu instances are currently supported for standby") func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error { - node, err := node.NewFromNodeWithMergeExemptions(yaml.Node{ + documentNode, err := node.NewFromNodeWithMergeExemptions(yaml.Node{ Kind: yaml.DocumentNode, Content: []*yaml.Node{ value, @@ -32,7 +39,7 @@ func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error { return err } - isolationNode := node.FindChild("isolation") + isolationNode := documentNode.FindChild("isolation") if isolationNode == nil { return ErrIsolationMissing } @@ -60,7 +67,7 @@ func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error { // Parse resources standby.Resources = make(map[string]float64) - if resourcesNode := node.FindChild("resources"); resourcesNode != nil { + if resourcesNode := documentNode.FindChild("resources"); resourcesNode != nil { for _, resourceNode := range resourcesNode.Children { resourceValueRaw, err := resourceNode.FlattenedValue() if err != nil { @@ -74,5 +81,11 @@ func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error { } } + if warmupNode := documentNode.FindChild("warmup"); warmupNode != nil { + if err := warmupNode.YAMLNode.Decode(&standby.Warmup); err != nil { + return err + } + } + return nil } diff --git a/internal/worker/task.go b/internal/worker/task.go index b2588509..6b7c5384 100644 --- a/internal/worker/task.go +++ b/internal/worker/task.go @@ -83,9 +83,13 @@ func (worker *Worker) getInstance( resourcesToUse map[string]float64, ) (abstract.Instance, error) { if standbyInstance := worker.standbyInstance; standbyInstance != nil { + // Record standby instance age before relinquishing the ownership + worker.standbyInstanceAgeHistogram.Record(ctx, time.Since(worker.standbyInstanceStartedAt).Seconds()) + // Relinquish our ownership of the standby instance since // we'll either return it to the task or terminate it worker.standbyInstance = nil + worker.standbyInstanceStartedAt = time.Time{} // Return the standby instance if matches the isolation required by the task if proto.Equal(worker.standbyConfig.Isolation, isolation) { @@ -184,9 +188,16 @@ func (worker *Worker) runTask( worker.logger.Warnf("failed to set CLI's version for task %s: %v", taskID, err) } + startedAt := time.Now() err := inst.Run(ctx, &config) + stoppedAt := time.Now() + + var taskExecutionAttributes []attribute.KeyValue if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(ctx.Err(), context.Canceled) { + // Update task execution attributes + taskExecutionAttributes = append(taskExecutionAttributes, attribute.String("status", "failed")) + worker.logger.Errorf("failed to run task %s: %v", taskID, err) boundedCtx, cancel := context.WithTimeout(context.Background(), perCallTimeout) @@ -232,11 +243,18 @@ func (worker *Worker) runTask( localHub.CaptureMessage(fmt.Sprintf("failed to notify the server about the failed task: %v", err)) }) } + } else { + // Update task execution attributes + taskExecutionAttributes = append(taskExecutionAttributes, attribute.String("status", "succeeded")) } boundedCtx, cancel := context.WithTimeout(context.Background(), perCallTimeout) defer cancel() + // Record task execution time + worker.taskExecutionTimeHistogram.Record(boundedCtx, stoppedAt.Sub(startedAt).Seconds(), + metric.WithAttributes(taskExecutionAttributes...)) + if md, ok := metadata.FromOutgoingContext(ctx); ok { boundedCtx = metadata.NewOutgoingContext(boundedCtx, md) } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 90b64bc3..252807c4 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -46,15 +46,18 @@ type Worker struct { tasks *xsync.MapOf[string, *Task] taskCompletions chan string - imagesCounter metric.Int64Counter - tasksCounter metric.Int64Counter - standbyHitCounter metric.Int64Counter + imagesCounter metric.Int64Counter + tasksCounter metric.Int64Counter + taskExecutionTimeHistogram metric.Float64Histogram + standbyHitCounter metric.Int64Counter + standbyInstanceAgeHistogram metric.Float64Histogram logger logrus.FieldLogger echelonLogger *echelon.Logger - standbyConfig *StandbyConfig - standbyInstance abstract.Instance + standbyConfig *StandbyConfig + standbyInstance abstract.Instance + standbyInstanceStartedAt time.Time tartPrePull []string } @@ -149,6 +152,15 @@ func (worker *Worker) Run(ctx context.Context) error { return err } + worker.taskExecutionTimeHistogram, err = meter.Float64Histogram( + "org.cirruslabs.persistent_worker.tasks.execution_time", + metric.WithDescription("Task execution time."), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + // Resource-related metrics _, err = meter.Float64ObservableGauge("org.cirruslabs.persistent_worker.resources.unused_count", metric.WithDescription("Amount of resources available for use on the Persistent Worker."), @@ -184,6 +196,15 @@ func (worker *Worker) Run(ctx context.Context) error { return err } + worker.standbyInstanceAgeHistogram, err = meter.Float64Histogram( + "org.cirruslabs.persistent_worker.standby.age", + metric.WithDescription("Standby instance age at the moment of relinquishing the ownership."), + metric.WithUnit("s"), + ) + if err != nil { + return err + } + // https://github.com/cirruslabs/cirrus-cli/issues/571 if tart.Installed() { if err := tart.Cleanup(); err != nil { @@ -261,7 +282,8 @@ func (worker *Worker) tryCreateStandby(ctx context.Context) { worker.logger.Debugf("warming-up the standby instance") - if err := standbyInstance.(abstract.WarmableInstance).Warmup(ctx, "standby", nil, worker.echelonLogger); err != nil { + if err := standbyInstance.(abstract.WarmableInstance).Warmup(ctx, "standby", nil, + worker.standbyConfig.Warmup.Script, worker.standbyConfig.Warmup.Timeout, worker.echelonLogger); err != nil { worker.logger.Errorf("failed to warm-up a standby instance: %v", err) if err := standbyInstance.Close(ctx); err != nil { @@ -275,6 +297,7 @@ func (worker *Worker) tryCreateStandby(ctx context.Context) { worker.logger.Debugf("standby instance had successfully warmed-up") worker.standbyInstance = standbyInstance + worker.standbyInstanceStartedAt = time.Now() } func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstreampkg.Upstream) error { diff --git a/pkg/api/cirrus_ci_service.pb.go b/pkg/api/cirrus_ci_service.pb.go index 598953af..d806c619 100644 --- a/pkg/api/cirrus_ci_service.pb.go +++ b/pkg/api/cirrus_ci_service.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc (unknown) // source: api/cirrus_ci_service.proto diff --git a/pkg/parser/node/accessor_test.go b/pkg/parser/node/accessor_test.go index 3eb3747a..21e731b2 100644 --- a/pkg/parser/node/accessor_test.go +++ b/pkg/parser/node/accessor_test.go @@ -9,7 +9,7 @@ import ( func TestGetExpandedStringValue(t *testing.T) { tree, err := node.NewFromText(`name: Batched $VALUE-${I} -`) +`, node.WithoutYAMLNode()) if err != nil { t.Fatal(err) } diff --git a/pkg/parser/node/node.go b/pkg/parser/node/node.go index b81c5696..bbe96e72 100644 --- a/pkg/parser/node/node.go +++ b/pkg/parser/node/node.go @@ -3,6 +3,7 @@ package node import ( "fmt" "github.com/samber/lo" + "gopkg.in/yaml.v3" "reflect" "strings" ) @@ -15,6 +16,8 @@ type Node struct { Line int Column int + + YAMLNode *yaml.Node } type MapValue struct{} diff --git a/pkg/parser/node/option.go b/pkg/parser/node/option.go new file mode 100644 index 00000000..04d52b02 --- /dev/null +++ b/pkg/parser/node/option.go @@ -0,0 +1,9 @@ +package node + +type Option func(node *config) + +func WithoutYAMLNode() Option { + return func(config *config) { + config.withoutYAMLNode = true + } +} diff --git a/pkg/parser/node/yaml.go b/pkg/parser/node/yaml.go index 2197459d..e25dabee 100644 --- a/pkg/parser/node/yaml.go +++ b/pkg/parser/node/yaml.go @@ -11,6 +11,10 @@ import ( var ErrFailedToMarshal = errors.New("failed to marshal to YAML") +type config struct { + withoutYAMLNode bool +} + func (node *Node) MarshalYAML() (*yaml.Node, error) { switch obj := node.Value.(type) { case *MapValue: @@ -45,11 +49,11 @@ func (node *Node) MarshalYAML() (*yaml.Node, error) { } } -func NewFromText(text string) (*Node, error) { - return NewFromTextWithMergeExemptions(text, []nameable.Nameable{}) +func NewFromText(text string, opts ...Option) (*Node, error) { + return NewFromTextWithMergeExemptions(text, []nameable.Nameable{}, opts...) } -func NewFromTextWithMergeExemptions(text string, mergeExemptions []nameable.Nameable) (*Node, error) { +func NewFromTextWithMergeExemptions(text string, mergeExemptions []nameable.Nameable, opts ...Option) (*Node, error) { var yamlNode yaml.Node // Unmarshal YAML @@ -57,10 +61,14 @@ func NewFromTextWithMergeExemptions(text string, mergeExemptions []nameable.Name return nil, err } - return NewFromNodeWithMergeExemptions(yamlNode, mergeExemptions) + return NewFromNodeWithMergeExemptions(yamlNode, mergeExemptions, opts...) } -func NewFromNodeWithMergeExemptions(yamlNode yaml.Node, mergeExemptions []nameable.Nameable) (*Node, error) { +func NewFromNodeWithMergeExemptions( + yamlNode yaml.Node, + mergeExemptions []nameable.Nameable, + opts ...Option, +) (*Node, error) { // Empty document if yamlNode.Kind == 0 || len(yamlNode.Content) == 0 { return &Node{Name: "root"}, nil @@ -83,11 +91,18 @@ func NewFromNodeWithMergeExemptions(yamlNode yaml.Node, mergeExemptions []nameab " as it top-level element, but found %s", yamlKindToString(yamlNode.Kind)) } - return convert(nil, "root", yamlNode.Content[0], yamlNode.Line, yamlNode.Column, mergeExemptions) + config := &config{} + + for _, opt := range opts { + opt(config) + } + + return convert(config, nil, "root", yamlNode.Content[0], yamlNode.Line, yamlNode.Column, mergeExemptions) } //nolint:gocognit // splitting this into multiple functions would probably make this even less comprehensible func convert( + config *config, parent *Node, name string, yamlNode *yaml.Node, @@ -102,6 +117,10 @@ func convert( Column: column, } + if !config.withoutYAMLNode { + result.YAMLNode = yamlNode + } + switch yamlNode.Kind { case yaml.SequenceNode: result.Value = &ListValue{} @@ -114,7 +133,7 @@ func convert( continue } - listSubtree, err := convert(result, "", item, item.Line, item.Column, mergeExemptions) + listSubtree, err := convert(config, result, "", item, item.Line, item.Column, mergeExemptions) if err != nil { return nil, err } @@ -134,7 +153,7 @@ func convert( // Handle "<<" keys if key.Tag == "!!merge" { - if err := result.merge(yamlNode, key, value, mergeExemptions); err != nil { + if err := result.merge(config, yamlNode, key, value, mergeExemptions); err != nil { return nil, err } @@ -146,7 +165,7 @@ func convert( return nil, parsererror.NewRich(yamlNode.Line, yamlNode.Column, "map key is not a string") } - mapSubtree, err := convert(result, key.Value, value, key.Line, key.Column, mergeExemptions) + mapSubtree, err := convert(config, result, key.Value, value, key.Line, key.Column, mergeExemptions) if err != nil { return nil, err } @@ -159,7 +178,7 @@ func convert( // YAML aliases generally don't need line and column helper values // since they are merged into some other data structure afterwards // and this helps to find bugs easier in the future - resolvedAlias, err := convert(parent, name, yamlNode.Alias, 0, 0, mergeExemptions) + resolvedAlias, err := convert(config, parent, name, yamlNode.Alias, 0, 0, mergeExemptions) if err != nil { return nil, err } @@ -173,11 +192,17 @@ func convert( return result, nil } -func (node *Node) merge(parent *yaml.Node, _ *yaml.Node, value *yaml.Node, mergeExemptions []nameable.Nameable) error { +func (node *Node) merge( + config *config, + parent *yaml.Node, + _ *yaml.Node, + value *yaml.Node, + mergeExemptions []nameable.Nameable, +) error { // YAML aliases generally don't need line and column helper values // since they are merged into some other data structure afterwards // and this helps to find bugs easier in the future - aliasValue, err := convert(node, "", value, 0, 0, mergeExemptions) + aliasValue, err := convert(config, node, "", value, 0, 0, mergeExemptions) if err != nil { return err } diff --git a/pkg/parser/node/yaml_test.go b/pkg/parser/node/yaml_test.go index 9eeec6f6..50d823f7 100644 --- a/pkg/parser/node/yaml_test.go +++ b/pkg/parser/node/yaml_test.go @@ -23,7 +23,7 @@ aliases_test: <<: *defaults ` - actual, err := node.NewFromText(config) + actual, err := node.NewFromText(config, node.WithoutYAMLNode()) if err != nil { t.Fatal(err) }