Skip to content

Commit

Permalink
Update handling of subscriptions to fully handle legacy wildcard beha…
Browse files Browse the repository at this point in the history
…vior

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche committed Jan 18, 2024
1 parent cefeb71 commit fff9714
Show file tree
Hide file tree
Showing 19 changed files with 450 additions and 477 deletions.
16 changes: 8 additions & 8 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// SubscriptionState stores the server view of the client state for a given resource type.
// Subscription stores the server view of the client state for a given resource type.
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
type SubscriptionState interface {
// GetACKedResources returns a list of resources that the client has ACK'd and their associated version.
type Subscription interface {
// ReturnedResources returns a list of resources that clients have ACK'd and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
GetACKedResources() map[string]string
ReturnedResources() map[string]string

// GetSubscribedResources returns the list of resources currently subscribed to by the client for the type.
// SubscribedResources returns the list of resources currently subscribed to by the client for the type.
// For delta it keeps track of subscription updates across requests
// For sotw it is a normalized view of the last request resources
GetSubscribedResources() map[string]struct{}
SubscribedResources() map[string]struct{}

// IsWildcard returns whether the client has a wildcard watch.
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
Expand Down Expand Up @@ -81,7 +81,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, SubscriptionState, chan Response) (cancel func(), err error)
CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error)

// CreateDeltaWatch returns a new open incremental xDS watch.
// This is the entrypoint to propagate configuration changes the
Expand All @@ -93,7 +93,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, SubscriptionState, chan DeltaResponse) (cancel func(), err error)
CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error)
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
14 changes: 7 additions & 7 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type resourceContainer struct {
systemVersion string
}

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state SubscriptionState, resources resourceContainer) *RawDeltaResponse {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscription, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
Expand All @@ -36,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetACKedResources()) == 0 {
if len(state.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
Expand All @@ -45,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetACKedResources()[name]
prevVersion, found := state.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetACKedResources() {
for name := range state.ReturnedResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
nextVersionMap = make(map[string]string, len(state.GetSubscribedResources()))
nextVersionMap = make(map[string]string, len(state.SubscribedResources()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResources() {
prevVersion, found := state.GetACKedResources()[name]
for name := range state.SubscribedResources() {
prevVersion, found := state.ReturnedResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
Expand Down
40 changes: 21 additions & 19 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewSubscriptionState(true, nil)
sub := stream.NewSubscription(true, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
}, sub, watches[typ])
require.NoError(t, err)
}

Expand All @@ -69,17 +69,19 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewSubscriptionState(false, versionMap[typ])
sub := stream.NewSubscription(false, versionMap[typ])
resources := []string{}
for resource := range versionMap[typ] {
state.GetSubscribedResources()[resource] = struct{}{}
resources = append(resources, resource)
}
sub.SetResourceSubscription(resources)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
}, sub, watches[typ])
require.NoError(t, err)
}

Expand Down Expand Up @@ -113,21 +115,21 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
func TestDeltaRemoveResources(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)
streams := make(map[string]*stream.SubscriptionState)
subs := make(map[string]*stream.Subscription)

// At this stage the cache is empty, so a watch is opened
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewSubscriptionState(true, make(map[string]string))
streams[typ] = &state
sub := stream.NewSubscription(true, make(map[string]string))
subs[typ] = &sub
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
}, *subs[typ], watches[typ])
require.NoError(t, err)
}

Expand All @@ -144,7 +146,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[typ]:
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetACKedResources(nextVersionMap)
subs[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
require.Fail(t, "failed to receive a snapshot response")
}
Expand All @@ -161,7 +163,7 @@ func TestDeltaRemoveResources(t *testing.T) {
},
TypeUrl: typ,
ResponseNonce: "nonce",
}, *streams[typ], watches[typ])
}, *subs[typ], watches[typ])
require.NoError(t, err)
}

Expand All @@ -181,7 +183,7 @@ func TestDeltaRemoveResources(t *testing.T) {
assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources)
nextVersionMap := out.GetNextVersionMap()
// make sure the version maps are different since we no longer are tracking any endpoint resources
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetACKedResources(), "versionMap for the endpoint resource type did not change")
assert.NotEqual(t, nextVersionMap, subs[testTypes[0]].ReturnedResources(), "versionMap for the endpoint resource type did not change")
case <-time.After(time.Second):
assert.Fail(t, "failed to receive snapshot response")
}
Expand All @@ -206,14 +208,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Fatalf("snapshot failed: %s", err)
}
} else {
state := stream.NewSubscriptionState(false, make(map[string]string))
sub := stream.NewSubscription(false, make(map[string]string))
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, state, responses)
}, sub, responses)
require.NoError(t, err)

defer cancel()
Expand All @@ -230,15 +232,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewSubscriptionState(false, nil)
state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
sub := stream.NewSubscription(false, nil)
sub.UpdateResourceSubscriptions([]string{names[rsrc.EndpointType][0]}, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, state, watchCh)
}, sub, watchCh)
require.NoError(t, err)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
Expand Down Expand Up @@ -275,14 +277,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
state := stream.NewSubscriptionState(false, make(map[string]string))
sub := stream.NewSubscription(false, make(map[string]string))
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, responses)
}, sub, responses)
require.NoError(t, err)

// Cancel the watch
Expand Down
31 changes: 14 additions & 17 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
versionMap: nil,
version: 0,
versionVector: make(map[string]uint64),
log: log.NewDefaultLogger(),
}
for _, opt := range opts {
opt(out)
Expand Down Expand Up @@ -164,31 +165,29 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}

for id, watch := range cache.deltaWatches {
if !watch.subscriptionState.WatchesResources(modified) {
if !watch.subscription.WatchesResources(modified) {
continue
}

res := cache.respondDelta(watch.Request, watch.Response, watch.subscriptionState)
res := cache.respondDelta(watch.Request, watch.Response, watch.subscription)
if res != nil {
delete(cache.deltaWatches, id)
}
}
}
}

func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{
resourceMap: cache.resources,
versionMap: cache.versionMap,
systemVersion: cache.getVersion(),
})

// Only send a response if there were changes
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
if cache.log != nil {
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, clientState.IsWildcard())
}
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard())
value <- resp
return resp
}
Expand Down Expand Up @@ -298,7 +297,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
return resources
}

func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, value chan Response) (func(), error) {
func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) {
if request.GetTypeUrl() != cache.typeURL {
value <- nil
return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL)
Expand Down Expand Up @@ -372,7 +371,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, val
}, nil
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) {
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) {
cache.mu.Lock()
defer cache.mu.Unlock()

Expand All @@ -385,22 +384,20 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState Su
modified[name] = struct{}{}
}
err := cache.updateVersionMap(modified)
if err != nil && cache.log != nil {
if err != nil {
cache.log.Errorf("failed to update version map: %v", err)
}
}
response := cache.respondDelta(request, value, clientState)
response := cache.respondDelta(request, value, sub)

// if respondDelta returns nil this means that there is no change in any resource version
// create a new watch accordingly
if response == nil {
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion())
}
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, sub.SubscribedResources(), cache.getVersion())

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState}
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub}

return cache.cancelDeltaWatch(watchID), nil
}
Expand Down
Loading

0 comments on commit fff9714

Please sign in to comment.