diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 395ff3865c..e6e126c9f4 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -37,13 +37,6 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} -// WithOrderedADS enables the internal flag to order responses strictly. -func WithOrderedADS() config.XDSOption { - return func(o *config.Opts) { - o.Ordered = true - } -} - type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -72,7 +65,7 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba return s } -func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { +func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { streamID := atomic.AddInt64(&s.streamCount, 1) // streamNonce holds a unique nonce for req-resp pairs per xDS stream. @@ -149,6 +142,12 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De return status.Errorf(codes.Unavailable, "empty request") } + // make sure responses are processed prior to new requests to avoid deadlock + if len(watches.deltaMuxedResponses) > 0 { + reqCh <- req + break + } + if s.callbacks != nil { if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil { return err @@ -163,15 +162,11 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De req.Node = node } - ordered := false // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - if s.opts.Ordered { - ordered = true - } } else if req.TypeUrl == "" { req.TypeUrl = defaultTypeURL } @@ -195,25 +190,9 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - if ordered { - // Use the shared channel to keep the order of responses. - watch.UseSharedResponseChan(watches.deltaMuxedResponses) - } else { - watch.MakeResponseChan() - } + watch.responses = watches.deltaMuxedResponses watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) watches.deltaWatches[typeURL] = watch - - // just handle normal non-ordered responses here - // all ordered responses are sent to the muxedResponses channel directly - if !watch.useSharedChan { - go func() { - resp, more := <-watch.responses - if more { - watches.deltaMuxedResponses <- resp - } - }() - } } } } diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index d9df5c32dc..d95bc7dc54 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -32,32 +32,16 @@ func (w *watches) Cancel() { // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - useSharedChan bool // is this watch using a shared channel - cancel func() - nonce string + responses chan cache.DeltaResponse + cancel func() + nonce string state stream.StreamState } -func (w *watch) MakeResponseChan() { - w.responses = make(chan cache.DeltaResponse, 1) - w.useSharedChan = false -} - -func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) { - w.responses = sharedChan - w.useSharedChan = true -} - // Cancel calls terminate and cancel func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil && !w.useSharedChan { - // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here - // This is needed to release resources taken by goroutines watching this channel - close(w.responses) - } } diff --git a/pkg/server/delta/v3/watches_test.go b/pkg/server/delta/v3/watches_test.go index 104b979be3..6113498707 100644 --- a/pkg/server/delta/v3/watches_test.go +++ b/pkg/server/delta/v3/watches_test.go @@ -30,13 +30,5 @@ func TestDeltaWatches(t *testing.T) { watches.Cancel() assert.Equal(t, 3, cancelCount) - for _, channel := range channels { - select { - case _, ok := <-channel: - assert.False(t, ok, "a channel was not closed") - default: - assert.Fail(t, "a channel was not closed") - } - } }) }