Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ordered responses for ADS delta watches #752

Merged
merged 17 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 78 additions & 38 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,54 +233,95 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
info.mu.Lock()
defer info.mu.Unlock()

// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}
// discard the watch
delete(info.watches, id)
// Respond to SOTW watches for the node.
if err := cache.respondSOTWWatches(ctx, info, snapshot); err != nil {
return err
}

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}

return nil
}

func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}
return nil
// discard the watch
delete(info.watches, id)
}
return nil
}

// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
}

return nil
}

func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) == 0 {
return nil
}

err := snapshot.ConstructVersionMap()
if err != nil {
return err
}

// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) > 0 {
err := snapshot.ConstructVersionMap()
// If ADS is enabled we need to order response delta watches so we guarantee
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseDeltaWatches()
for _, key := range info.orderedDeltaWatches {
watch := info.deltaWatches[key.ID]
res, err := cache.respondDelta(
ctx,
snapshot,
watch.Request,
watch.Response,
watch.StreamState,
)
if err != nil {
return err
}
// If we detect a nil response here, that means there has been no state change
// so we don't want to respond or remove any existing resource watches
if res != nil {
delete(info.deltaWatches, key.ID)
}
}

// this won't run if there are no delta watches
// to process.
} else {
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
ctx,
Expand All @@ -299,7 +340,6 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
}
}
}

return nil
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/cache/v3/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type statusInfo struct {
orderedWatches keys

// deltaWatches are indexed channels for the delta response watches and the original requests
deltaWatches map[int64]DeltaResponseWatch
deltaWatches map[int64]DeltaResponseWatch
orderedDeltaWatches keys

// the timestamp of the last watch request
lastWatchRequestTime time.Time
Expand Down Expand Up @@ -177,3 +178,22 @@ func (info *statusInfo) orderResponseWatches() {
// This is only run when we enable ADS on the cache.
sort.Sort(info.orderedWatches)
}

// orderResponseDeltaWatches will track a list of delta watch keys and order them if
// true is passed.
func (info *statusInfo) orderResponseDeltaWatches() {
info.orderedDeltaWatches = make(keys, len(info.deltaWatches))

var index int
for id, deltaWatch := range info.deltaWatches {
info.orderedDeltaWatches[index] = key{
ID: id,
TypeURL: deltaWatch.Request.TypeUrl,
}
index++
}

// Sort our list which we can use in the SetSnapshot functions.
// This is only run when we enable ADS on the cache.
sort.Sort(info.orderedDeltaWatches)
}
72 changes: 49 additions & 23 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}()

// Sends a response, returns the new stream nonce
// sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
return "", errors.New("missing response")
Expand All @@ -103,6 +103,44 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
return response.Nonce, str.Send(response)
}

// process a single delta response
process := func(resp cache.DeltaResponse) error {
typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}

nonce, err := send(resp)
if err != nil {
return err
}

watch := watches.deltaWatches[typ]
watch.nonce = nonce

watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
return nil
}

// processAll purges the deltaMuxedResponses channel
processAll := func() error {
for {
select {
// We watch the multiplexed channel for incoming responses.
case resp, more := <-watches.deltaMuxedResponses:
if !more {
break
}
if err := process(resp); err != nil {
return err
}
default:
return nil
}
}
}

if s.callbacks != nil {
if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
Expand All @@ -113,35 +151,31 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
select {
case <-s.ctx.Done():
return nil
// We watch the multiplexed channel for incoming responses.
case resp, more := <-watches.deltaMuxedResponses:
// input stream ended or errored out
if !more {
break
}

typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}

nonce, err := send(resp)
if err != nil {
if err := process(resp); err != nil {
return err
}

watch := watches.deltaWatches[typ]
watch.nonce = nonce

watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}

if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}

// make sure all existing responses are processed prior to new requests to avoid deadlock
if err := processAll(); err != nil {
return err
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil {
return err
Expand Down Expand Up @@ -184,16 +218,8 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses)
watches.deltaWatches[typeURL] = watch

go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ type watches struct {
// newWatches creates and initializes watches.
func newWatches() watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
//
// because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment on why we provide 2 times the buffering here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment:

    // because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.

}
}

Expand All @@ -28,13 +32,14 @@ func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
watch.Cancel()
}

close(w.deltaMuxedResponses)
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
cancel func()
nonce string

state stream.StreamState
}
Expand All @@ -44,9 +49,4 @@ func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
}
}
13 changes: 0 additions & 13 deletions pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

func TestDeltaWatches(t *testing.T) {
t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) {
watches := newWatches()

cancelCount := 0
var channels []chan cache.DeltaResponse
// create a few watches, and ensure that the cancel function are called and the channels are closed
for i := 0; i < 5; i++ {
newWatch := watch{}
if i%2 == 0 {
newWatch.cancel = func() { cancelCount++ }
newWatch.responses = make(chan cache.DeltaResponse)
channels = append(channels, newWatch.responses)
}

watches.deltaWatches[strconv.Itoa(i)] = newWatch
Expand All @@ -30,13 +25,5 @@ func TestDeltaWatches(t *testing.T) {
watches.Cancel()

assert.Equal(t, 3, cancelCount)
for _, channel := range channels {
select {
case _, ok := <-channel:
assert.False(t, ok, "a channel was not closed")
default:
assert.Fail(t, "a channel was not closed")
}
}
})
}
2 changes: 2 additions & 0 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
resp.recv <- r
}

// We create the server with the optional ordered ADS flag so we guarantee resource
// ordering over the stream.
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
err := s.DeltaAggregatedResources(resp)
Expand Down