Skip to content

Commit

Permalink
Persistent Worker: support warm-up script (with optional timeout) and…
Browse files Browse the repository at this point in the history
… report more metrics (#812)

* Persistent Worker: support "warmup_script:" and report more metrics

* $ buf generate

* Support warm-up script timeout

* Correctly handle zero timeout

* Add "warmup-script" span
  • Loading branch information
edigaryev authored Nov 18, 2024
1 parent 09deaef commit 2c582d3
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 29 deletions.
16 changes: 15 additions & 1 deletion internal/executor/instance/abstract/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package abstract

import (
"context"
"errors"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/runconfig"
"github.com/cirruslabs/echelon"
"go.opentelemetry.io/otel/attribute"
"time"
)

type Instance interface {
Expand All @@ -14,7 +16,19 @@ type Instance interface {
Attributes() []attribute.KeyValue
}

// Sentinel errors related to warm-up scripts; match them with errors.Is.
var (
// ErrWarmupScriptFailed is wrapped into errors returned when the warm-up script
// could not be started or did not finish successfully.
ErrWarmupScriptFailed = errors.New("warm-up script failed")
// ErrWarmupTimeout is used as the context cancellation cause when the warm-up
// script runs longer than its configured timeout.
ErrWarmupTimeout = errors.New("warm-up script timed out")
)

// WarmableInstance is implemented by instances that support being warmed up ahead of time.
type WarmableInstance interface {
// Warmup can optionally be called when a persistent worker is configured to be warm.
//
// warmupScript and warmupTimeout carry the optional warm-up script and its time limit;
// an implementation may ignore them.
Warmup(ctx context.Context, ident string, env map[string]string, logger *echelon.Logger) error
Warmup(
ctx context.Context,
ident string,
env map[string]string,
warmupScript string,
warmupTimeout time.Duration,
logger *echelon.Logger,
) error
}
87 changes: 85 additions & 2 deletions internal/executor/instance/persistentworker/isolation/tart/tart.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package tart

import (
"bufio"
"context"
"errors"
"fmt"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/abstract"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/persistentworker/projectdirsyncer"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/persistentworker/remoteagent"
"github.com/cirruslabs/cirrus-cli/internal/executor/instance/runconfig"
Expand All @@ -17,6 +19,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/crypto/ssh"
"io"
"os"
"path"
"strings"
Expand Down Expand Up @@ -100,6 +103,8 @@ func (tart *Tart) Warmup(
ctx context.Context,
ident string,
additionalEnvironment map[string]string,
warmupScript string,
warmupTimeout time.Duration,
logger *echelon.Logger,
) error {
err := tart.bootVM(ctx, ident, additionalEnvironment, "", false, logger)
Expand All @@ -116,8 +121,86 @@ func (tart *Tart) Warmup(
if err != nil {
return err
}
_ = sshClient.Close()
return err
// Keep the SSH connection open for the warm-up script and close it on every
// return path; the close error is deliberately ignored.
defer func() { _ = sshClient.Close() }()

// No warm-up script configured: nothing more to do.
if warmupScript == "" {
return nil
}

logger.Infof("running warm-up script...")

ctx, prepareInstanceSpan := tracer.Start(ctx, "warmup-script")
defer prepareInstanceSpan.End()

// Work around x/crypto/ssh not being context.Context-friendly (e.g. https://github.com/golang/go/issues/20288)
//
// A helper goroutine closes the SSH client as soon as monitorCtx is done
// (timeout, parent cancellation, or the deferred monitorCancel on return),
// which is meant to make any blocked SSH call below return.
var monitorCtx context.Context
var monitorCancel context.CancelFunc
if warmupTimeout != 0 {
// The cause lets the error handling below tell a warm-up timeout apart
// from any other reason for monitorCtx being done.
monitorCtx, monitorCancel = context.WithTimeoutCause(ctx, warmupTimeout, abstract.ErrWarmupTimeout)
} else {
// A zero timeout means "no time limit".
monitorCtx, monitorCancel = context.WithCancel(ctx)
}
go func() {
<-monitorCtx.Done()
_ = sshClient.Close()
}()
// Guarantees that the goroutine above terminates once this function returns.
defer monitorCancel()

// NOTE(review): sshSess is never closed explicitly — presumably it is torn down
// together with sshClient; confirm.
sshSess, err := sshClient.NewSession()
if err != nil {
return fmt.Errorf("%w: failed to create new SSH session: %v", abstract.ErrWarmupScriptFailed, err)
}

// Log output from the virtual machine
stdout, err := sshSess.StdoutPipe()
if err != nil {
return fmt.Errorf("%w: failed to open SSH session stdout pipe: %v", abstract.ErrWarmupScriptFailed, err)
}
stderr, err := sshSess.StderrPipe()
if err != nil {
return fmt.Errorf("%w: failed to open SSH session stderr pipe: %v", abstract.ErrWarmupScriptFailed, err)
}
// NOTE(review): io.MultiReader reads stdout until EOF before it starts reading
// stderr, so stderr lines are only logged after stdout is closed; if the SSH
// pipes have bounded buffering, a script that writes a lot to stderr could
// stall — consider one reader goroutine per stream.
// NOTE(review): scanner.Err() is not checked (a line above bufio.Scanner's
// default token limit silently ends the loop) and nothing waits for this
// goroutine, so trailing output may not be logged before Warmup returns.
go func() {
output := io.MultiReader(stdout, stderr)

scanner := bufio.NewScanner(output)

for scanner.Scan() {
logger.Debugf("VM: %s", scanner.Text())
}
}()

stdinBuf, err := sshSess.StdinPipe()
if err != nil {
return fmt.Errorf("%w: failed to open SSH session stdin pipe: %v", abstract.ErrWarmupScriptFailed, err)
}

if err := sshSess.Shell(); err != nil {
return fmt.Errorf("%w: failed to invoke SSH shell on the VM: %v", abstract.ErrWarmupScriptFailed, err)
}

// Feed the script to the shell's stdin, followed by "exit" so that the shell
// terminates after the script and the Wait below can return.
_, err = stdinBuf.Write([]byte(warmupScript + "\nexit\n"))
if err != nil {
return fmt.Errorf("%w: failed to write the warm-up script to the shell: %v",
abstract.ErrWarmupScriptFailed, err)
}

if err := sshSess.Wait(); err != nil {
// Work around x/crypto/ssh not being context.Context-friendly (e.g. https://github.com/golang/go/issues/20288)
//
// If monitorCtx is done, the failure is attributed to the SSH client
// having been closed by the monitor goroutine rather than to the script.
if err := monitorCtx.Err(); err != nil {
// A warm-up timeout is not treated as a failure: warn and report success.
if errors.Is(context.Cause(monitorCtx), abstract.ErrWarmupTimeout) {
logger.Warnf("%v, ignoring...", context.Cause(monitorCtx))

return nil
}

// monitorCtx ended for another reason (e.g. the parent context was
// cancelled): return the context error, not the SSH one.
return err
}

// The shell or the script itself failed.
return fmt.Errorf("%w: failed to execute the warm-up script: %v", abstract.ErrWarmupScriptFailed, err)
}

return nil
}

func PrePull(ctx context.Context, image string, logger *echelon.Logger) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (vetu *Vetu) Warmup(
ctx context.Context,
ident string,
env map[string]string,
_ string,
_ time.Duration,
logger *echelon.Logger,
) error {
return vetu.bootVM(ctx, ident, env, false, logger)
Expand Down
23 changes: 18 additions & 5 deletions internal/worker/standby.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,25 @@ import (
"github.com/cirruslabs/cirrus-cli/pkg/parser/parserkit"
"gopkg.in/yaml.v3"
"strconv"
"time"
)

// StandbyConfig describes the standby instance of a persistent worker: its isolation
// (required, see ErrIsolationMissing), its resources and its optional warm-up settings.
// It is decoded from YAML by the custom UnmarshalYAML method.
type StandbyConfig struct {
Isolation *api.Isolation
Resources map[string]float64
Isolation *api.Isolation `yaml:"isolation"`
Resources map[string]float64 `yaml:"resources"`
Warmup Warmup `yaml:"warmup"`
}

// Warmup holds the optional warm-up settings decoded from the "warmup" key
// of the standby configuration.
type Warmup struct {
// Script is the warm-up script; it is handed over unchanged to WarmableInstance.Warmup.
Script string `yaml:"script"`
// Timeout is the time limit for the script; it is handed over unchanged to
// WarmableInstance.Warmup.
// NOTE(review): presumably written in YAML as a Go duration string such as "10m" —
// confirm against yaml.v3's time.Duration decoding.
Timeout time.Duration `yaml:"timeout"`
}

// Errors reported while validating a standby configuration.
var (
	// ErrIsolationMissing is returned when the standby configuration has no
	// "isolation" key.
	ErrIsolationMissing = errors.New("isolation configuration is required for standby")

	// ErrUnsupportedIsolation signals that standby was requested for an
	// isolation other than Tart or Vetu.
	ErrUnsupportedIsolation = errors.New("only Tart and Vetu instances are currently supported for standby")
)

func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error {
node, err := node.NewFromNodeWithMergeExemptions(yaml.Node{
documentNode, err := node.NewFromNodeWithMergeExemptions(yaml.Node{
Kind: yaml.DocumentNode,
Content: []*yaml.Node{
value,
Expand All @@ -32,7 +39,7 @@ func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error {
return err
}

isolationNode := node.FindChild("isolation")
isolationNode := documentNode.FindChild("isolation")
if isolationNode == nil {
return ErrIsolationMissing
}
Expand Down Expand Up @@ -60,7 +67,7 @@ func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error {

// Parse resources
standby.Resources = make(map[string]float64)
if resourcesNode := node.FindChild("resources"); resourcesNode != nil {
if resourcesNode := documentNode.FindChild("resources"); resourcesNode != nil {
for _, resourceNode := range resourcesNode.Children {
resourceValueRaw, err := resourceNode.FlattenedValue()
if err != nil {
Expand All @@ -74,5 +81,11 @@ func (standby *StandbyConfig) UnmarshalYAML(value *yaml.Node) error {
}
}

if warmupNode := documentNode.FindChild("warmup"); warmupNode != nil {
if err := warmupNode.YAMLNode.Decode(&standby.Warmup); err != nil {
return err
}
}

return nil
}
18 changes: 18 additions & 0 deletions internal/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@ func (worker *Worker) getInstance(
resourcesToUse map[string]float64,
) (abstract.Instance, error) {
if standbyInstance := worker.standbyInstance; standbyInstance != nil {
// Record standby instance age before relinquishing the ownership
worker.standbyInstanceAgeHistogram.Record(ctx, time.Since(worker.standbyInstanceStartedAt).Seconds())

// Relinquish our ownership of the standby instance since
// we'll either return it to the task or terminate it
worker.standbyInstance = nil
worker.standbyInstanceStartedAt = time.Time{}

// Return the standby instance if matches the isolation required by the task
if proto.Equal(worker.standbyConfig.Isolation, isolation) {
Expand Down Expand Up @@ -184,9 +188,16 @@ func (worker *Worker) runTask(
worker.logger.Warnf("failed to set CLI's version for task %s: %v", taskID, err)
}

startedAt := time.Now()
err := inst.Run(ctx, &config)
stoppedAt := time.Now()

var taskExecutionAttributes []attribute.KeyValue

if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(ctx.Err(), context.Canceled) {
// Update task execution attributes
taskExecutionAttributes = append(taskExecutionAttributes, attribute.String("status", "failed"))

worker.logger.Errorf("failed to run task %s: %v", taskID, err)

boundedCtx, cancel := context.WithTimeout(context.Background(), perCallTimeout)
Expand Down Expand Up @@ -232,11 +243,18 @@ func (worker *Worker) runTask(
localHub.CaptureMessage(fmt.Sprintf("failed to notify the server about the failed task: %v", err))
})
}
} else {
// Update task execution attributes
taskExecutionAttributes = append(taskExecutionAttributes, attribute.String("status", "succeeded"))
}

boundedCtx, cancel := context.WithTimeout(context.Background(), perCallTimeout)
defer cancel()

// Record task execution time
worker.taskExecutionTimeHistogram.Record(boundedCtx, stoppedAt.Sub(startedAt).Seconds(),
metric.WithAttributes(taskExecutionAttributes...))

if md, ok := metadata.FromOutgoingContext(ctx); ok {
boundedCtx = metadata.NewOutgoingContext(boundedCtx, md)
}
Expand Down
35 changes: 29 additions & 6 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ type Worker struct {
tasks *xsync.MapOf[string, *Task]
taskCompletions chan string

imagesCounter metric.Int64Counter
tasksCounter metric.Int64Counter
standbyHitCounter metric.Int64Counter
imagesCounter metric.Int64Counter
tasksCounter metric.Int64Counter
taskExecutionTimeHistogram metric.Float64Histogram
standbyHitCounter metric.Int64Counter
standbyInstanceAgeHistogram metric.Float64Histogram

logger logrus.FieldLogger
echelonLogger *echelon.Logger

standbyConfig *StandbyConfig
standbyInstance abstract.Instance
standbyConfig *StandbyConfig
standbyInstance abstract.Instance
standbyInstanceStartedAt time.Time

tartPrePull []string
}
Expand Down Expand Up @@ -149,6 +152,15 @@ func (worker *Worker) Run(ctx context.Context) error {
return err
}

worker.taskExecutionTimeHistogram, err = meter.Float64Histogram(
"org.cirruslabs.persistent_worker.tasks.execution_time",
metric.WithDescription("Task execution time."),
metric.WithUnit("s"),
)
if err != nil {
return err
}

// Resource-related metrics
_, err = meter.Float64ObservableGauge("org.cirruslabs.persistent_worker.resources.unused_count",
metric.WithDescription("Amount of resources available for use on the Persistent Worker."),
Expand Down Expand Up @@ -184,6 +196,15 @@ func (worker *Worker) Run(ctx context.Context) error {
return err
}

worker.standbyInstanceAgeHistogram, err = meter.Float64Histogram(
"org.cirruslabs.persistent_worker.standby.age",
metric.WithDescription("Standby instance age at the moment of relinquishing the ownership."),
metric.WithUnit("s"),
)
if err != nil {
return err
}

// https://github.com/cirruslabs/cirrus-cli/issues/571
if tart.Installed() {
if err := tart.Cleanup(); err != nil {
Expand Down Expand Up @@ -261,7 +282,8 @@ func (worker *Worker) tryCreateStandby(ctx context.Context) {

worker.logger.Debugf("warming-up the standby instance")

if err := standbyInstance.(abstract.WarmableInstance).Warmup(ctx, "standby", nil, worker.echelonLogger); err != nil {
if err := standbyInstance.(abstract.WarmableInstance).Warmup(ctx, "standby", nil,
worker.standbyConfig.Warmup.Script, worker.standbyConfig.Warmup.Timeout, worker.echelonLogger); err != nil {
worker.logger.Errorf("failed to warm-up a standby instance: %v", err)

if err := standbyInstance.Close(ctx); err != nil {
Expand All @@ -275,6 +297,7 @@ func (worker *Worker) tryCreateStandby(ctx context.Context) {
worker.logger.Debugf("standby instance had successfully warmed-up")

worker.standbyInstance = standbyInstance
worker.standbyInstanceStartedAt = time.Now()
}

func (worker *Worker) pollSingleUpstream(ctx context.Context, upstream *upstreampkg.Upstream) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/cirrus_ci_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/parser/node/accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestGetExpandedStringValue(t *testing.T) {
tree, err := node.NewFromText(`name: Batched $VALUE-${I}
`)
`, node.WithoutYAMLNode())
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/parser/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"fmt"
"github.com/samber/lo"
"gopkg.in/yaml.v3"
"reflect"
"strings"
)
Expand All @@ -15,6 +16,8 @@ type Node struct {

Line int
Column int

YAMLNode *yaml.Node
}

type MapValue struct{}
Expand Down
9 changes: 9 additions & 0 deletions pkg/parser/node/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package node

// Option adjusts the config used when building a node tree; options are passed as
// trailing arguments to constructors such as NewFromText.
type Option func(node *config)

// WithoutYAMLNode returns an Option that sets the withoutYAMLNode flag on the config.
// NOTE(review): presumably this keeps Node.YAMLNode unpopulated — confirm where the flag is read.
func WithoutYAMLNode() Option {
	return func(cfg *config) {
		cfg.withoutYAMLNode = true
	}
}
Loading

0 comments on commit 2c582d3

Please sign in to comment.