Skip to content

Commit

Permalink
feat(queue)!: register endpoint to use QueueInfo type (#957)
Browse files Browse the repository at this point in the history
* change register endpoint to return WorkerRegistration as a body

* add comments

* make clean

* add test for signing.go

* rename registerToken to WorkerRegistration

* add mock

* Update router/middleware/signing.go

Co-authored-by: David May <[email protected]>

* Update router/middleware/signing.go

Co-authored-by: David May <[email protected]>

* add validation to setup

* fix setup_test

* fix tests

* queue creds endpoint

* Update router/middleware/signing.go

Co-authored-by: Jacob Floyd <[email protected]>

* cleanup #1

* cleanup

* cleanup

* update types

* make clean

* address feedback

* revert setup validation

* remove type check

* address feedback

* move checks to opts.go

* fix typo

* make clean

* make clean

* make clean

* Update api/queue/queue.go

Co-authored-by: Easton Crupper <[email protected]>

* change post to get

* Update api/queue/queue.go

Co-authored-by: dave vader <[email protected]>

* Update router/admin.go

Co-authored-by: dave vader <[email protected]>

* Update router/queue.go

Co-authored-by: dave vader <[email protected]>

* Update api/queue/queue.go

Co-authored-by: dave vader <[email protected]>

* Update api/queue/queue.go

Co-authored-by: dave vader <[email protected]>

* Update api/queue/queue.go

Co-authored-by: dave vader <[email protected]>

* change to QueueRegistration

* fix typo

* add mock for 401

* Update router/queue.go

Co-authored-by: David May <[email protected]>

* Update queue/redis/opts.go

Co-authored-by: David May <[email protected]>

* Update queue/redis/opts.go

Co-authored-by: David May <[email protected]>

* Update queue/setup.go

Co-authored-by: David May <[email protected]>

* Update queue/redis/opts.go

Co-authored-by: David May <[email protected]>

* add linter ignore

* sort imports

* fix linter

* add queue_key to docker compose

* fix linter

* fix typo

* add schedule allowist

* Update api/queue/queue.go

Co-authored-by: dave vader <[email protected]>

* change to /info

* change to /info

* change to QueueInfo struct

* remove comments from docker compose

* remove comments from docker compose

* change to single quote

---------

Co-authored-by: Easton Crupper <[email protected]>
Co-authored-by: David May <[email protected]>
Co-authored-by: TimHuynh <[email protected]>
Co-authored-by: Jacob Floyd <[email protected]>
Co-authored-by: dave vader <[email protected]>
  • Loading branch information
6 people committed Sep 25, 2023
1 parent 7ee3ae3 commit ab85e96
Show file tree
Hide file tree
Showing 19 changed files with 293 additions and 36 deletions.
56 changes: 56 additions & 0 deletions api/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package queue

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/go-vela/server/router/middleware/claims"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)

// swagger:operation POST /api/v1/queue/info queue Info
//
// Get queue credentials
//
// ---
// produces:
// - application/json
// security:
// - ApiKeyAuth: []
// responses:
// '200':
// description: Successfully retrieved queue credentials
// schema:
// "$ref": "#/definitions/QueueInfo"
// '401':
// description: Unauthorized
// schema:
// "$ref": "#/definitions/Error"

// Info represents the API handler to
// retrieve queue credentials as part of worker onboarding.
func Info(c *gin.Context) {
cl := claims.Retrieve(c)

logrus.WithFields(logrus.Fields{
"user": cl.Subject,
}).Info("requesting queue credentials with registration token")

// extract the public key that was packed into gin context
k := c.MustGet("public-key").(string)

// extract the queue-address that was packed into gin context
a := c.MustGet("queue-address").(string)

wr := library.QueueInfo{
QueuePublicKey: &k,
QueueAddress: &a,
}

c.JSON(http.StatusOK, wr)
}
1 change: 1 addition & 0 deletions cmd/vela-server/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func setupQueue(c *cli.Context) (queue.Service, error) {
Routes: c.StringSlice("queue.routes"),
Timeout: c.Duration("queue.pop.timeout"),
PrivateKey: c.String("queue.private-key"),
PublicKey: c.String("queue.public-key"),
}

// setup the queue
Expand Down
2 changes: 2 additions & 0 deletions cmd/vela-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func server(c *cli.Context) error {
middleware.Secrets(secrets),
middleware.Scm(scm),
middleware.QueueSigningPrivateKey(c.String("queue.private-key")),
middleware.QueueSigningPublicKey(c.String("queue.public-key")),
middleware.QueueAddress(c.String("queue.addr")),
middleware.Allowlist(c.StringSlice("vela-repo-allowlist")),
middleware.DefaultBuildLimit(c.Int64("default-build-limit")),
middleware.DefaultTimeout(c.Int64("default-build-timeout")),
Expand Down
10 changes: 7 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ services:
VELA_ADDR: 'http://localhost:8080'
VELA_WEBUI_ADDR: 'http://localhost:8888'
VELA_LOG_LEVEL: trace
# comment the line below to use registration flow
VELA_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
QUEUE_PUBLIC_KEY: 'DXsJkoTSkHlG26d75LyHJG+KQsXPr8VKPpmH/78zmko='
VELA_SERVER_PRIVATE_KEY: 'F534FF2A080E45F38E05DC70752E6787'
VELA_USER_REFRESH_TOKEN_DURATION: 90m
VELA_USER_ACCESS_TOKEN_DURATION: 60m
VELA_WORKER_AUTH_TOKEN_DURATION: 3m
VELA_DISABLE_WEBHOOK_VALIDATION: 'true'
VELA_ENABLE_SECURE_COOKIE: 'false'
VELA_REPO_ALLOWLIST: '*'
Expand All @@ -55,6 +58,7 @@ services:
- redis
- vault


# The `worker` compose service hosts the Vela build daemon.
#
# This component is used for pulling builds from the FIFO
Expand All @@ -69,17 +73,17 @@ services:
environment:
EXECUTOR_DRIVER: linux
QUEUE_DRIVER: redis
QUEUE_ADDR: 'redis://redis:6379'
QUEUE_PUBLIC_KEY: 'DXsJkoTSkHlG26d75LyHJG+KQsXPr8VKPpmH/78zmko='
VELA_BUILD_LIMIT: 1
VELA_BUILD_TIMEOUT: 30m
VELA_LOG_LEVEL: trace
VELA_RUNTIME_DRIVER: docker
VELA_RUNTIME_PRIVILEGED_IMAGES: 'target/vela-docker'
VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS: 'true'
VELA_SERVER_ADDR: 'http://server:8080'
# comment the 3 lines below to use registration flow
VELA_SERVER_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh'
WORKER_ADDR: 'http://worker:8080'
WORKER_CHECK_IN: 5m
WORKER_CHECK_IN: 2m
restart: always
ports:
- '8081:8080'
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/drone/envsubst v1.0.3
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/assert/v2 v2.2.0
github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d
github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/go-cmp v0.5.9
github.com/google/go-github/v54 v54.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91
github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js=
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw=
github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d h1:ag6trc3Ev+7hzifeWy0M9rHHjrO9nFCYgW8dlKdZ4j4=
github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d/go.mod h1:AXO4oQSygOBQ02fPapsKjQHkx2aQO3zTu7clpvVbXBY=
github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d h1:PoGQfHM1Lq3cCttrQ9s5Cp9bHc1xXNWNYLGArPvHqRo=
github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d/go.mod h1:fVmUP4y7Cw8cG6CBWTdjIdgXNNrpVo25yoE9NmNBdOg=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down
5 changes: 4 additions & 1 deletion mock/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func FakeHandler() http.Handler {
e.PUT("/api/v1/admin/service", updateService)
e.PUT("/api/v1/admin/step", updateStep)
e.PUT("/api/v1/admin/user", updateUser)
e.POST("/api/v1/admin/workers/:worker/register-token", registerToken)
e.POST("/api/v1/admin/workers/:worker/register", registerToken)
e.PUT("api/v1/admin/clean", cleanResoures)

// mock endpoints for build calls
Expand Down Expand Up @@ -137,5 +137,8 @@ func FakeHandler() http.Handler {
e.POST("/authenticate/token", getAuthenticateFromToken)
e.GET("/validate-token", validateToken)

// mock endpoint for queue credentials
e.GET("/api/v1/queue/info", getQueueCreds)

return e
}
30 changes: 30 additions & 0 deletions mock/server/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ const (
RegisterTokenResp = `{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ3b3JrZXIiLCJpYXQiOjE1MTYyMzkwMjIsInRva2VuX3R5cGUiOiJXb3JrZXJSZWdpc3RlciJ9.gEzKaZB-sDd_gFCVF5uGo2mcf3iy9CrXDTLPZ6PTsTc"
}`

// QueueInfoResp represents a JSON return for an admin requesting a queue registration info.
//
//not actual credentials.
QueueInfoResp = `{
"queue_public_key": "DXeyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ98zmko=",
"queue_address": "redis://redis:6000"
}`
)

// getWorkers returns mock JSON for a http GET.
Expand Down Expand Up @@ -199,3 +207,25 @@ func registerToken(c *gin.Context) {

c.JSON(http.StatusCreated, body)
}

// getQueueCreds returns mock JSON for a http GET.
//
// Pass "" to Authorization header to test receiving a http 401 response.
func getQueueCreds(c *gin.Context) {
token := c.Request.Header.Get("Authorization")
// verify token if empty
if token == "" {
msg := "unable get queue credentials; invalid registration token"

c.AbortWithStatusJSON(http.StatusUnauthorized, types.Error{Message: &msg})

return
}

data := []byte(QueueInfoResp)

var body library.QueueInfo
_ = json.Unmarshal(data, &body)

c.JSON(http.StatusCreated, body)
}
10 changes: 6 additions & 4 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ func TestQueue_New(t *testing.T) {
{
failure: false,
setup: &Setup{
Driver: "redis",
Address: fmt.Sprintf("redis://%s", _redis.Addr()),
Routes: []string{"foo"},
Cluster: false,
Driver: "redis",
Address: fmt.Sprintf("redis://%s", _redis.Addr()),
Routes: []string{"foo"},
Cluster: false,
PrivateKey: "bOiFT7Y9e0jpOqaapTa3NzUkAve3VdRvyowgsY/vtlcK5L4RADOh9uTe1UVLdu3l/a0hvhiIkkLidUwVBhASWA==",
PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=",
},
},
{
Expand Down
10 changes: 9 additions & 1 deletion queue/redis/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func WithPrivateKey(key string) ClientOpt {
c.config.PrivateKey = new([64]byte)
copy(c.config.PrivateKey[:], decoded)

if len(*c.config.PrivateKey) != 64 {
return errors.New("no valid queue signing private key provided")
}

if c.config.PrivateKey == nil {
return errors.New("unable to copy decoded queue signing private key, copied key is nil")
}
Expand Down Expand Up @@ -132,8 +136,12 @@ func WithPublicKey(key string) ClientOpt {
c.config.PublicKey = new([32]byte)
copy(c.config.PublicKey[:], decoded)

if len(*c.config.PublicKey) != 32 {
return errors.New("no valid queue public key provided")
}

if c.config.PublicKey == nil {
return errors.New("unable to copy decoded queue signing public key, copied key is nil")
return errors.New("unable to copy decoded queue public key, copied key is nil")
}

if len(c.config.PublicKey) == 0 {
Expand Down
5 changes: 0 additions & 5 deletions queue/redis/pop.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func (c *client) Pop(ctx context.Context, routes []string) (*types.Item, error)
return nil, err
}

// this should already be validated on startup
if c.config.PublicKey == nil || len(*c.config.PublicKey) != 32 {
return nil, errors.New("no valid signing public key provided")
}

// extract signed item from pop results
signed := []byte(result[1])

Expand Down
5 changes: 0 additions & 5 deletions queue/redis/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ func (c *client) Push(ctx context.Context, channel string, item []byte) error {

var out []byte

// this should already be validated on startup
if c.config.PrivateKey == nil || len(*c.config.PrivateKey) != 64 {
return errors.New("no valid signing private key provided")
}

c.Logger.Tracef("signing item for queue %s", channel)

// sign the item using the private key generated using sign
Expand Down
4 changes: 4 additions & 0 deletions queue/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (s *Setup) Validate() error {
return fmt.Errorf("no queue routes provided")
}

if len(s.PublicKey) == 0 {
return fmt.Errorf("no queue public key was provided")
}

// setup is valid
return nil
}
27 changes: 15 additions & 12 deletions queue/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ func TestQueue_Setup_Redis(t *testing.T) {
defer _redis.Close()

_setup := &Setup{
Driver: "redis",
Address: fmt.Sprintf("redis://%s", _redis.Addr()),
Routes: []string{"foo"},
Cluster: false,
Driver: "redis",
Address: fmt.Sprintf("redis://%s", _redis.Addr()),
Routes: []string{"foo"},
Cluster: false,
PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=",
}

_, err = _setup.Redis()
Expand Down Expand Up @@ -63,19 +64,21 @@ func TestQueue_Setup_Validate(t *testing.T) {
{
failure: false,
setup: &Setup{
Driver: "redis",
Address: "redis://redis.example.com",
Routes: []string{"foo"},
Cluster: false,
Driver: "redis",
Address: "redis://redis.example.com",
Routes: []string{"foo"},
Cluster: false,
PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=",
},
},
{
failure: false,
setup: &Setup{
Driver: "kafka",
Address: "kafka://kafka.example.com",
Routes: []string{"foo"},
Cluster: false,
Driver: "kafka",
Address: "kafka://kafka.example.com",
Routes: []string{"foo"},
Cluster: false,
PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=",
},
},
{
Expand Down
5 changes: 3 additions & 2 deletions router/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
// PUT /api/v1/admin/secret
// PUT /api/v1/admin/service
// PUT /api/v1/admin/step
// PUT /api/v1/admin/user.
// PUT /api/v1/admin/user
// POST /api/v1/admin/workers/:worker/register.
func AdminHandlers(base *gin.RouterGroup) {
// Admin endpoints
_admin := base.Group("/admin", perm.MustPlatformAdmin())
Expand Down Expand Up @@ -59,6 +60,6 @@ func AdminHandlers(base *gin.RouterGroup) {
_admin.PUT("/user", admin.UpdateUser)

// Admin worker endpoint
_admin.POST("/workers/:worker/register-token", admin.RegisterToken)
_admin.POST("/workers/:worker/register", admin.RegisterToken)
} // end of admin endpoints
}
18 changes: 18 additions & 0 deletions router/middleware/signing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,21 @@ func QueueSigningPrivateKey(key string) gin.HandlerFunc {
c.Next()
}
}

// QueueSigningPublicKey is a middleware function that attaches the public key used
// to open signed items that are pushed to the queue.
func QueueSigningPublicKey(key string) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("public-key", key)
c.Next()
}
}

// QueueAddress is a middleware function that attaches the queue address used
// to open the connection to the queue.
func QueueAddress(address string) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("queue-address", address)
c.Next()
}
}
Loading

0 comments on commit ab85e96

Please sign in to comment.