Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(status): properly reflect worker status in db #525

Merged
merged 12 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"

"github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -47,12 +46,10 @@ func (w *Worker) operate(ctx context.Context) error {
}

logrus.Trace("getting queue creds")

// fetching queue credentials using registration token
creds, _, err := w.VelaClient.Queue.GetInfo()
if err != nil {
logrus.Trace("error getting creds")

return err
}

Expand All @@ -66,23 +63,8 @@ func (w *Worker) operate(ctx context.Context) error {
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
logrus.Error("queue setup failed")

registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("worker status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}

return err
// set to error as queue setup fails
w.updateWorkerStatus(registryWorker, constants.WorkerStatusError)
}

// spawn goroutine for phoning home
Expand Down Expand Up @@ -130,6 +112,15 @@ func (w *Worker) operate(ctx context.Context) error {

continue
}
w.QueueCheckedIn, err = w.queueCheckIn(gctx, registryWorker)
if err != nil {
// queue check in failed, retry
logrus.Errorf("unable to ping queue %v", err)
logrus.Info("retrying check-in...")

time.Sleep(5 * time.Second)
continue
}

// successful check in breaks the loop
break
Expand Down Expand Up @@ -171,6 +162,12 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Info("worker not checked in, skipping queue read")
continue
}
// do not pull from queue unless queue setup is done and connected
if !w.QueueCheckedIn {
time.Sleep(5 * time.Second)
logrus.Info("queue ping failed, skipping queue read")
continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
Expand Down
50 changes: 50 additions & 0 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"fmt"
"net/http"

Expand All @@ -19,6 +20,7 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// if server is down, the worker status will not be updated
if resp == nil {
return false, "", respErr
}
Expand All @@ -35,8 +37,12 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {

tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
// set to error when check in fails
w.updateWorkerStatus(config, constants.WorkerStatusError)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

return true, tkn.GetToken(), nil
}
Expand All @@ -45,6 +51,8 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
func (w *Worker) register(config *library.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

// status Idle will be set for worker upon first time registration
// if worker cannot be registered, no status will be set.
config.SetStatus(constants.WorkerStatusIdle)

tkn, _, err := w.VelaClient.Worker.Add(config)
Expand All @@ -58,3 +66,45 @@ func (w *Worker) register(config *library.Worker) (bool, string, error) {
// successfully added the worker so return nil
return true, tkn.GetToken(), nil
}

// queueCheckIn is a helper function to phone home to the redis.
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worker) (bool, error) {
pErr := w.Queue.Ping(ctx)
if pErr != nil {
logrus.Errorf("worker %s unable to contact the queue: %v", registryWorker.GetHostname(), pErr)
// set status to error as queue is not available
w.updateWorkerStatus(registryWorker, constants.WorkerStatusError)

return false, pErr
}

// update worker status to Idle when setup and ping are good.
w.updateWorkerStatus(registryWorker, constants.WorkerStatusIdle)

return true, nil
}

// updateWorkerStatus is a helper function to update worker status
// logs the error if it can't update status
func (w *Worker) updateWorkerStatus(config *library.Worker, status string) {
config.SetStatus(status)
_, resp, logErr := w.VelaClient.Worker.Update(config.GetHostname(), config)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("worker status update response is nil")
}

if logErr != nil {
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v",
resp.StatusCode, config.GetHostname(), logErr)
}
if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("worker status update response is nil, unable to update worker %s status with the server: %v",
config.GetHostname(), logErr)
}
}
}
1 change: 1 addition & 0 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type (
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
QueueCheckedIn bool
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/docker/go-units v0.5.0
github.com/gin-gonic/gin v1.9.1
github.com/go-vela/sdk-go v0.21.0
github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056
github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af
github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580
github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/go-cmp v0.5.9
github.com/joho/godotenv v1.5.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-vela/sdk-go v0.21.0 h1:Hedak1Yk9rGn3ZBOvLvxLrMcyvBf3+RB6/wMgHNxyxw=
github.com/go-vela/sdk-go v0.21.0/go.mod h1:fNMQxSqBCXQH6bK3Ej0aCj/iugEDZNEIWW3Xj/m22AQ=
github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056 h1:TqLmvWRU3sqflw7kRUxDRw4H5g9JupLIp0JAGI8biG8=
github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056/go.mod h1:SFAAje/TsPxW+9iDo38CotLX0ralvPRLxbaS9fffT+A=
github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af h1:qiP6pXFDyPDDP+hy8zY+nhmoWv9aoQrrnNmfAAT6yCA=
github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af/go.mod h1:Jn8K28uj7mACc55fkFgaIzL0q45iXydOFGEeoSeHUtQ=
github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580 h1:u1B9VAj6eMouR37zRXQ7ThXrXu282ud6+rvefQa8TOE=
github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580/go.mod h1:1nQQLHtK1EJWW/0sQ/62ryYsg4OccDt8JRmbzoIe+IA=
github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346 h1:8VqRJ02KcAxV+gHuxLzuPuNaf7EOE/zfBomEV+UPj/E=
github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346/go.mod h1:Jn8K28uj7mACc55fkFgaIzL0q45iXydOFGEeoSeHUtQ=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down