diff --git a/cmd/vela-worker/operate.go b/cmd/vela-worker/operate.go index b0e50b09..5f1170bf 100644 --- a/cmd/vela-worker/operate.go +++ b/cmd/vela-worker/operate.go @@ -9,7 +9,6 @@ import ( "github.com/go-vela/server/queue" "github.com/go-vela/types/constants" "github.com/go-vela/types/library" - "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -47,12 +46,10 @@ func (w *Worker) operate(ctx context.Context) error { } logrus.Trace("getting queue creds") - // fetching queue credentials using registration token creds, _, err := w.VelaClient.Queue.GetInfo() if err != nil { logrus.Trace("error getting creds") - return err } @@ -66,23 +63,8 @@ func (w *Worker) operate(ctx context.Context) error { w.Queue, err = queue.New(w.Config.Queue) if err != nil { logrus.Error("queue setup failed") - - registryWorker.SetStatus(constants.WorkerStatusError) - _, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker) - - if resp == nil { - // log the error instead of returning so the operation doesn't block worker deployment - logrus.Error("worker status update response is nil") - } - - if logErr != nil { - if resp != nil { - // log the error instead of returning so the operation doesn't block worker deployment - logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr) - } - } - - return err + // set to error as queue setup fails + w.updateWorkerStatus(registryWorker, constants.WorkerStatusError) } // spawn goroutine for phoning home @@ -130,6 +112,15 @@ func (w *Worker) operate(ctx context.Context) error { continue } + w.QueueCheckedIn, err = w.queueCheckIn(gctx, registryWorker) + if err != nil { + // queue check in failed, retry + logrus.Errorf("unable to ping queue %v", err) + logrus.Info("retrying check-in...") + + time.Sleep(5 * time.Second) + continue + } // successful check in breaks the loop break @@ -171,6 +162,12 @@ func (w *Worker) operate(ctx context.Context) error { logrus.Info("worker not checked in, skipping queue read") continue } + // do not pull from queue unless queue setup is done and connected + if !w.QueueCheckedIn { + time.Sleep(5 * time.Second) + logrus.Info("queue ping failed, skipping queue read") + continue + } select { case <-gctx.Done(): logrus.WithFields(logrus.Fields{ diff --git a/cmd/vela-worker/register.go b/cmd/vela-worker/register.go index 76ffab99..ae07a369 100644 --- a/cmd/vela-worker/register.go +++ b/cmd/vela-worker/register.go @@ -3,6 +3,7 @@ package main import ( + "context" "fmt" "net/http" @@ -19,6 +20,7 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) { _, resp, err := w.VelaClient.Worker.Get(config.GetHostname()) if err != nil { respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err) + // if server is down, the worker status will not be updated if resp == nil { return false, "", respErr } @@ -35,8 +37,12 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) { tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname()) if err != nil { + // set to error when check in fails + w.updateWorkerStatus(config, constants.WorkerStatusError) return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err) } + // update worker status to Idle when checkIn is successful. + w.updateWorkerStatus(config, constants.WorkerStatusIdle) return true, tkn.GetToken(), nil } @@ -45,6 +51,8 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) { func (w *Worker) register(config *library.Worker) (bool, string, error) { logrus.Infof("worker %s not found, registering it with the server", config.GetHostname()) + // status Idle will be set for worker upon first time registration + // if worker cannot be registered, no status will be set. config.SetStatus(constants.WorkerStatusIdle) tkn, _, err := w.VelaClient.Worker.Add(config) @@ -58,3 +66,45 @@ func (w *Worker) register(config *library.Worker) (bool, string, error) { // successfully added the worker so return nil return true, tkn.GetToken(), nil } + +// queueCheckIn is a helper function to phone home to the redis. +func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worker) (bool, error) { + pErr := w.Queue.Ping(ctx) + if pErr != nil { + logrus.Errorf("worker %s unable to contact the queue: %v", registryWorker.GetHostname(), pErr) + // set status to error as queue is not available + w.updateWorkerStatus(registryWorker, constants.WorkerStatusError) + + return false, pErr + } + + // update worker status to Idle when setup and ping are good. + w.updateWorkerStatus(registryWorker, constants.WorkerStatusIdle) + + return true, nil +} + +// updateWorkerStatus is a helper function to update worker status +// logs the error if it can't update status +func (w *Worker) updateWorkerStatus(config *library.Worker, status string) { + config.SetStatus(status) + _, resp, logErr := w.VelaClient.Worker.Update(config.GetHostname(), config) + + if resp == nil { + // log the error instead of returning so the operation doesn't block worker deployment + logrus.Error("worker status update response is nil") + } + + if logErr != nil { + if resp != nil { + // log the error instead of returning so the operation doesn't block worker deployment + logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", + resp.StatusCode, config.GetHostname(), logErr) + } + if resp == nil { + // log the error instead of returning so the operation doesn't block worker deployment + logrus.Errorf("worker status update response is nil, unable to update worker %s status with the server: %v", + config.GetHostname(), logErr) + } + } +} diff --git a/cmd/vela-worker/worker.go b/cmd/vela-worker/worker.go index 19c6c41b..af03ff77 100644 --- a/cmd/vela-worker/worker.go +++ b/cmd/vela-worker/worker.go @@ -68,6 +68,7 @@ type ( VelaClient *vela.Client RegisterToken chan string CheckedIn bool + QueueCheckedIn bool RunningBuildIDs []string RunningBuildIDsMutex sync.Mutex } diff --git a/go.mod b/go.mod index 33ac9610..bf46316d 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/docker/go-units v0.5.0 github.com/gin-gonic/gin v1.9.1 github.com/go-vela/sdk-go v0.21.0 - github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056 - github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af + github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580 + github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346 github.com/golang-jwt/jwt/v5 v5.0.0 github.com/google/go-cmp v0.5.9 github.com/joho/godotenv v1.5.1 diff --git a/go.sum b/go.sum index 6b217251..1b40c434 100644 --- a/go.sum +++ b/go.sum @@ -152,10 +152,10 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/go-vela/sdk-go v0.21.0 h1:Hedak1Yk9rGn3ZBOvLvxLrMcyvBf3+RB6/wMgHNxyxw= github.com/go-vela/sdk-go v0.21.0/go.mod h1:fNMQxSqBCXQH6bK3Ej0aCj/iugEDZNEIWW3Xj/m22AQ= -github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056 h1:TqLmvWRU3sqflw7kRUxDRw4H5g9JupLIp0JAGI8biG8= -github.com/go-vela/server v0.21.1-0.20231017135815-bb35e76f6056/go.mod h1:SFAAje/TsPxW+9iDo38CotLX0ralvPRLxbaS9fffT+A= -github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af h1:qiP6pXFDyPDDP+hy8zY+nhmoWv9aoQrrnNmfAAT6yCA= -github.com/go-vela/types v0.21.1-0.20231012142227-0c0b890487af/go.mod h1:Jn8K28uj7mACc55fkFgaIzL0q45iXydOFGEeoSeHUtQ= +github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580 h1:u1B9VAj6eMouR37zRXQ7ThXrXu282ud6+rvefQa8TOE= +github.com/go-vela/server v0.21.1-0.20231031195602-6708cea79580/go.mod h1:1nQQLHtK1EJWW/0sQ/62ryYsg4OccDt8JRmbzoIe+IA= +github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346 h1:8VqRJ02KcAxV+gHuxLzuPuNaf7EOE/zfBomEV+UPj/E= +github.com/go-vela/types v0.21.1-0.20231024201126-19101a5b1346/go.mod h1:Jn8K28uj7mACc55fkFgaIzL0q45iXydOFGEeoSeHUtQ= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=