Skip to content

Commit

Permalink
fix(status): properly reflect worker status in db (#525)
Browse files Browse the repository at this point in the history
* pre work

* working code

* cleanup

* reset docker compose

* Update cmd/vela-worker/register.go

Co-authored-by: Jacob Floyd <[email protected]>

* Update cmd/vela-worker/register.go

Co-authored-by: Jacob Floyd <[email protected]>

* Update cmd/vela-worker/operate.go

Co-authored-by: Jacob Floyd <[email protected]>

* fix typos

* fix import

* add logging

* make clean

---------

Co-authored-by: TimHuynh <[email protected]>
Co-authored-by: Jacob Floyd <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2023
1 parent 6494030 commit 3249252
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 26 deletions.
37 changes: 17 additions & 20 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"

"github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -47,12 +46,10 @@ func (w *Worker) operate(ctx context.Context) error {
}

logrus.Trace("getting queue creds")

// fetching queue credentials using registration token
creds, _, err := w.VelaClient.Queue.GetInfo()
if err != nil {
logrus.Trace("error getting creds")

return err
}

Expand All @@ -66,23 +63,8 @@ func (w *Worker) operate(ctx context.Context) error {
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
logrus.Error("queue setup failed")

registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("worker status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}

return err
// set to error as queue setup fails
w.updateWorkerStatus(registryWorker, constants.WorkerStatusError)
}

// spawn goroutine for phoning home
Expand Down Expand Up @@ -130,6 +112,15 @@ func (w *Worker) operate(ctx context.Context) error {

continue
}
w.QueueCheckedIn, err = w.queueCheckIn(gctx, registryWorker)
if err != nil {
// queue check in failed, retry
logrus.Errorf("unable to ping queue %v", err)
logrus.Info("retrying check-in...")

time.Sleep(5 * time.Second)
continue
}

// successful check in breaks the loop
break
Expand Down Expand Up @@ -171,6 +162,12 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Info("worker not checked in, skipping queue read")
continue
}
// do not pull from queue unless queue setup is done and connected
if !w.QueueCheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("queue ping failed, skipping queue read")
continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
Expand Down
50 changes: 50 additions & 0 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"fmt"
"net/http"

Expand All @@ -19,6 +20,7 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// if server is down, the worker status will not be updated
if resp == nil {
return false, "", respErr
}
Expand All @@ -35,8 +37,12 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {

tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
// set to error when check in fails
w.updateWorkerStatus(config, constants.WorkerStatusError)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

return true, tkn.GetToken(), nil
}
Expand All @@ -45,6 +51,8 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
func (w *Worker) register(config *library.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

// status Idle will be set for worker upon first time registration
// if worker cannot be registered, no status will be set.
config.SetStatus(constants.WorkerStatusIdle)

tkn, _, err := w.VelaClient.Worker.Add(config)
Expand All @@ -58,3 +66,45 @@ func (w *Worker) register(config *library.Worker) (bool, string, error) {
// successfully added the worker so return nil
return true, tkn.GetToken(), nil
}

// queueCheckIn is a helper function to phone home to the redis.
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worker) (bool, error) {
pErr := w.Queue.Ping(ctx)
if pErr != nil {
logrus.Errorf("worker %s unable to contact the queue: %v", registryWorker.GetHostname(), pErr)
// set status to error as queue is not available
w.updateWorkerStatus(registryWorker, constants.WorkerStatusError)

return false, pErr
}

// update worker status to Idle when setup and ping are good.
w.updateWorkerStatus(registryWorker, constants.WorkerStatusIdle)

return true, nil
}

// updateWorkerStatus is a helper function to update worker status
// logs the error if it can't update status
func (w *Worker) updateWorkerStatus(config *library.Worker, status string) {
config.SetStatus(status)
_, resp, logErr := w.VelaClient.Worker.Update(config.GetHostname(), config)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("worker status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v",
resp.StatusCode, config.GetHostname(), logErr)
}
if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("worker status update response is nil, unable to update worker %s status with the server: %v",
config.GetHostname(), logErr)
}
}
}
1 change: 1 addition & 0 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type (
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
QueueCheckedIn bool
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/docker/go-units v0.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-vela/sdk-go v0.21.0
github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056
github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af
github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580
github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/go-cmp v0.5.9
github.com/joho/godotenv v1.5.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-vela/sdk-go v0.21.0 h1:Hedak1Yk9rGn3ZBOvLvxLrMcyvBf3+RB6/wMgHNxyxw=
github.com/go-vela/sdk-go v0.21.0/go.mod h1:fNMQxSqBCXQH6bK3Ej0aCj/iugEDZNEIWW3Xj/m22AQ=
github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056 h1:TqLmvWRU3sqflw7kRUxDRw4H5g9JupLIp0JAGI8biG8=
github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056/go.mod h1:SFAAje/TsPxW+9iDo38CotLX0ralvPRLxbaS9fffT+A=
github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af h1:qiP6pXFDyPDDP+hy8zY+nhmoWv9aoQrrnNmfAAT6yCA=
github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af/go.mod h1:Jn8K28uj7mACc55fkFgaIzL0q45iXydOFGEeoSeHUtQ=
github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580 h1:u1B9VAj6eMouR37zRXQ7ThXrXu282ud6+rvefQa8TOE=
github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580/go.mod h1:1nQQLHtK1EJWW/0sQ/62ryYsg4OccDt8JRmbzoIe+IA=
github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346 h1:8VqRJ02KcAxV+gHuxLzuPuNaf7EOE/zfBomEV+UPj/E=
github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346/go.mod h1:Jn8K28uj7mACc55fkFgaIzL0q45iXydOFGEeoSeHUtQ=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down

0 comments on commit 3249252

Please sign in to comment.