Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: container outputs and dynamic environments #591

Merged
merged 21 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
Expand All @@ -26,8 +27,8 @@
// exec is a helper function to poll the queue
// and execute Vela pipelines for the Worker.
//
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker

Check failure on line 30 in cmd/vela-worker/exec.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] cmd/vela-worker/exec.go#L30

directive `//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker` is unused for linter "nilerr" (nolintlint)
Raw output
cmd/vela-worker/exec.go:30:1: directive `//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker` is unused for linter "nilerr" (nolintlint)
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker
^
func (w *Worker) exec(index int, config *api.Worker) error {

Check failure on line 31 in cmd/vela-worker/exec.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] cmd/vela-worker/exec.go#L31

cyclomatic complexity 31 of func `(*Worker).exec` is high (> 30) (gocyclo)
Raw output
cmd/vela-worker/exec.go:31:1: cyclomatic complexity 31 of func `(*Worker).exec` is high (> 30) (gocyclo)
func (w *Worker) exec(index int, config *api.Worker) error {
^

Check failure on line 31 in cmd/vela-worker/exec.go

View workflow job for this annotation

GitHub Actions / full-review

cyclomatic complexity 31 of func `(*Worker).exec` is high (> 30) (gocyclo)

Check failure on line 31 in cmd/vela-worker/exec.go

View workflow job for this annotation

GitHub Actions / diff-review

cyclomatic complexity 31 of func `(*Worker).exec` is high (> 30) (gocyclo)
var err error

// setup the version
Expand Down Expand Up @@ -145,6 +146,9 @@
break
}

// set the outputs container ID
w.Config.Executor.OutputCtn.ID = fmt.Sprintf("outputs_%s", p.ID)

// create logger with extra metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus#WithFields
Expand Down Expand Up @@ -236,6 +240,7 @@
Build: item.Build,
Pipeline: p.Sanitize(w.Config.Runtime.Driver),
Version: v.Semantic(),
OutputCtn: w.Config.Executor.OutputCtn,
})

// add the executor to the worker
Expand Down
13 changes: 13 additions & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (

api "github.com/go-vela/server/api/types"
"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
)
Expand Down Expand Up @@ -73,6 +75,16 @@ func run(c *cli.Context) error {
return fmt.Errorf("unable to parse address: %w", err)
}

outputsCtn := new(pipeline.Container)
if len(c.String("executor.outputs-image")) > 0 {
outputsCtn = &pipeline.Container{
Detach: true,
Image: c.String("executor.outputs-image"),
Environment: make(map[string]string),
Pull: constants.PullNotPresent,
}
}

// create the worker
w := &Worker{
// worker configuration
Expand All @@ -94,6 +106,7 @@ func run(c *cli.Context) error {
MaxLogSize: c.Uint("executor.max_log_size"),
LogStreamingTimeout: c.Duration("executor.log_streaming_timeout"),
EnforceTrustedRepos: c.Bool("executor.enforce-trusted-repos"),
OutputCtn: outputsCtn,
},
// logger configuration
Logger: &Logger{
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ services:
VELA_SERVER_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
WORKER_ADDR: 'http://worker:8080'
WORKER_CHECK_IN: 2m
VELA_EXECUTOR_OUTPUTS_IMAGE: 'alpine:latest'
restart: always
ports:
- "8081:8080"
Expand Down
6 changes: 6 additions & 0 deletions executor/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ var Flags = []cli.Flag{
Usage: "enforce trusted repo restrictions for privileged images",
Value: true,
},
&cli.StringFlag{
EnvVars: []string{"VELA_EXECUTOR_OUTPUTS_IMAGE", "EXECUTOR_OUTPUTS_IMAGE"},
FilePath: "/vela/executor/outputs_image",
Name: "executor.outputs-image",
Usage: "image used for the outputs container sidecar",
},
}
164 changes: 77 additions & 87 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (c *client) PlanBuild(ctx context.Context) error {

// AssembleBuild prepares the containers within a build for execution.
//
//nolint:gocyclo,funlen // ignore cyclomatic complexity and function length due to comments and logging messages
//nolint:funlen // consider abstracting parts here but for now this is fine
func (c *client) AssembleBuild(ctx context.Context) error {
// defer taking a snapshot of the build
//
Expand Down Expand Up @@ -329,9 +329,10 @@ func (c *client) AssembleBuild(ctx context.Context) error {
continue
}

c.Logger.Infof("creating %s step", s.Name)

_log.AppendData([]byte(fmt.Sprintf("> Preparing step image %s...\n", s.Image)))

c.Logger.Infof("creating %s step", s.Name)
// create the step
c.err = c.CreateStep(ctx, s)
if c.err != nil {
Expand Down Expand Up @@ -364,6 +365,18 @@ func (c *client) AssembleBuild(ctx context.Context) error {
continue
}

// verify secret image is allowed to run
if c.enforceTrustedRepos {
priv, err := image.IsPrivilegedImage(s.Origin.Image, c.privilegedImages)
if err != nil {
return err
}

if priv && !c.build.GetRepo().GetTrusted() {
return fmt.Errorf("attempting to use privileged image (%s) as untrusted repo", s.Origin.Image)
}
}

c.Logger.Infof("creating %s secret", s.Origin.Name)
// create the service
c.err = c.secret.create(ctx, s.Origin)
Expand All @@ -384,92 +397,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
// https://pkg.go.dev/github.com/go-vela/types/library#Log.AppendData
_log.AppendData(image)
}
// enforce repo.trusted is set for pipelines containing privileged images
// if not enforced, allow all that exist in the list of runtime privileged images
// this configuration is set as an executor flag
if c.enforceTrustedRepos {
// group steps services stages and secret origins together
containers := c.pipeline.Steps

containers = append(containers, c.pipeline.Services...)

for _, stage := range c.pipeline.Stages {
containers = append(containers, stage.Steps...)
}

for _, secret := range c.pipeline.Secrets {
containers = append(containers, secret.Origin)
}

// assume no privileged images are in use
containsPrivilegedImages := false
privImages := []string{}

// verify all pipeline containers
for _, container := range containers {
// TODO: remove hardcoded reference
if container.Image == "#init" {
continue
}

// skip over non-plugin secrets origins
if container.Empty() {
continue
}

c.Logger.Infof("verifying privileges for container %s", container.Name)

// update the init log with image info
//
// https://pkg.go.dev/github.com/go-vela/types/library#Log.AppendData
_log.AppendData([]byte(fmt.Sprintf("Verifying privileges for image %s...\n", container.Image)))

for _, pattern := range c.privilegedImages {
// check if image matches privileged pattern
privileged, err := image.IsPrivilegedImage(container.Image, pattern)
if err != nil {
// wrap the error
c.err = fmt.Errorf("unable to verify privileges for image %s: %w", container.Image, err)

// update the init log with image info
//
// https://pkg.go.dev/github.com/go-vela/types/library#Log.AppendData
_log.AppendData([]byte(fmt.Sprintf("ERROR: %s\n", c.err.Error())))

// return error and destroy the build
// ignore checking more images
return c.err
}

if privileged {
// pipeline contains at least one privileged image
containsPrivilegedImages = privileged

privImages = append(privImages, container.Image)
}
}

// update the init log with image info
//
// https://pkg.go.dev/github.com/go-vela/types/library#Log.AppendData
_log.AppendData([]byte(fmt.Sprintf("Privileges verified for image %s\n", container.Image)))
}

localBool := c.build.GetRepo().GetTrusted()

// ensure pipelines containing privileged images are only permitted to run by trusted repos
if (containsPrivilegedImages) && !localBool {
// update error including privileged image
c.err = fmt.Errorf("unable to assemble build. pipeline contains privileged images and repo is not trusted. privileged image: %v", privImages)

// update the init log with image info
//
// https://pkg.go.dev/github.com/go-vela/types/library#Log.AppendData
_log.AppendData([]byte(fmt.Sprintf("ERROR: %s\n", c.err.Error())))

// return error and destroy the build
return c.err
}
// create outputs container with a timeout equal to the repo timeout
c.err = c.outputs.create(ctx, c.OutputCtn, (int64(60) * c.build.GetRepo().GetTimeout()))
if c.err != nil {
return fmt.Errorf("unable to create outputs container: %w", c.err)
}

// inspect the runtime build (eg a kubernetes pod) for the pipeline
Expand Down Expand Up @@ -502,6 +434,8 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// ExecBuild runs a pipeline for a build.
//
//nolint:funlen // there is a lot going on here and will probably always be long
func (c *client) ExecBuild(ctx context.Context) error {
defer func() {
// Exec* calls are responsible for sending StreamRequest messages.
Expand All @@ -515,6 +449,18 @@ func (c *client) ExecBuild(ctx context.Context) error {
build.Upload(c.build, c.Vela, c.err, c.Logger)
}()

// output maps for dynamic environment variables captured from volume
var opEnv, maskEnv map[string]string

// fire up output container to run with the build
c.Logger.Infof("creating outputs container %s", c.OutputCtn.ID)

// execute outputs container
c.err = c.outputs.exec(ctx, c.OutputCtn)
if c.err != nil {
return fmt.Errorf("unable to exec outputs container: %w", c.err)
}

c.Logger.Info("executing secret images")
// execute the secret
c.err = c.secret.exec(ctx, &c.pipeline.Secrets)
Expand Down Expand Up @@ -571,6 +517,42 @@ func (c *client) ExecBuild(ctx context.Context) error {
return fmt.Errorf("unable to plan step: %w", c.err)
}

// poll outputs
opEnv, maskEnv, c.err = c.outputs.poll(ctx, c.OutputCtn)
if c.err != nil {
return fmt.Errorf("unable to exec outputs container: %w", c.err)
}

// merge env from outputs
//
//nolint:errcheck // only errors with empty environment input, which does not matter here
_step.MergeEnv(opEnv)

// merge env from masked outputs
//
//nolint:errcheck // only errors with empty environment input, which does not matter here
_step.MergeEnv(maskEnv)

// add masked outputs to secret map so they can be masked in logs
for key := range maskEnv {
sec := &pipeline.StepSecret{
Target: key,
}
_step.Secrets = append(_step.Secrets, sec)
}

// perform any substitution on dynamic variables
err = _step.Substitute()
if err != nil {
return err
}

// inject no-substitution secrets for container
err = injectSecrets(_step, c.NoSubSecrets)
if err != nil {
return err
}

c.Logger.Infof("executing %s step", _step.Name)
// execute the step
c.err = c.ExecStep(ctx, _step)
Expand Down Expand Up @@ -706,6 +688,8 @@ func (c *client) StreamBuild(ctx context.Context) error {
// loadLazySecrets is a helper function that injects secrets
// into the container right before execution, rather than
// during build planning. It is only available for the Docker runtime.
//
//nolint:funlen // explanation takes up a lot of lines
func loadLazySecrets(c *client, _step *pipeline.Container) error {
_log := new(library.Log)

Expand Down Expand Up @@ -941,6 +925,12 @@ func (c *client) DestroyBuild(ctx context.Context) error {
}
}

// destroy output container
err = c.outputs.destroy(ctx, c.OutputCtn)
if err != nil {
c.Logger.Errorf("unable to destroy output container: %v", err)
}

c.Logger.Info("deleting volume")
// remove the runtime volume for the pipeline
err = c.Runtime.RemoveVolume(ctx, c.pipeline)
Expand Down
Loading
Loading