Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(status): properly reflect worker status in db #525

Merged
merged 12 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ package main

import (
"context"
"github.com/go-vela/server/queue"
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
"time"

"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"

Expand Down Expand Up @@ -47,13 +47,10 @@ func (w *Worker) operate(ctx context.Context) error {
}

logrus.Trace("getting queue creds")

// fetching queue credentials using registration token
creds, _, err := w.VelaClient.Queue.GetInfo()
if err != nil {
logrus.Trace("error getting creds")

return err
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
}

// set queue address and public key using credentials received from server
Expand All @@ -66,23 +63,8 @@ func (w *Worker) operate(ctx context.Context) error {
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
logrus.Error("queue setup failed")

registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("worker status update response is nil")
}

if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}

return err
// set to error as queue setup fails
w.updateWorkerStatus(registryWorker, constants.WorkerStatusError)
}

// spawn goroutine for phoning home
Expand Down Expand Up @@ -130,6 +112,15 @@ func (w *Worker) operate(ctx context.Context) error {

continue
}
w.QueueCheckIn, err = w.queueCheckIn(gctx, registryWorker)
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// queue check in failed, retry
logrus.Errorf("unable to ping queue %v", err)
logrus.Info("retrying check-in...")

time.Sleep(5 * time.Second)
continue
}

// successful check in breaks the loop
break
Expand Down Expand Up @@ -171,6 +162,12 @@ func (w *Worker) operate(ctx context.Context) error {
logrus.Info("worker not checked in, skipping queue read")
continue
}
// do not pull from queue unless queue setup is done and connected
if !w.QueueCheckIn {
time.Sleep(5 * time.Second)
logrus.Info("queue ping fails, skipping queue read")
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
select {
case <-gctx.Done():
logrus.WithFields(logrus.Fields{
Expand Down
45 changes: 45 additions & 0 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"fmt"
"net/http"

Expand All @@ -19,6 +20,7 @@
_, resp, err := w.VelaClient.Worker.Get(config.GetHostname())
if err != nil {
respErr := fmt.Errorf("unable to retrieve worker %s from the server: %w", config.GetHostname(), err)
// in the event of server is down, the worker status will not be updated
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
if resp == nil {
return false, "", respErr
}
Expand All @@ -35,8 +37,12 @@

tkn, _, err := w.VelaClient.Worker.RefreshAuth(config.GetHostname())
if err != nil {
// set to error as check in fails
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
w.updateWorkerStatus(config, constants.WorkerStatusError)
return false, "", fmt.Errorf("unable to refresh auth for worker %s on the server: %w", config.GetHostname(), err)
}
// update worker status to Idle when checkIn is successful.
w.updateWorkerStatus(config, constants.WorkerStatusIdle)

return true, tkn.GetToken(), nil
}
Expand All @@ -45,6 +51,8 @@
func (w *Worker) register(config *library.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

// status Idle will be set for worker upon first time registration
// if worker cannot be registered, no status will be set.
config.SetStatus(constants.WorkerStatusIdle)

tkn, _, err := w.VelaClient.Worker.Add(config)
Expand All @@ -58,3 +66,40 @@
// successfully added the worker so return nil
return true, tkn.GetToken(), nil
}

// queueCheckIn is a helper function to phone home to the redis.
func (w *Worker) queueCheckIn(ctx context.Context, registryWorker *library.Worker) (bool, error) {
pErr := w.Queue.Ping(ctx)

Check failure on line 72 in cmd/vela-worker/register.go

View workflow job for this annotation

GitHub Actions / test

w.Queue.Ping undefined (type queue.Service has no field or method Ping)

Check failure on line 72 in cmd/vela-worker/register.go

View workflow job for this annotation

GitHub Actions / build

w.Queue.Ping undefined (type queue.Service has no field or method Ping)

Check failure on line 72 in cmd/vela-worker/register.go

View workflow job for this annotation

GitHub Actions / build

w.Queue.Ping undefined (type queue.Service has no field or method Ping)

Check failure on line 72 in cmd/vela-worker/register.go

View workflow job for this annotation

GitHub Actions / test

w.Queue.Ping undefined (type queue.Service has no field or method Ping)
if pErr != nil {
logrus.Errorf("worker %s unable to contact the queue: %v", registryWorker.GetHostname(), pErr)
// set status to error as queue is not available
w.updateWorkerStatus(registryWorker, constants.WorkerStatusError)

return false, pErr
}

// update worker status to Idle when setup and ping are good.
w.updateWorkerStatus(registryWorker, constants.WorkerStatusIdle)

return true, nil
}

// updateWorkerStatus is a helper function to update worker status
// logs the error if it can't update status
func (w *Worker) updateWorkerStatus(config *library.Worker, status string) {
config.SetStatus(status)
_, resp, logErr := w.VelaClient.Worker.Update(config.GetHostname(), config)

if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("worker status update response is nil")
}

if logErr != nil {
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v",
resp.StatusCode, config.GetHostname(), logErr)
}
}
}
1 change: 1 addition & 0 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type (
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
QueueCheckIn bool
timhuynh94 marked this conversation as resolved.
Show resolved Hide resolved
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
}
Expand Down
Loading