From eaa21c3783f06342b1da0e1141a63c835b239fe9 Mon Sep 17 00:00:00 2001 From: huabing zhao Date: Wed, 13 Sep 2023 14:17:45 +0800 Subject: [PATCH] purge response channel before processing a request to avoid deadlock Signed-off-by: huabing zhao --- pkg/server/delta/v3/server.go | 60 ++++++++++++++++++++++------------ pkg/server/delta/v3/watches.go | 9 +++-- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 74d8af57b4..c2369416d0 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -103,6 +103,42 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt return response.Nonce, str.Send(response) } + process := func(resp cache.DeltaResponse) error { + typ := resp.GetDeltaRequest().GetTypeUrl() + if resp == deltaErrorResponse { + return status.Errorf(codes.Unavailable, typ+" watch failed") + } + + nonce, err := send(resp) + if err != nil { + return err + } + + watch := watches.deltaWatches[typ] + watch.nonce = nonce + + watch.state.SetResourceVersions(resp.GetNextVersionMap()) + watches.deltaWatches[typ] = watch + return nil + } + + processAll := func() error { + for { + select { + // We watch the multiplexed ADS channel for incoming responses. + case resp, more := <-watches.deltaMuxedResponses: + if !more { + break + } + if err := process(resp); err != nil { + return err + } + default: + return nil + } + } + } + if s.callbacks != nil { if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil { return err @@ -118,21 +154,9 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt break } - typ := resp.GetDeltaRequest().GetTypeUrl() - if resp == deltaErrorResponse { - return status.Errorf(codes.Unavailable, typ+" watch failed") - } - - nonce, err := send(resp) - if err != nil { + if err := process(resp); err != nil { return err } - - watch := watches.deltaWatches[typ] - watch.nonce = nonce - - watch.state.SetResourceVersions(resp.GetNextVersionMap()) - watches.deltaWatches[typ] = watch case req, more := <-reqCh: // input stream ended or errored out if !more { @@ -143,11 +167,8 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt } // make sure responses are processed prior to new requests to avoid deadlock - if len(watches.deltaMuxedResponses) > 0 { - go func() { - reqCh <- req - }() - break + if err := processAll(); err != nil { + return err } if s.callbacks != nil { @@ -192,8 +213,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - watch.responses = watches.deltaMuxedResponses - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) + watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses) watches.deltaWatches[typeURL] = watch } } diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 839d323211..313eb67c30 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -17,6 +17,10 @@ type watches struct { // newWatches creates and initializes watches. func newWatches() watches { // deltaMuxedResponses needs a buffer to release go-routines populating it + // + // because deltaMuxedResponses can be populated by an update from the cache + // and a request from the client, we need to create the channel with a buffer + // size of 2x the number of types to avoid deadlocks. return watches{ deltaWatches: make(map[string]watch, int(types.UnknownType)), deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2), @@ -32,9 +36,8 @@ func (w *watches) Cancel() { // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - cancel func() - nonce string + cancel func() + nonce string state stream.StreamState }