From 5a946e8723eda9d46850eb51db1a67ef0d7aa2f0 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Thu, 26 Sep 2024 13:51:36 +0200 Subject: [PATCH] Option for the delta server to override the channel capacity and avoid deadlocks (#3) * add WithDistinctResourceTypes server option to override the channel capacity when Config consists of the greater number of resource types Signed-off-by: Ilya Lobkov Signed-off-by: Jakub Dyszkiewicz --- build/do_ci.sh | 2 +- pkg/server/config/config.go | 8 ++++++-- pkg/server/delta/v3/server.go | 15 ++++++++++++--- pkg/server/delta/v3/watches.go | 7 +++---- pkg/server/delta/v3/watches_test.go | 4 +++- 5 files changed, 25 insertions(+), 11 deletions(-) diff --git a/build/do_ci.sh b/build/do_ci.sh index dc3badbcac..726499ac1d 100755 --- a/build/do_ci.sh +++ b/build/do_ci.sh @@ -6,7 +6,7 @@ set -x # Needed to avoid issues with go version stamping in CI build git config --global --add safe.directory /go-control-plane -go install golang.org/x/tools/cmd/goimports@latest +go install golang.org/x/tools/cmd/goimports@v0.24.0 cd /go-control-plane diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index b746acfab9..44d6043010 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -1,15 +1,19 @@ package config +import "github.com/envoyproxy/go-control-plane/pkg/cache/types" + // Opts for individual xDS implementations that can be // utilized through the functional opts pattern. type Opts struct { // If true respond to ADS requests with a guaranteed resource ordering - Ordered bool + Ordered bool + DistinctResourceTypes int } func NewOpts() Opts { return Opts{ - Ordered: false, + Ordered: false, + DistinctResourceTypes: int(types.UnknownType), } } diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 347735a83c..6eacd5601a 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -37,6 +37,14 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} +// WithDistinctResourceTypes overrides the default number of resource types, so that the server can serve Configs with +// more distinct resource types without getting into a deadlock. +func WithDistinctResourceTypes(n int) config.XDSOption { + return func(o *config.Opts) { + o.DistinctResourceTypes = n + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -50,11 +58,12 @@ type server struct { } // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { +func NewServer(ctx context.Context, configWatcher cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { s := &server{ - cache: config, + cache: configWatcher, callbacks: callbacks, ctx: ctx, + opts: config.NewOpts(), } // Parse through our options @@ -72,7 +81,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De var streamNonce int64 // a collection of stack allocated watches per request type - watches := newWatches() + watches := newWatches(s.opts.DistinctResourceTypes) node := &core.Node{} diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 63c4c2d38d..1a4c1c54a1 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -1,7 +1,6 @@ package delta import ( - "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -15,15 +14,15 @@ type watches struct { } // newWatches creates and initializes watches. -func newWatches() watches { +func newWatches(distinctResourceTypes int) watches { // deltaMuxedResponses needs a buffer to release go-routines populating it // // because deltaMuxedResponses can be populated by an update from the cache // and a request from the client, we need to create the channel with a buffer // size of 2x the number of types to avoid deadlocks. return watches{ - deltaWatches: make(map[string]watch, int(types.UnknownType)), - deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2), + deltaWatches: make(map[string]watch, distinctResourceTypes), + deltaMuxedResponses: make(chan cache.DeltaResponse, distinctResourceTypes*2), } } diff --git a/pkg/server/delta/v3/watches_test.go b/pkg/server/delta/v3/watches_test.go index cee0985ebd..48aafeca84 100644 --- a/pkg/server/delta/v3/watches_test.go +++ b/pkg/server/delta/v3/watches_test.go @@ -5,11 +5,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" ) func TestDeltaWatches(t *testing.T) { t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) { - watches := newWatches() + watches := newWatches(int(types.UnknownType)) cancelCount := 0 // create a few watches, and ensure that the cancel function are called and the channels are closed