diff --git a/api/queue/queue.go b/api/queue/queue.go new file mode 100644 index 000000000..10946e99c --- /dev/null +++ b/api/queue/queue.go @@ -0,0 +1,56 @@ +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package queue + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/go-vela/server/router/middleware/claims" + "github.com/go-vela/types/library" + "github.com/sirupsen/logrus" +) + +// swagger:operation POST /api/v1/queue/info queue Info +// +// Get queue credentials +// +// --- +// produces: +// - application/json +// security: +// - ApiKeyAuth: [] +// responses: +// '200': +// description: Successfully retrieved queue credentials +// schema: +// "$ref": "#/definitions/QueueInfo" +// '401': +// description: Unauthorized +// schema: +// "$ref": "#/definitions/Error" + +// Info represents the API handler to +// retrieve queue credentials as part of worker onboarding. +func Info(c *gin.Context) { + cl := claims.Retrieve(c) + + logrus.WithFields(logrus.Fields{ + "user": cl.Subject, + }).Info("requesting queue credentials with registration token") + + // extract the public key that was packed into gin context + k := c.MustGet("public-key").(string) + + // extract the queue-address that was packed into gin context + a := c.MustGet("queue-address").(string) + + wr := library.QueueInfo{ + QueuePublicKey: &k, + QueueAddress: &a, + } + + c.JSON(http.StatusOK, wr) +} diff --git a/cmd/vela-server/queue.go b/cmd/vela-server/queue.go index edd703e69..da705e267 100644 --- a/cmd/vela-server/queue.go +++ b/cmd/vela-server/queue.go @@ -24,6 +24,7 @@ func setupQueue(c *cli.Context) (queue.Service, error) { Routes: c.StringSlice("queue.routes"), Timeout: c.Duration("queue.pop.timeout"), PrivateKey: c.String("queue.private-key"), + PublicKey: c.String("queue.public-key"), } // setup the queue diff --git a/cmd/vela-server/server.go b/cmd/vela-server/server.go index cb9f0edd1..2e7234373 100644 --- a/cmd/vela-server/server.go +++ b/cmd/vela-server/server.go @@ -99,6 +99,8 @@ func server(c *cli.Context) error { middleware.Secrets(secrets), middleware.Scm(scm), middleware.QueueSigningPrivateKey(c.String("queue.private-key")), + middleware.QueueSigningPublicKey(c.String("queue.public-key")), + middleware.QueueAddress(c.String("queue.addr")), middleware.Allowlist(c.StringSlice("vela-repo-allowlist")), middleware.DefaultBuildLimit(c.Int64("default-build-limit")), middleware.DefaultTimeout(c.Int64("default-build-timeout")), diff --git a/docker-compose.yml b/docker-compose.yml index 6a596e45e..02b749ed4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,10 +37,13 @@ services: VELA_ADDR: 'http://localhost:8080' VELA_WEBUI_ADDR: 'http://localhost:8888' VELA_LOG_LEVEL: trace + # comment the line below to use registration flow VELA_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh' + QUEUE_PUBLIC_KEY: 'DXsJkoTSkHlG26d75LyHJG+KQsXPr8VKPpmH/78zmko=' VELA_SERVER_PRIVATE_KEY: 'F534FF2A080E45F38E05DC70752E6787' VELA_USER_REFRESH_TOKEN_DURATION: 90m VELA_USER_ACCESS_TOKEN_DURATION: 60m + VELA_WORKER_AUTH_TOKEN_DURATION: 3m VELA_DISABLE_WEBHOOK_VALIDATION: 'true' VELA_ENABLE_SECURE_COOKIE: 'false' VELA_REPO_ALLOWLIST: '*' @@ -55,6 +58,7 @@ services: - redis - vault + # The `worker` compose service hosts the Vela build daemon. # # This component is used for pulling builds from the FIFO @@ -69,17 +73,17 @@ services: environment: EXECUTOR_DRIVER: linux QUEUE_DRIVER: redis - QUEUE_ADDR: 'redis://redis:6379' - QUEUE_PUBLIC_KEY: 'DXsJkoTSkHlG26d75LyHJG+KQsXPr8VKPpmH/78zmko=' VELA_BUILD_LIMIT: 1 VELA_BUILD_TIMEOUT: 30m VELA_LOG_LEVEL: trace VELA_RUNTIME_DRIVER: docker VELA_RUNTIME_PRIVILEGED_IMAGES: 'target/vela-docker' + VELA_EXECUTOR_ENFORCE_TRUSTED_REPOS: 'true' VELA_SERVER_ADDR: 'http://server:8080' + # comment the 3 lines below to use registration flow VELA_SERVER_SECRET: 'zB7mrKDTZqNeNTD8z47yG4DHywspAh' WORKER_ADDR: 'http://worker:8080' - WORKER_CHECK_IN: 5m + WORKER_CHECK_IN: 2m restart: always ports: - '8081:8080' diff --git a/go.mod b/go.mod index 586f5bedb..624a437c1 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/drone/envsubst v1.0.3 github.com/gin-gonic/gin v1.9.1 github.com/go-playground/assert/v2 v2.2.0 - github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d + github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d github.com/golang-jwt/jwt/v5 v5.0.0 github.com/google/go-cmp v0.5.9 github.com/google/go-github/v54 v54.0.0 diff --git a/go.sum b/go.sum index 796ddae86..70264beec 100644 --- a/go.sum +++ b/go.sum @@ -143,8 +143,8 @@ github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91 github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw= -github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d h1:ag6trc3Ev+7hzifeWy0M9rHHjrO9nFCYgW8dlKdZ4j4= -github.com/go-vela/types v0.20.2-0.20230822144153-14b37585731d/go.mod h1:AXO4oQSygOBQ02fPapsKjQHkx2aQO3zTu7clpvVbXBY= +github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d h1:PoGQfHM1Lq3cCttrQ9s5Cp9bHc1xXNWNYLGArPvHqRo= +github.com/go-vela/types v0.20.2-0.20230922185343-b83bcddfa60d/go.mod h1:fVmUP4y7Cw8cG6CBWTdjIdgXNNrpVo25yoE9NmNBdOg= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= diff --git a/mock/server/server.go b/mock/server/server.go index 7bc668719..62f8d9302 100644 --- a/mock/server/server.go +++ b/mock/server/server.go @@ -29,7 +29,7 @@ func FakeHandler() http.Handler { e.PUT("/api/v1/admin/service", updateService) e.PUT("/api/v1/admin/step", updateStep) e.PUT("/api/v1/admin/user", updateUser) - e.POST("/api/v1/admin/workers/:worker/register-token", registerToken) + e.POST("/api/v1/admin/workers/:worker/register", registerToken) e.PUT("api/v1/admin/clean", cleanResoures) // mock endpoints for build calls @@ -137,5 +137,8 @@ func FakeHandler() http.Handler { e.POST("/authenticate/token", getAuthenticateFromToken) e.GET("/validate-token", validateToken) + // mock endpoint for queue credentials + e.GET("/api/v1/queue/info", getQueueCreds) + return e } diff --git a/mock/server/worker.go b/mock/server/worker.go index 51aedfa80..a717f2f2b 100644 --- a/mock/server/worker.go +++ b/mock/server/worker.go @@ -75,6 +75,14 @@ const ( RegisterTokenResp = `{ "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ3b3JrZXIiLCJpYXQiOjE1MTYyMzkwMjIsInRva2VuX3R5cGUiOiJXb3JrZXJSZWdpc3RlciJ9.gEzKaZB-sDd_gFCVF5uGo2mcf3iy9CrXDTLPZ6PTsTc" }` + + // QueueInfoResp represents a JSON return for an admin requesting a queue registration info. + // + //not actual credentials. + QueueInfoResp = `{ + "queue_public_key": "DXeyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ98zmko=", + "queue_address": "redis://redis:6000" + }` ) // getWorkers returns mock JSON for a http GET. @@ -199,3 +207,25 @@ func registerToken(c *gin.Context) { c.JSON(http.StatusCreated, body) } + +// getQueueCreds returns mock JSON for a http GET. +// +// Pass "" to Authorization header to test receiving a http 401 response. +func getQueueCreds(c *gin.Context) { + token := c.Request.Header.Get("Authorization") + // verify token if empty + if token == "" { + msg := "unable get queue credentials; invalid registration token" + + c.AbortWithStatusJSON(http.StatusUnauthorized, types.Error{Message: &msg}) + + return + } + + data := []byte(QueueInfoResp) + + var body library.QueueInfo + _ = json.Unmarshal(data, &body) + + c.JSON(http.StatusCreated, body) +} diff --git a/queue/queue_test.go b/queue/queue_test.go index 61bcfd527..b7a77e999 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -30,10 +30,12 @@ func TestQueue_New(t *testing.T) { { failure: false, setup: &Setup{ - Driver: "redis", - Address: fmt.Sprintf("redis://%s", _redis.Addr()), - Routes: []string{"foo"}, - Cluster: false, + Driver: "redis", + Address: fmt.Sprintf("redis://%s", _redis.Addr()), + Routes: []string{"foo"}, + Cluster: false, + PrivateKey: "bOiFT7Y9e0jpOqaapTa3NzUkAve3VdRvyowgsY/vtlcK5L4RADOh9uTe1UVLdu3l/a0hvhiIkkLidUwVBhASWA==", + PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=", }, }, { diff --git a/queue/redis/opts.go b/queue/redis/opts.go index 5589eeb92..17030262c 100644 --- a/queue/redis/opts.go +++ b/queue/redis/opts.go @@ -96,6 +96,10 @@ func WithPrivateKey(key string) ClientOpt { c.config.PrivateKey = new([64]byte) copy(c.config.PrivateKey[:], decoded) + if len(*c.config.PrivateKey) != 64 { + return errors.New("no valid queue signing private key provided") + } + if c.config.PrivateKey == nil { return errors.New("unable to copy decoded queue signing private key, copied key is nil") } @@ -132,8 +136,12 @@ func WithPublicKey(key string) ClientOpt { c.config.PublicKey = new([32]byte) copy(c.config.PublicKey[:], decoded) + if len(*c.config.PublicKey) != 32 { + return errors.New("no valid queue public key provided") + } + if c.config.PublicKey == nil { - return errors.New("unable to copy decoded queue signing public key, copied key is nil") + return errors.New("unable to copy decoded queue public key, copied key is nil") } if len(c.config.PublicKey) == 0 { diff --git a/queue/redis/pop.go b/queue/redis/pop.go index 65a043e8b..ac1ee03c5 100644 --- a/queue/redis/pop.go +++ b/queue/redis/pop.go @@ -46,11 +46,6 @@ func (c *client) Pop(ctx context.Context, routes []string) (*types.Item, error) return nil, err } - // this should already be validated on startup - if c.config.PublicKey == nil || len(*c.config.PublicKey) != 32 { - return nil, errors.New("no valid signing public key provided") - } - // extract signed item from pop results signed := []byte(result[1]) diff --git a/queue/redis/push.go b/queue/redis/push.go index 66c8a386a..eb7ae49d8 100644 --- a/queue/redis/push.go +++ b/queue/redis/push.go @@ -27,11 +27,6 @@ func (c *client) Push(ctx context.Context, channel string, item []byte) error { var out []byte - // this should already be validated on startup - if c.config.PrivateKey == nil || len(*c.config.PrivateKey) != 64 { - return errors.New("no valid signing private key provided") - } - c.Logger.Tracef("signing item for queue %s", channel) // sign the item using the private key generated using sign diff --git a/queue/setup.go b/queue/setup.go index ed0f131c8..f4b6bdbc3 100644 --- a/queue/setup.go +++ b/queue/setup.go @@ -92,6 +92,10 @@ func (s *Setup) Validate() error { return fmt.Errorf("no queue routes provided") } + if len(s.PublicKey) == 0 { + return fmt.Errorf("no queue public key was provided") + } + // setup is valid return nil } diff --git a/queue/setup_test.go b/queue/setup_test.go index 4b2e35ede..5be5a925d 100644 --- a/queue/setup_test.go +++ b/queue/setup_test.go @@ -23,10 +23,11 @@ func TestQueue_Setup_Redis(t *testing.T) { defer _redis.Close() _setup := &Setup{ - Driver: "redis", - Address: fmt.Sprintf("redis://%s", _redis.Addr()), - Routes: []string{"foo"}, - Cluster: false, + Driver: "redis", + Address: fmt.Sprintf("redis://%s", _redis.Addr()), + Routes: []string{"foo"}, + Cluster: false, + PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=", } _, err = _setup.Redis() @@ -63,19 +64,21 @@ func TestQueue_Setup_Validate(t *testing.T) { { failure: false, setup: &Setup{ - Driver: "redis", - Address: "redis://redis.example.com", - Routes: []string{"foo"}, - Cluster: false, + Driver: "redis", + Address: "redis://redis.example.com", + Routes: []string{"foo"}, + Cluster: false, + PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=", }, }, { failure: false, setup: &Setup{ - Driver: "kafka", - Address: "kafka://kafka.example.com", - Routes: []string{"foo"}, - Cluster: false, + Driver: "kafka", + Address: "kafka://kafka.example.com", + Routes: []string{"foo"}, + Cluster: false, + PublicKey: "CuS+EQAzofbk3tVFS3bt5f2tIb4YiJJC4nVMFQYQElg=", }, }, { diff --git a/router/admin.go b/router/admin.go index 96764617f..269df46ae 100644 --- a/router/admin.go +++ b/router/admin.go @@ -23,7 +23,8 @@ import ( // PUT /api/v1/admin/secret // PUT /api/v1/admin/service // PUT /api/v1/admin/step -// PUT /api/v1/admin/user. +// PUT /api/v1/admin/user +// POST /api/v1/admin/workers/:worker/register. func AdminHandlers(base *gin.RouterGroup) { // Admin endpoints _admin := base.Group("/admin", perm.MustPlatformAdmin()) @@ -59,6 +60,6 @@ func AdminHandlers(base *gin.RouterGroup) { _admin.PUT("/user", admin.UpdateUser) // Admin worker endpoint - _admin.POST("/workers/:worker/register-token", admin.RegisterToken) + _admin.POST("/workers/:worker/register", admin.RegisterToken) } // end of admin endpoints } diff --git a/router/middleware/signing.go b/router/middleware/signing.go index 05c63b8e5..89db89d7e 100644 --- a/router/middleware/signing.go +++ b/router/middleware/signing.go @@ -16,3 +16,21 @@ func QueueSigningPrivateKey(key string) gin.HandlerFunc { c.Next() } } + +// QueueSigningPublicKey is a middleware function that attaches the public key used +// to open signed items that are pushed to the queue. +func QueueSigningPublicKey(key string) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("public-key", key) + c.Next() + } +} + +// QueueAddress is a middleware function that attaches the queue address used +// to open the connection to the queue. +func QueueAddress(address string) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("queue-address", address) + c.Next() + } +} diff --git a/router/middleware/signing_test.go b/router/middleware/signing_test.go new file mode 100644 index 000000000..908c0908b --- /dev/null +++ b/router/middleware/signing_test.go @@ -0,0 +1,110 @@ +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package middleware + +import ( + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "github.com/gin-gonic/gin" +) + +func TestMiddleware_QueueSigningPrivateKey(t *testing.T) { + // setup types + got := "" + want := "foobar" + + // setup context + gin.SetMode(gin.TestMode) + + resp := httptest.NewRecorder() + context, engine := gin.CreateTestContext(resp) + context.Request, _ = http.NewRequest(http.MethodGet, "/health", nil) + + // setup mock server + engine.Use(QueueSigningPrivateKey(want)) + engine.GET("/health", func(c *gin.Context) { + got = c.Value("queue.private-key").(string) + + c.Status(http.StatusOK) + }) + + // run test + engine.ServeHTTP(context.Writer, context.Request) + + if resp.Code != http.StatusOK { + t.Errorf("QueueSigningPrivateKey returned %v, want %v", resp.Code, http.StatusOK) + } + + if !reflect.DeepEqual(got, want) { + t.Errorf("QueueSigningPrivateKey is %v, want %v", got, want) + } +} + +func TestMiddleware_QueueSigningPublicKey(t *testing.T) { + // setup types + got := "" + want := "foobar" + + // setup context + gin.SetMode(gin.TestMode) + + resp := httptest.NewRecorder() + context, engine := gin.CreateTestContext(resp) + context.Request, _ = http.NewRequest(http.MethodGet, "/health", nil) + + // setup mock server + engine.Use(QueueSigningPublicKey(want)) + engine.GET("/health", func(c *gin.Context) { + got = c.Value("public-key").(string) + + c.Status(http.StatusOK) + }) + + // run test + engine.ServeHTTP(context.Writer, context.Request) + + if resp.Code != http.StatusOK { + t.Errorf("QueueSigningPublicKey returned %v, want %v", resp.Code, http.StatusOK) + } + + if !reflect.DeepEqual(got, want) { + t.Errorf("QueueSigningPublicKey is %v, want %v", got, want) + } +} + +func TestMiddleware_QueueAddress(t *testing.T) { + // setup types + got := "" + want := "foobar" + + // setup context + gin.SetMode(gin.TestMode) + + resp := httptest.NewRecorder() + context, engine := gin.CreateTestContext(resp) + context.Request, _ = http.NewRequest(http.MethodGet, "/health", nil) + + // setup mock server + engine.Use(QueueAddress(want)) + engine.GET("/health", func(c *gin.Context) { + got = c.Value("queue-address").(string) + + c.Status(http.StatusOK) + }) + + // run test + engine.ServeHTTP(context.Writer, context.Request) + + if resp.Code != http.StatusOK { + t.Errorf("QueueAddress returned %v, want %v", resp.Code, http.StatusOK) + } + + if !reflect.DeepEqual(got, want) { + t.Errorf("QueueAddress is %v, want %v", got, want) + } +} diff --git a/router/queue.go b/router/queue.go new file mode 100644 index 000000000..d3b311b13 --- /dev/null +++ b/router/queue.go @@ -0,0 +1,23 @@ +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package router + +import ( + "github.com/gin-gonic/gin" + "github.com/go-vela/server/api/queue" + "github.com/go-vela/server/router/middleware/perm" +) + +// QueueHandlers is a function that extends the provided base router group +// with the API handlers for queue registration functionality. +// +// POST /api/v1/queue/register. +func QueueHandlers(base *gin.RouterGroup) { + // Queue endpoints + _queue := base.Group("/queue") + { + _queue.GET("/info", perm.MustWorkerRegisterToken(), queue.Info) + } // end of queue endpoints +} diff --git a/router/router.go b/router/router.go index f95c6bf4b..01304d6e2 100644 --- a/router/router.go +++ b/router/router.go @@ -137,6 +137,8 @@ func Load(options ...gin.HandlerFunc) *gin.Engine { // Pipeline endpoints PipelineHandlers(baseAPI) + // Queue endpoints + QueueHandlers(baseAPI) } // end of api return r