Skip to content

Commit

Permalink
Renamed procs/jobs to steps in code (woodpecker-ci#1331)
Browse files Browse the repository at this point in the history
Renamed `procs` to `steps` in code for the issue woodpecker-ci#1288

Co-authored-by: Harikesh Prajapati <[email protected]>
Co-authored-by: qwerty287 <[email protected]>
Co-authored-by: qwerty287 <[email protected]>
Co-authored-by: 6543 <[email protected]>
  • Loading branch information
5 people authored Oct 28, 2022
1 parent b44e895 commit 36e4291
Show file tree
Hide file tree
Showing 109 changed files with 1,474 additions and 1,364 deletions.
2 changes: 1 addition & 1 deletion .gitpod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tasks:
env:
WOODPECKER_SERVER: localhost:9000
WOODPECKER_SECRET: "1234"
WOODPECKER_MAX_PROCS: 1
WOODPECKER_MAX_WORKFLOWS: 1
WOODPECKER_HEALTHCHECK: false
command: |
gp sync-await woodpecker-server
Expand Down
43 changes: 25 additions & 18 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (r *Runner) Run(ctx context.Context) error {
meta, _ := metadata.FromOutgoingContext(ctx)
ctxmeta := metadata.NewOutgoingContext(context.Background(), meta)

// get the next job from the queue
// get the next workflow from the queue
work, err := r.client.Next(ctx, r.filter)
if err != nil {
return err
Expand Down Expand Up @@ -150,10 +150,10 @@ func (r *Runner) Run(ctx context.Context) error {
}

var uploads sync.WaitGroup
defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error {
defaultLogger := pipeline.LogFunc(func(step *backend.Step, rc multipart.Reader) error {
loglogger := logger.With().
Str("image", proc.Image).
Str("stage", proc.Alias).
Str("image", step.Image).
Str("stage", step.Alias).
Logger()

part, rerr := rc.NextPart()
Expand All @@ -172,7 +172,7 @@ func (r *Runner) Run(ctx context.Context) error {
loglogger.Debug().Msg("log stream opened")

limitedPart := io.LimitReader(part, maxLogsUpload)
logStream := rpc.NewLineWriter(r.client, work.ID, proc.Alias, secrets...)
logStream := rpc.NewLineWriter(r.client, work.ID, step.Alias, secrets...)
if _, err := io.Copy(logStream, limitedPart); err != nil {
log.Error().Err(err).Msg("copy limited logStream part")
}
Expand All @@ -186,7 +186,7 @@ func (r *Runner) Run(ctx context.Context) error {

file := &rpc.File{
Mime: "application/json+logs",
Proc: proc.Alias,
Step: step.Alias,
Name: "logs.json",
Data: data,
Size: len(data),
Expand Down Expand Up @@ -218,7 +218,7 @@ func (r *Runner) Run(ctx context.Context) error {

file = &rpc.File{
Mime: part.Header().Get("Content-Type"),
Proc: proc.Alias,
Step: step.Alias,
Name: part.FileName(),
Data: data,
Size: len(data),
Expand Down Expand Up @@ -250,35 +250,35 @@ func (r *Runner) Run(ctx context.Context) error {
})

defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error {
proclogger := logger.With().
steplogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Str("stage", state.Pipeline.Step.Alias).
Err(state.Process.Error).
Int("exit_code", state.Process.ExitCode).
Bool("exited", state.Process.Exited).
Logger()

procState := rpc.State{
Proc: state.Pipeline.Step.Alias,
stepState := rpc.State{
Step: state.Pipeline.Step.Alias,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO do not do this
Finished: time.Now().Unix(),
}
if state.Process.Error != nil {
procState.Error = state.Process.Error.Error()
stepState.Error = state.Process.Error.Error()
}

defer func() {
proclogger.Debug().Msg("update step status")
steplogger.Debug().Msg("update step status")

if uerr := r.client.Update(ctxmeta, work.ID, procState); uerr != nil {
proclogger.Debug().
if uerr := r.client.Update(ctxmeta, work.ID, stepState); uerr != nil {
steplogger.Debug().
Err(uerr).
Msg("update step status error")
}

proclogger.Debug().Msg("update step status complete")
steplogger.Debug().Msg("update step status complete")
}()
if state.Process.Exited {
return nil
Expand All @@ -289,22 +289,29 @@ func (r *Runner) Run(ctx context.Context) error {

// TODO: find better way to update this state and move it to pipeline to have the same env in cli-exec
state.Pipeline.Step.Environment["CI_MACHINE"] = r.hostname

state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_PIPELINE_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_PIPELINE_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)

state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_STEP_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_STEP_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)

state.Pipeline.Step.Environment["CI_SYSTEM_ARCH"] = runtime.GOOS + "/" + runtime.GOARCH

// DEPRECATED
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)

state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)

state.Pipeline.Step.Environment["CI_SYSTEM_ARCH"] = runtime.GOOS + "/" + runtime.GOARCH

if state.Pipeline.Error != nil {
state.Pipeline.Step.Environment["CI_PIPELINE_STATUS"] = "failure"
state.Pipeline.Step.Environment["CI_STEP_STATUS"] = "failure"

// DEPRECATED
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure"
state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure"
Expand Down
2 changes: 1 addition & 1 deletion cli/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var Command = &cli.Command{
&cli.StringSliceFlag{
Name: "param",
Aliases: []string{"p"},
Usage: "custom parameters to be injected into the job environment. Format: KEY=value",
Usage: "custom parameters to be injected into the step environment. Format: KEY=value",
},
),
}
Expand Down
10 changes: 5 additions & 5 deletions cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func execWithAxis(c *cli.Context, file, repoPath string, axis matrix.Axis) error
metadata := metadataFromContext(c, axis)
environ := metadata.Environ()
var secrets []compiler.Secret
for key, val := range metadata.Job.Matrix {
for key, val := range metadata.Step.Matrix {
environ[key] = val
secrets = append(secrets, compiler.Secret{
Name: key,
Expand Down Expand Up @@ -289,8 +289,8 @@ func metadataFromContext(c *cli.Context, axis matrix.Axis) frontend.Metadata {
},
},
},
Job: frontend.Job{
Number: c.Int("job-number"),
Step: frontend.Step{
Number: c.Int("step-number"),
Matrix: axis,
},
Sys: frontend.System{
Expand All @@ -312,13 +312,13 @@ func convertPathForWindows(path string) string {
return filepath.ToSlash(path)
}

var defaultLogger = pipeline.LogFunc(func(proc *backendTypes.Step, rc multipart.Reader) error {
var defaultLogger = pipeline.LogFunc(func(step *backendTypes.Step, rc multipart.Reader) error {
part, err := rc.NextPart()
if err != nil {
return err
}

logStream := NewLineWriter(proc.Alias)
logStream := NewLineWriter(step.Alias)
_, err = io.Copy(logStream, part)
return err
})
4 changes: 2 additions & 2 deletions cli/exec/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ var flags = []cli.Flag{
Name: "prev-commit-author-email",
},
&cli.IntFlag{
EnvVars: []string{"CI_JOB_NUMBER"},
Name: "job-number",
EnvVars: []string{"CI_STEP_NUMBER", "CI_JOB_NUMBER"},
Name: "step-number",
},
&cli.StringSliceFlag{
EnvVars: []string{"CI_ENV"},
Expand Down
4 changes: 2 additions & 2 deletions cli/exec/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (

// Line is a line of console output.
type Line struct {
Proc string `json:"proc,omitempty"`
Step string `json:"step,omitempty"`
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Pos int `json:"pos,omitempty"`
Expand Down Expand Up @@ -66,7 +66,7 @@ func (w *LineWriter) Write(p []byte) (n int, err error) {

line := &Line{
Out: out,
Proc: w.name,
Step: w.name,
Pos: w.num,
Time: int64(time.Since(w.now).Seconds()),
Type: LineStdout,
Expand Down
6 changes: 3 additions & 3 deletions cli/pipeline/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
var pipelineLogsCmd = &cli.Command{
Name: "logs",
Usage: "show pipeline logs",
ArgsUsage: "<repo/name> [pipeline] [job]",
ArgsUsage: "<repo/name> [pipeline] [step]",
Action: pipelineLogs,
Flags: common.GlobalFlags,
}
Expand All @@ -44,7 +44,7 @@ func pipelineLogs(c *cli.Context) error {
return err
}

job, err := strconv.Atoi(c.Args().Get(2))
step, err := strconv.Atoi(c.Args().Get(2))
if err != nil {
return err
}
Expand All @@ -54,7 +54,7 @@ func pipelineLogs(c *cli.Context) error {
return err
}

logs, err := client.PipelineLogs(owner, name, number, job)
logs, err := client.PipelineLogs(owner, name, number, step)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions cli/pipeline/ps.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func pipelinePs(c *cli.Context) error {
return err
}

for _, proc := range pipeline.Procs {
for _, child := range proc.Children {
for _, step := range pipeline.Steps {
for _, child := range step.Children {
if err := tmpl.Execute(os.Stdout, child); err != nil {
return err
}
Expand All @@ -88,7 +88,7 @@ func pipelinePs(c *cli.Context) error {
}

// template for pipeline ps information
var tmplPipelinePs = "\x1b[33mProc #{{ .PID }} \x1b[0m" + `
var tmplPipelinePs = "\x1b[33mStep #{{ .PID }} \x1b[0m" + `
Step: {{ .Name }}
State: {{ .State }}
`
4 changes: 2 additions & 2 deletions cli/pipeline/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var pipelineStartCmd = &cli.Command{
&cli.StringSliceFlag{
Name: "param",
Aliases: []string{"p"},
Usage: "custom parameters to be injected into the job environment. Format: KEY=value",
Usage: "custom parameters to be injected into the step environment. Format: KEY=value",
},
),
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func pipelineStart(c *cli.Context) (err error) {
number = pipeline.Number
} else {
if len(pipelineArg) == 0 {
return errors.New("missing job number")
return errors.New("missing step number")
}
number, err = strconv.Atoi(pipelineArg)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions cli/pipeline/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
var pipelineStopCmd = &cli.Command{
Name: "stop",
Usage: "stop a pipeline",
ArgsUsage: "<repo/name> [pipeline] [job]",
ArgsUsage: "<repo/name> [pipeline] [step]",
Flags: common.GlobalFlags,
Action: pipelineStop,
}
Expand All @@ -42,21 +42,21 @@ func pipelineStop(c *cli.Context) (err error) {
if err != nil {
return err
}
job, _ := strconv.Atoi(c.Args().Get(2))
if job == 0 {
job = 1
step, _ := strconv.Atoi(c.Args().Get(2))
if step == 0 {
step = 1
}

client, err := internal.NewClient(c)
if err != nil {
return err
}

err = client.PipelineStop(owner, name, number, job)
err = client.PipelineStop(owner, name, number, step)
if err != nil {
return err
}

fmt.Printf("Stopping pipeline %s/%s#%d.%d\n", owner, name, number, job)
fmt.Printf("Stopping pipeline %s/%s#%d.%d\n", owner, name, number, step)
return nil
}
6 changes: 3 additions & 3 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func loop(c *cli.Context) error {
log.Logger = log.With().Caller().Logger()
}

counter.Polling = c.Int("max-procs")
counter.Polling = c.Int("max-workflows")
counter.Running = 0

if c.Bool("healthcheck") {
Expand Down Expand Up @@ -139,7 +139,7 @@ func loop(c *cli.Context) error {
backend.Init(context.WithValue(ctx, types.CliContext, c))

var wg sync.WaitGroup
parallel := c.Int("max-procs")
parallel := c.Int("max-workflows")
wg.Add(parallel)

// new engine
Expand Down Expand Up @@ -169,7 +169,7 @@ func loop(c *cli.Context) error {
return
}

log.Debug().Msg("polling new jobs")
log.Debug().Msg("polling new steps")
if err := r.Run(ctx); err != nil {
log.Error().Err(err).Msg("pipeline done with error")
return
Expand Down
4 changes: 2 additions & 2 deletions cmd/agent/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ var flags = []cli.Flag{
Usage: "List of labels to filter tasks on. An agent must be assigned every tag listed in a task to be selected.",
},
&cli.IntFlag{
EnvVars: []string{"WOODPECKER_MAX_PROCS"},
Name: "max-procs",
EnvVars: []string{"WOODPECKER_MAX_WORKFLOWS", "WOODPECKER_MAX_PROCS"},
Name: "max-workflows",
Usage: "agent parallel workflows",
Value: 1,
},
Expand Down
22 changes: 11 additions & 11 deletions cmd/server/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,20 +304,20 @@ func setupCoding(c *cli.Context) (remote.Remote, error) {
}

func setupMetrics(g *errgroup.Group, _store store.Store) {
pendingJobs := promauto.NewGauge(prometheus.GaugeOpts{
pendingSteps := promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pending_jobs",
Help: "Total number of pending pipeline processes.",
Name: "pending_steps",
Help: "Total number of pending pipeline steps.",
})
waitingJobs := promauto.NewGauge(prometheus.GaugeOpts{
waitingSteps := promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "waiting_jobs",
Name: "waiting_steps",
Help: "Total number of pipeline waiting on deps.",
})
runningJobs := promauto.NewGauge(prometheus.GaugeOpts{
runningSteps := promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "running_jobs",
Help: "Total number of running pipeline processes.",
Name: "running_steps",
Help: "Total number of running pipeline steps.",
})
workers := promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "woodpecker",
Expand All @@ -343,9 +343,9 @@ func setupMetrics(g *errgroup.Group, _store store.Store) {
g.Go(func() error {
for {
stats := server.Config.Services.Queue.Info(context.TODO())
pendingJobs.Set(float64(stats.Stats.Pending))
waitingJobs.Set(float64(stats.Stats.WaitingOnDeps))
runningJobs.Set(float64(stats.Stats.Running))
pendingSteps.Set(float64(stats.Stats.Pending))
waitingSteps.Set(float64(stats.Stats.WaitingOnDeps))
runningSteps.Set(float64(stats.Stats.Running))
workers.Set(float64(stats.Stats.Workers))
time.Sleep(500 * time.Millisecond)
}
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ services:
environment:
- WOODPECKER_SERVER=woodpecker-server:9000
- WOODPECKER_AGENT_SECRET=${WOODPECKER_AGENT_SECRET}
- WOODPECKER_MAX_PROCS=2
- WOODPECKER_MAX_WORKFLOWS=2
Loading

0 comments on commit 36e4291

Please sign in to comment.