Skip to content

Commit

Permalink
make sure responses are processed prior to new requests to avoid dead…
Browse files Browse the repository at this point in the history
…lock

Signed-off-by: huabing zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Sep 8, 2023
1 parent e235a44 commit 9c9c4b6
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 56 deletions.
37 changes: 8 additions & 29 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ type Callbacks interface {

var deltaErrorResponse = &cache.RawDeltaResponse{}

// WithOrderedADS enables the internal flag to order responses strictly.
func WithOrderedADS() config.XDSOption {
return func(o *config.Opts) {
o.Ordered = true
}
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand Down Expand Up @@ -72,7 +65,7 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba
return s
}

func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
streamID := atomic.AddInt64(&s.streamCount, 1)

// streamNonce holds a unique nonce for req-resp pairs per xDS stream.
Expand Down Expand Up @@ -149,6 +142,12 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
return status.Errorf(codes.Unavailable, "empty request")
}

// make sure responses are processed prior to new requests to avoid deadlock
if len(watches.deltaMuxedResponses) > 0 {
reqCh <- req
break
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil {
return err
Expand All @@ -163,15 +162,11 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
req.Node = node
}

ordered := false
// type URL is required for ADS but is implicit for any other xDS stream
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
if s.opts.Ordered {
ordered = true
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
Expand All @@ -195,25 +190,9 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

if ordered {
// Use the shared channel to keep the order of responses.
watch.UseSharedResponseChan(watches.deltaMuxedResponses)
} else {
watch.MakeResponseChan()
}
watch.responses = watches.deltaMuxedResponses
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch

// just handle normal non-ordered responses here
// all ordered responses are sent to the muxedResponses channel directly
if !watch.useSharedChan {
go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
}
}
}
Expand Down
22 changes: 3 additions & 19 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,16 @@ func (w *watches) Cancel() {

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
useSharedChan bool // is this watch using a shared channel
cancel func()
nonce string
responses chan cache.DeltaResponse
cancel func()
nonce string

state stream.StreamState
}

func (w *watch) MakeResponseChan() {
w.responses = make(chan cache.DeltaResponse, 1)
w.useSharedChan = false
}

func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) {
w.responses = sharedChan
w.useSharedChan = true
}

// Cancel calls terminate and cancel
func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil && !w.useSharedChan {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
}
}
8 changes: 0 additions & 8 deletions pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,5 @@ func TestDeltaWatches(t *testing.T) {
watches.Cancel()

assert.Equal(t, 3, cancelCount)
for _, channel := range channels {
select {
case _, ok := <-channel:
assert.False(t, ok, "a channel was not closed")
default:
assert.Fail(t, "a channel was not closed")
}
}
})
}

0 comments on commit 9c9c4b6

Please sign in to comment.