Skip to content

Commit

Permalink
enhance: add context to Workers (#940)
Browse files Browse the repository at this point in the history
Co-authored-by: David May <[email protected]>
  • Loading branch information
plyr4 and wass3r authored Sep 1, 2023
1 parent a3912ea commit 38fae72
Show file tree
Hide file tree
Showing 35 changed files with 160 additions and 56 deletions.
2 changes: 1 addition & 1 deletion api/build/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func CancelBuild(c *gin.Context) {
switch b.GetStatus() {
case constants.StatusRunning:
// retrieve the worker info
w, err := database.FromContext(c).GetWorkerForHostname(b.GetHost())
w, err := database.FromContext(c).GetWorkerForHostname(ctx, b.GetHost())
if err != nil {
retErr := fmt.Errorf("unable to get worker for build %s: %w", entry, err)
util.HandleError(c, http.StatusNotFound, retErr)
Expand Down
2 changes: 1 addition & 1 deletion api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func recordGauges(c *gin.Context) {
// worker_build_limit, active_worker_count, inactive_worker_count, idle_worker_count, available_worker_count, busy_worker_count, error_worker_count
if q.WorkerBuildLimit || q.ActiveWorkerCount || q.InactiveWorkerCount || q.IdleWorkerCount || q.AvailableWorkerCount || q.BusyWorkerCount || q.ErrorWorkerCount {
// send API call to capture the workers
workers, err := database.FromContext(c).ListWorkers()
workers, err := database.FromContext(c).ListWorkers(ctx)
if err != nil {
logrus.Errorf("unable to get workers: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion api/worker/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func CreateWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
cl := claims.Retrieve(c)
ctx := c.Request.Context()

// capture body from API request
input := new(library.Worker)
Expand Down Expand Up @@ -89,7 +90,7 @@ func CreateWorker(c *gin.Context) {
"worker": input.GetHostname(),
}).Infof("creating new worker %s", input.GetHostname())

_, err = database.FromContext(c).CreateWorker(input)
_, err = database.FromContext(c).CreateWorker(ctx, input)
if err != nil {
retErr := fmt.Errorf("unable to create worker: %w", err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func DeleteWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
w := worker.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -57,7 +58,7 @@ func DeleteWorker(c *gin.Context) {
}).Infof("deleting worker %s", w.GetHostname())

// send API call to remove the step
err := database.FromContext(c).DeleteWorker(w)
err := database.FromContext(c).DeleteWorker(ctx, w)
if err != nil {
retErr := fmt.Errorf("unable to delete worker %s: %w", w.GetHostname(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func GetWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
w := worker.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -56,7 +57,7 @@ func GetWorker(c *gin.Context) {
"worker": w.GetHostname(),
}).Infof("reading worker %s", w.GetHostname())

w, err := database.FromContext(c).GetWorkerForHostname(w.GetHostname())
w, err := database.FromContext(c).GetWorkerForHostname(ctx, w.GetHostname())
if err != nil {
retErr := fmt.Errorf("unable to get workers: %w", err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
func ListWorkers(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand All @@ -49,7 +50,7 @@ func ListWorkers(c *gin.Context) {
"user": u.GetName(),
}).Info("reading workers")

w, err := database.FromContext(c).ListWorkers()
w, err := database.FromContext(c).ListWorkers(ctx)
if err != nil {
retErr := fmt.Errorf("unable to get workers: %w", err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func Refresh(c *gin.Context) {
// capture middleware values
w := worker.Retrieve(c)
cl := claims.Retrieve(c)
ctx := c.Request.Context()

// if we are not using a symmetric token, and the subject does not match the input, request should be denied
if !strings.EqualFold(cl.TokenType, constants.ServerWorkerTokenType) && !strings.EqualFold(cl.Subject, w.GetHostname()) {
Expand All @@ -79,7 +80,7 @@ func Refresh(c *gin.Context) {
w.SetLastCheckedIn(time.Now().Unix())

// send API call to update the worker
_, err := database.FromContext(c).UpdateWorker(w)
_, err := database.FromContext(c).UpdateWorker(ctx, w)
if err != nil {
retErr := fmt.Errorf("unable to update worker %s: %w", w.GetHostname(), err)

Expand Down
3 changes: 2 additions & 1 deletion api/worker/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func UpdateWorker(c *gin.Context) {
// capture middleware values
u := user.Retrieve(c)
w := worker.Retrieve(c)
ctx := c.Request.Context()

// update engine logger with API metadata
//
Expand Down Expand Up @@ -124,7 +125,7 @@ func UpdateWorker(c *gin.Context) {
}

// send API call to update the worker
w, err = database.FromContext(c).UpdateWorker(w)
w, err = database.FromContext(c).UpdateWorker(ctx, w)
if err != nil {
retErr := fmt.Errorf("unable to update worker %s: %w", w.GetHostname(), err)

Expand Down
12 changes: 6 additions & 6 deletions database/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1760,15 +1760,15 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {

// create the workers
for _, worker := range resources.Workers {
_, err := db.CreateWorker(worker)
_, err := db.CreateWorker(context.TODO(), worker)
if err != nil {
t.Errorf("unable to create worker %d: %v", worker.GetID(), err)
}
}
methods["CreateWorker"] = true

// count the workers
count, err := db.CountWorkers()
count, err := db.CountWorkers(context.TODO())
if err != nil {
t.Errorf("unable to count workers: %v", err)
}
Expand All @@ -1778,7 +1778,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {
methods["CountWorkers"] = true

// list the workers
list, err := db.ListWorkers()
list, err := db.ListWorkers(context.TODO())
if err != nil {
t.Errorf("unable to list workers: %v", err)
}
Expand All @@ -1789,7 +1789,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {

// lookup the workers by hostname
for _, worker := range resources.Workers {
got, err := db.GetWorkerForHostname(worker.GetHostname())
got, err := db.GetWorkerForHostname(context.TODO(), worker.GetHostname())
if err != nil {
t.Errorf("unable to get worker %d by hostname: %v", worker.GetID(), err)
}
Expand All @@ -1802,7 +1802,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {
// update the workers
for _, worker := range resources.Workers {
worker.SetActive(false)
got, err := db.UpdateWorker(worker)
got, err := db.UpdateWorker(context.TODO(), worker)
if err != nil {
t.Errorf("unable to update worker %d: %v", worker.GetID(), err)
}
Expand All @@ -1816,7 +1816,7 @@ func testWorkers(t *testing.T, db Interface, resources *Resources) {

// delete the workers
for _, worker := range resources.Workers {
err = db.DeleteWorker(worker)
err = db.DeleteWorker(context.TODO(), worker)
if err != nil {
t.Errorf("unable to delete worker %d: %v", worker.GetID(), err)
}
Expand Down
1 change: 1 addition & 0 deletions database/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (e *engine) NewResources(ctx context.Context) error {

// create the database agnostic engine for workers
e.WorkerInterface, err = worker.New(
worker.WithContext(e.ctx),
worker.WithClient(e.client),
worker.WithLogger(e.logger),
worker.WithSkipCreation(e.config.SkipCreation),
Expand Down
4 changes: 3 additions & 1 deletion database/worker/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
)

// CountWorkers gets the count of all workers from the database.
func (e *engine) CountWorkers() (int64, error) {
func (e *engine) CountWorkers(ctx context.Context) (int64, error) {
e.logger.Tracef("getting count of all workers from the database")

// variable to store query results
Expand Down
7 changes: 4 additions & 3 deletions database/worker/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -37,12 +38,12 @@ func TestWorker_Engine_CountWorkers(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_workerOne)
_, err := _sqlite.CreateWorker(context.TODO(), _workerOne)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}

_, err = _sqlite.CreateWorker(_workerTwo)
_, err = _sqlite.CreateWorker(context.TODO(), _workerTwo)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestWorker_Engine_CountWorkers(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.CountWorkers()
got, err := test.database.CountWorkers(context.TODO())

if test.failure {
if err == nil {
Expand Down
4 changes: 3 additions & 1 deletion database/worker/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// CreateWorker creates a new worker in the database.
func (e *engine) CreateWorker(w *library.Worker) (*library.Worker, error) {
func (e *engine) CreateWorker(ctx context.Context, w *library.Worker) (*library.Worker, error) {
e.logger.WithFields(logrus.Fields{
"worker": w.GetHostname(),
}).Tracef("creating worker %s in the database", w.GetHostname())
Expand Down
3 changes: 2 additions & 1 deletion database/worker/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -56,7 +57,7 @@ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) RETURNING "id"`).
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.CreateWorker(_worker)
got, err := test.database.CreateWorker(context.TODO(), _worker)

if test.failure {
if err == nil {
Expand Down
4 changes: 3 additions & 1 deletion database/worker/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// DeleteWorker deletes an existing worker from the database.
func (e *engine) DeleteWorker(w *library.Worker) error {
func (e *engine) DeleteWorker(ctx context.Context, w *library.Worker) error {
e.logger.WithFields(logrus.Fields{
"worker": w.GetHostname(),
}).Tracef("deleting worker %s from the database", w.GetHostname())
Expand Down
5 changes: 3 additions & 2 deletions database/worker/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -29,7 +30,7 @@ func TestWorker_Engine_DeleteWorker(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_worker)
_, err := _sqlite.CreateWorker(context.TODO(), _worker)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand All @@ -55,7 +56,7 @@ func TestWorker_Engine_DeleteWorker(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err = test.database.DeleteWorker(_worker)
err = test.database.DeleteWorker(context.TODO(), _worker)

if test.failure {
if err == nil {
Expand Down
4 changes: 3 additions & 1 deletion database/worker/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
)

// GetWorker gets a worker by ID from the database.
func (e *engine) GetWorker(id int64) (*library.Worker, error) {
func (e *engine) GetWorker(ctx context.Context, id int64) (*library.Worker, error) {
e.logger.Tracef("getting worker %d from the database", id)

// variable to store query results
Expand Down
4 changes: 3 additions & 1 deletion database/worker/get_hostname.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
package worker

import (
"context"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/database"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// GetWorkerForHostname gets a worker by hostname from the database.
func (e *engine) GetWorkerForHostname(hostname string) (*library.Worker, error) {
func (e *engine) GetWorkerForHostname(ctx context.Context, hostname string) (*library.Worker, error) {
e.logger.WithFields(logrus.Fields{
"worker": hostname,
}).Tracef("getting worker %s from the database", hostname)
Expand Down
5 changes: 3 additions & 2 deletions database/worker/get_hostname_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestWorker_Engine_GetWorkerForName(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_worker)
_, err := _sqlite.CreateWorker(context.TODO(), _worker)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestWorker_Engine_GetWorkerForName(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.GetWorkerForHostname("worker_0")
got, err := test.database.GetWorkerForHostname(context.TODO(), "worker_0")

if test.failure {
if err == nil {
Expand Down
5 changes: 3 additions & 2 deletions database/worker/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package worker

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestWorker_Engine_GetWorker(t *testing.T) {
_sqlite := testSqlite(t)
defer func() { _sql, _ := _sqlite.client.DB(); _sql.Close() }()

_, err := _sqlite.CreateWorker(_worker)
_, err := _sqlite.CreateWorker(context.TODO(), _worker)
if err != nil {
t.Errorf("unable to create test worker for sqlite: %v", err)
}
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestWorker_Engine_GetWorker(t *testing.T) {
// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := test.database.GetWorker(1)
got, err := test.database.GetWorker(context.TODO(), 1)

if test.failure {
if err == nil {
Expand Down
Loading

0 comments on commit 38fae72

Please sign in to comment.