diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 1c6d228222..081b0a216f 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -36,21 +36,21 @@ type Request = discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest -// SubscriptionState stores the server view of the client state for a given resource type. +// Subscription stores the server view of the client state for a given resource type. // This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources). // Though the methods may return mutable parts of the state for performance reasons, // the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation. -type SubscriptionState interface { - // GetACKedResources returns a list of resources that the client has ACK'd and their associated version. +type Subscription interface { + // ReturnedResources returns a list of resources that clients have ACK'd and their associated version. // The versions are: // - delta protocol: version of the specific resource set in the response // - sotw protocol: version of the global response when the resource was last ACKed - GetACKedResources() map[string]string + ReturnedResources() map[string]string - // GetSubscribedResources returns the list of resources currently subscribed to by the client for the type. + // SubscribedResources returns the list of resources currently subscribed to by the client for the type. // For delta it keeps track of subscription updates across requests // For sotw it is a normalized view of the last request resources - GetSubscribedResources() map[string]struct{} + SubscribedResources() map[string]struct{} // IsWildcard returns whether the client has a wildcard watch. // This considers subtleties related to the current migration of wildcard definitions within the protocol. @@ -81,7 +81,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, SubscriptionState, chan Response) (cancel func(), err error) + CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error) // CreateDeltaWatch returns a new open incremental xDS watch. // This is the entrypoint to propagate configuration changes the @@ -93,7 +93,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, SubscriptionState, chan DeltaResponse) (cancel func(), err error) + CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 276dce4e60..c7920d9fd2 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -27,7 +27,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state SubscriptionState, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscription, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -36,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetACKedResources()) == 0 { + if len(state.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -45,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetACKedResources()[name] + prevVersion, found := state.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -53,17 +53,17 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetACKedResources() { + for name := range state.ReturnedResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResources())) + nextVersionMap = make(map[string]string, len(state.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResources() { - prevVersion, found := state.GetACKedResources()[name] + for name := range state.SubscribedResources() { + prevVersion, found := state.ReturnedResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 445cdf0a61..44f176db4b 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -35,14 +35,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewSubscriptionState(true, nil) + sub := stream.NewSubscription(true, nil) _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + }, sub, watches[typ]) require.NoError(t, err) } @@ -69,17 +69,19 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // all resources as well as individual resource removals for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewSubscriptionState(false, versionMap[typ]) + sub := stream.NewSubscription(false, versionMap[typ]) + resources := []string{} for resource := range versionMap[typ] { - state.GetSubscribedResources()[resource] = struct{}{} + resources = append(resources, resource) } + sub.SetResourceSubscription(resources) _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + }, sub, watches[typ]) require.NoError(t, err) } @@ -113,13 +115,13 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { func TestDeltaRemoveResources(t *testing.T) { c := cache.NewSnapshotCache(false, group{}, logger{t: t}) watches := make(map[string]chan cache.DeltaResponse) - streams := make(map[string]*stream.SubscriptionState) + subs := make(map[string]*stream.Subscription) // At this stage the cache is empty, so a watch is opened for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewSubscriptionState(true, make(map[string]string)) - streams[typ] = &state + sub := stream.NewSubscription(true, make(map[string]string)) + subs[typ] = &sub // We don't specify any resource name subscriptions here because we want to make sure we test wildcard // functionality. This means we should receive all resources back without requesting a subscription by name. _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ @@ -127,7 +129,7 @@ func TestDeltaRemoveResources(t *testing.T) { Id: "node", }, TypeUrl: typ, - }, *streams[typ], watches[typ]) + }, *subs[typ], watches[typ]) require.NoError(t, err) } @@ -144,7 +146,7 @@ func TestDeltaRemoveResources(t *testing.T) { case out := <-watches[typ]: assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) nextVersionMap := out.GetNextVersionMap() - streams[typ].SetACKedResources(nextVersionMap) + subs[typ].SetReturnedResources(nextVersionMap) case <-time.After(time.Second): require.Fail(t, "failed to receive a snapshot response") } @@ -161,7 +163,7 @@ func TestDeltaRemoveResources(t *testing.T) { }, TypeUrl: typ, ResponseNonce: "nonce", - }, *streams[typ], watches[typ]) + }, *subs[typ], watches[typ]) require.NoError(t, err) } @@ -181,7 +183,7 @@ func TestDeltaRemoveResources(t *testing.T) { assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources) nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources - assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetACKedResources(), "versionMap for the endpoint resource type did not change") + assert.NotEqual(t, nextVersionMap, subs[testTypes[0]].ReturnedResources(), "versionMap for the endpoint resource type did not change") case <-time.After(time.Second): assert.Fail(t, "failed to receive snapshot response") } @@ -206,14 +208,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { t.Fatalf("snapshot failed: %s", err) } } else { - state := stream.NewSubscriptionState(false, make(map[string]string)) + sub := stream.NewSubscription(false, make(map[string]string)) cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: id, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, state, responses) + }, sub, responses) require.NoError(t, err) defer cancel() @@ -230,15 +232,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) - state := stream.NewSubscriptionState(false, nil) - state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{names[rsrc.EndpointType][0]}, nil) _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, state, watchCh) + }, sub, watchCh) require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. @@ -275,14 +277,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) - state := stream.NewSubscriptionState(false, make(map[string]string)) + sub := stream.NewSubscription(false, make(map[string]string)) cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, responses) + }, sub, responses) require.NoError(t, err) // Cancel the watch diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 3b6d0d9b81..c4d29aa932 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { versionMap: nil, version: 0, versionVector: make(map[string]uint64), + log: log.NewDefaultLogger(), } for _, opt := range opts { opt(out) @@ -164,11 +165,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { - if !watch.subscriptionState.WatchesResources(modified) { + if !watch.subscription.WatchesResources(modified) { continue } - res := cache.respondDelta(watch.Request, watch.Response, watch.subscriptionState) + res := cache.respondDelta(watch.Request, watch.Response, watch.subscription) if res != nil { delete(cache.deltaWatches, id) } @@ -176,8 +177,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{ +func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse { + resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), @@ -185,10 +186,8 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe // Only send a response if there were changes if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { - if cache.log != nil { - cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, clientState.IsWildcard()) - } + cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) value <- resp return resp } @@ -298,7 +297,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, value chan Response) (func(), error) { +func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) { if request.GetTypeUrl() != cache.typeURL { value <- nil return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL) @@ -372,7 +371,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, val }, nil } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { cache.mu.Lock() defer cache.mu.Unlock() @@ -385,22 +384,20 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState Su modified[name] = struct{}{} } err := cache.updateVersionMap(modified) - if err != nil && cache.log != nil { + if err != nil { cache.log.Errorf("failed to update version map: %v", err) } } - response := cache.respondDelta(request, value, clientState) + response := cache.respondDelta(request, value, sub) // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly if response == nil { watchID := cache.nextDeltaWatchID() - if cache.log != nil { - cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion()) - } + cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, + cache.typeURL, sub.SubscribedResources(), cache.getVersion()) - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState} + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub} return cache.cancelDeltaWatch(watchID), nil } diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 1414e9893e..d3287bd398 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -190,38 +190,38 @@ func hashResource(t *testing.T, resource types.Resource) string { } func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) error { - state := stream.NewSubscriptionState(true, nil) - if _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w); err != nil { + sub := stream.NewSubscription(true, nil) + if _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w); err != nil { return err } resp := <-w - state.SetACKedResources(resp.GetNextVersionMap()) - _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values + sub.SetReturnedResources(resp.GetNextVersionMap()) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) // Ensure the watch is set properly with cache values return err } func TestLinearInitialResources(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, sub, w) require.NoError(t, err) verifyResponse(t, w, "0", 1) - _, err = c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType}, sub, w) require.NoError(t, err) verifyResponse(t, w, "0", 2) checkVersionMapNotSet(t, c) } func TestLinearCornerCases(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) err := c.UpdateResource("a", nil) assert.Error(t, err, "expected error on nil resource") // create an incorrect type URL request w := make(chan Response, 1) - _, err = c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: "test"}, sub, w) assert.Error(t, err, "watch should fail to be created") select { case r := <-w: @@ -232,18 +232,18 @@ func TestLinearCornerCases(t *testing.T) { } func TestLinearBasic(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) // Create watches before a resource is ready w1 := make(chan Response, 1) - _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w1) require.NoError(t, err) mustBlock(t, w1) checkVersionMapNotSet(t, c) w := make(chan Response, 1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 2) @@ -255,11 +255,11 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) @@ -267,10 +267,10 @@ func TestLinearBasic(t *testing.T) { // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) verifyResponse(t, w, "3", 1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches @@ -278,16 +278,16 @@ func TestLinearBasic(t *testing.T) { } func TestLinearSetResources(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) // Create new resources w1 := make(chan Response, 1) - _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w1) require.NoError(t, err) mustBlock(t, w1) w2 := make(chan Response, 1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w2) require.NoError(t, err) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ @@ -298,10 +298,10 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w1) require.NoError(t, err) mustBlock(t, w1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w2) require.NoError(t, err) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ @@ -313,10 +313,10 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, sub, w1) require.NoError(t, err) mustBlock(t, w1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, sub, w2) require.NoError(t, err) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ @@ -345,56 +345,56 @@ func TestLinearGetResources(t *testing.T) { } func TestLinearVersionPrefix(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) verifyResponse(t, w, "instance1-1", 1) - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, sub, w) require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } func TestLinearDeletion(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w) require.NoError(t, err) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - _, err := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) require.NoError(t, err) mustBlock(t, w) w1 := make(chan Response, 1) - _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + _, err = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w1) require.NoError(t, err) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) @@ -404,13 +404,13 @@ func TestLinearWatchTwo(t *testing.T) { } func TestLinearCancel(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all w := make(chan Response, 1) - cancel, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w) require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -418,7 +418,7 @@ func TestLinearCancel(t *testing.T) { checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w) require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -429,13 +429,13 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w) require.NoError(t, err) - cancel2, err := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + cancel2, err := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, sub, w2) require.NoError(t, err) - cancel3, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) + cancel3, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w3) require.NoError(t, err) - cancel4, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) + cancel4, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w4) require.NoError(t, err) mustBlock(t, w) mustBlock(t, w2) @@ -458,7 +458,7 @@ func TestLinearCancel(t *testing.T) { // TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be // rewritten to not require that. This is not the case in the GH actions environment. func TestLinearConcurrentSetWatch(t *testing.T) { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) n := 50 for i := 0; i < 2*n; i++ { @@ -478,7 +478,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, streamState, value) + }, sub, value) require.NoError(t, err) // wait until all updates apply verifyResponse(t, value, "", 1) @@ -490,14 +490,14 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) - state1 := stream.NewSubscriptionState(true, map[string]string{}) + sub1 := stream.NewSubscription(true, map[string]string{}) w1 := make(chan DeltaResponse, 1) - _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state1, w1) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub1, w1) require.NoError(t, err) mustBlockDelta(t, w1) - state2 := stream.NewSubscriptionState(true, map[string]string{}) + sub2 := stream.NewSubscription(true, map[string]string{}) w2 := make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state2, w2) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub2, w2) require.NoError(t, err) mustBlockDelta(t, w1) checkDeltaWatchCount(t, c, 2) @@ -522,18 +522,18 @@ func TestLinearDeltaExistingResources(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewSubscriptionState(false, nil) - state.SetSubscribedResources(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"b", "c"}, nil) // watching b and c - not interested in a w := make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) - state = stream.NewSubscriptionState(false, nil) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) @@ -550,18 +550,18 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewSubscriptionState(false, map[string]string{"b": hashB}) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, map[string]string{"b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned - state = stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -586,19 +586,19 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { // There is currently no delta watch checkVersionMapNotSet(t, c) - state := stream.NewSubscriptionState(false, nil) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkVersionMapSet(t, c) - state = stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -624,18 +624,18 @@ func TestLinearDeltaResourceDelete(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewSubscriptionState(false, nil) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) - state = stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w = make(chan DeltaResponse, 1) - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -651,14 +651,14 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) - state := stream.NewSubscriptionState(false, nil) - state.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) // Initial update - _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -674,10 +674,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetACKedResources(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Multiple updates - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -695,10 +695,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetACKedResources(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Update/add/delete - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -715,10 +715,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetACKedResources(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Re-add previously deleted watched resource - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &sub, w) require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -732,7 +732,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetACKedResources(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Wildcard create/update require.NoError(t, createWildcardDeltaWatch(c, w)) @@ -780,9 +780,9 @@ func TestLinearMixedWatches(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, c.NumResources()) - sotwState := stream.NewSubscriptionState(false, nil) + sotwSub := stream.NewSubscription(false, nil) w := make(chan Response, 1) - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -797,17 +797,17 @@ func TestLinearMixedWatches(t *testing.T) { verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) - _, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwState, w) + _, err = c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, &sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) - deltaState := stream.NewSubscriptionState(false, map[string]string{"a": hashA, "b": hashB}) - deltaState.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) + deltaSub := stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + deltaSub.UpdateResourceSubscriptions([]string{"a", "b"}, nil) wd := make(chan DeltaResponse, 1) // Initial update - _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaState, wd) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &deltaSub, wd) require.NoError(t, err) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index d4bb5791a9..2a7a84a49d 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -36,24 +36,22 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, state SubscriptionState, value chan Response) (func(), error) { +func (mux *MuxCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { - value <- nil return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateWatch(request, state, value) + return cache.CreateWatch(request, sub, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state SubscriptionState, value chan DeltaResponse) (func(), error) { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { - value <- nil return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateDeltaWatch(request, state, value) + return cache.CreateDeltaWatch(request, sub, value) } func (mux *MuxCache) Fetch(context.Context, *Request) (Response, error) { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index d2c0d9a88f..b609d19f51 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -309,7 +309,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.subscriptionState, + watch.subscription, ) if err != nil { return err @@ -327,7 +327,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.subscriptionState, + watch.subscription, ) if err != nil { return err @@ -384,7 +384,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) // CreateWatch returns a watch for an xDS request. A nil function may be // returned if an error occurs. -func (cache *snapshotCache) CreateWatch(request *Request, clientState SubscriptionState, value chan Response) (func(), error) { +func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) cache.mu.Lock() @@ -408,7 +408,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState Subscripti } if exists { - knownResourceNames := clientState.GetACKedResources() + knownResourceNames := sub.ReturnedResources() diff := []string{} for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { @@ -524,7 +524,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() @@ -553,7 +553,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - response, err := cache.respondDelta(context.Background(), snapshot, request, value, clientState) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, sub) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } @@ -565,12 +565,12 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, clientState.GetSubscribedResources(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, sub.SubscribedResources(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, clientState.GetSubscribedResources(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, sub.SubscribedResources(), nodeID) } - info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState}) + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, subscription: sub}) return cache.cancelDeltaWatch(nodeID, watchID), nil } @@ -578,20 +578,20 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, clientState } // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, clientState, resourceContainer{ +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, sub, resourceContainer{ resourceMap: snapshot.GetResources(request.GetTypeUrl()), versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), systemVersion: snapshot.GetVersion(request.GetTypeUrl()), }) // Only send a response if there were changes - // We want to respond immediately for the first request in a stream if it is wildcard, even if the response is empty + // We want to respond immediately for the first request in a subscription if it is wildcard, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (clientState.IsWildcard() && request.ResponseNonce == "") { + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (sub.IsWildcard() && request.ResponseNonce == "") { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, clientState.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) } select { case value <- resp: diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 8b29f9cdbd..f2c961d1b2 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -131,13 +131,13 @@ func TestSnapshotCacheWithTTL(t *testing.T) { wg := sync.WaitGroup{} // All the resources should respond immediately when version is not up to date. - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) for _, typ := range testTypes { wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) require.NoError(t, err) select { case out := <-value: @@ -147,9 +147,9 @@ func TestSnapshotCacheWithTTL(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } - // Update streamState + // Update sub to track what was returned for _, resource := range out.GetRequest().GetResourceNames() { - streamState.GetACKedResources()[resource] = fixture.version + sub.ReturnedResources()[resource] = fixture.version } case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") @@ -170,7 +170,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { for { value := make(chan cache.Response, 1) cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, value) + sub, value) require.NoError(t, err) select { @@ -189,7 +189,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ for _, resource := range out.GetRequest().GetResourceNames() { - streamState.GetACKedResources()[resource] = fixture.version + sub.ReturnedResources()[resource] = fixture.version } case <-end: cancel() @@ -232,9 +232,9 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - streamState, value) + sub, value) require.NoError(t, err) select { @@ -246,9 +246,9 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - streamState, value) + sub, value) require.NoError(t, err) select { case out := <-value: @@ -300,10 +300,10 @@ func TestSnapshotCacheFetch(t *testing.T) { func TestSnapshotCacheWatch(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) watches := make(map[string]chan cache.Response) - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, watches[typ]) require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -321,7 +321,7 @@ func TestSnapshotCacheWatch(t *testing.T) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } for _, resource := range out.GetRequest().GetResourceNames() { - streamState.GetACKedResources()[resource] = fixture.version + sub.ReturnedResources()[resource] = fixture.version } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") @@ -333,7 +333,7 @@ func TestSnapshotCacheWatch(t *testing.T) { for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, watches[typ]) + sub, watches[typ]) require.NoError(t, err) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { @@ -379,11 +379,11 @@ func TestConcurrentSetWatch(t *testing.T) { t.Fatalf("failed to set snapshot %q: %s", id, err) } } else { - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, streamState, value) + }, sub, value) require.NoError(t, err) defer cancel() @@ -394,10 +394,10 @@ func TestConcurrentSetWatch(t *testing.T) { func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) for _, typ := range testTypes { value := make(chan cache.Response, 1) - cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) require.NoError(t, err) cancel() } @@ -422,9 +422,9 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.Response) - streamState := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - streamState, watchCh) + sub, watchCh) require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. @@ -479,9 +479,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { - state := stream.NewSubscriptionState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - &state, watch) + &sub, watch) require.NoError(t, err) }() @@ -500,10 +500,10 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { - state := stream.NewSubscriptionState(false, map[string]string{}) - state.SetACKedResources(map[string]string{clusterName: fixture.version}) + sub := stream.NewSubscription(false, map[string]string{}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version}) _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, &state, watch) + ResourceNames: []string{clusterName, clusterName2}}, &sub, watch) require.NoError(t, err) }() @@ -520,10 +520,10 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { } // Repeat request for with same version and make sure a watch is created - state := stream.NewSubscriptionState(false, map[string]string{}) - state.SetACKedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) + sub := stream.NewSubscription(false, map[string]string{}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) if cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil { + ResourceNames: []string{clusterName, clusterName2}}, &sub, watch); cancel == nil { t.Fatal("Should create a watch") } else { require.NoError(t, err) @@ -653,9 +653,9 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { ResourceNames: []string{"rtds"}, TypeUrl: rsrc.RuntimeType, } - ss := stream.NewSubscriptionState(false, map[string]string{"cluster": "abcdef"}) + sub := stream.NewSubscription(false, map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) - _, err := c.CreateWatch(req, &ss, responder) + _, err := c.CreateWatch(req, &sub, responder) require.NoError(t, err) go func() { diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index 7086d9568a..18465f6dc5 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -100,8 +100,8 @@ type DeltaResponseWatch struct { // Response is the channel to push the delta responses to Response chan DeltaResponse - // VersionMap for the stream - subscriptionState SubscriptionState + // Subscription stores the current client subscription state. + subscription Subscription } // newStatusInfo initializes a status info data structure. diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index b746acfab9..8dd31a2923 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -1,15 +1,20 @@ package config +import "github.com/envoyproxy/go-control-plane/pkg/log" + // Opts for individual xDS implementations that can be // utilized through the functional opts pattern. type Opts struct { // If true respond to ADS requests with a guaranteed resource ordering Ordered bool + + Logger log.Logger } func NewOpts() Opts { return Opts{ Ordered: false, + Logger: log.NewDefaultLogger(), } } diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 89a85eba02..1cb70159e6 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -12,6 +12,7 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" @@ -49,6 +50,13 @@ type server struct { opts config.Opts } +// WithLogger configures the server logger. Defaults to no logging +func WithLogger(logger log.Logger) config.XDSOption { + return func(o *config.Opts) { + o.Logger = logger + } +} + // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { s := &server{ @@ -118,7 +126,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De watch := watches.deltaWatches[typ] watch.nonce = nonce - watch.state.SetACKedResources(resp.GetNextVersionMap()) + watch.subscription.SetReturnedResources(resp.GetNextVersionMap()) watches.deltaWatches[typ] = watch return nil } @@ -204,22 +212,21 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // cancel existing watch to (re-)request a newer version watch, ok := watches.deltaWatches[typeURL] if !ok { - // Initialize the state of the stream. - // Since there was no previous state, we know we're handling the first request of this type + // Initialize the state of the type subscription. + // Since there was no previous subscription, we know we're handling the first request of this type // so we set the initial resource versions if we have any. - // We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). - // If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard. + // We also set the subscription as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). + // If the subscription starts with this legacy mode, adding new resources will not unsubscribe from wildcard. // It can still be done by explicitly unsubscribing from "*" - watch.state = stream.NewSubscriptionState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) + watch.subscription = stream.NewSubscription(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) } else { watch.Cancel() } - s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) - s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) + watch.subscription.UpdateResourceSubscriptions(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe()) var err error - watch.cancel, err = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses) + watch.cancel, err = s.cache.CreateDeltaWatch(req, watch.subscription, watches.deltaMuxedResponses) if err != nil { return err } @@ -253,40 +260,3 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro return s.processDelta(str, reqCh, typeURL) } - -// When we subscribe, we just want to make the cache know we are subscribing to a resource. -// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. -func (s *server) subscribe(resources []string, streamState *stream.SubscriptionState) { - sv := streamState.GetSubscribedResources() - for _, resource := range resources { - if resource == "*" { - streamState.SetWildcard(true) - continue - } - sv[resource] = struct{}{} - } -} - -// Unsubscriptions remove resources from the stream's subscribed resource list. -// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. -func (s *server) unsubscribe(resources []string, streamState *stream.SubscriptionState) { - sv := streamState.GetSubscribedResources() - for _, resource := range resources { - if resource == "*" { - streamState.SetWildcard(false) - continue - } - if _, ok := sv[resource]; ok && streamState.IsWildcard() { - // The XDS protocol states that: - // * if a watch is currently wildcard - // * a resource is explicitly unsubscribed by name - // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) - // or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated - // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: - // * detect the version change, and return the resource (as an update) - // * detect the resource deletion, and set it as removed in the response - streamState.GetACKedResources()[resource] = "" - } - delete(sv, resource) - } -} diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 2cced953e2..98e4991b49 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -41,7 +41,7 @@ type watch struct { cancel func() nonce string - state stream.SubscriptionState + subscription stream.Subscription } // Cancel calls terminate and cancel diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index 98903d201b..8139319f5b 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -13,24 +13,8 @@ import ( // process handles a bi-di stream request func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // We make a responder channel here so we can multiplex responses from the dynamic channels. - sw.watches.addWatch(resource.AnyType, &watch{ - // Create a buffered channel the size of the known resource types. - response: make(chan cache.Response, types.UnknownType), - cancel: func() { - close(sw.watches.responders[resource.AnyType].response) - }, - }) - - process := func(resp cache.Response) error { - nonce, err := sw.send(resp) - if err != nil { - return err - } - - sw.watches.responders[resp.GetRequest().GetTypeUrl()].nonce = nonce - return nil - } + // Create a buffered channel the size of the known resource types. + respChan := make(chan cache.Response, types.UnknownType) // Instead of creating a separate channel for each incoming request and abandoning the old one // This algorithm uses (and reuses) a single channel for all request types and guarantees @@ -43,9 +27,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe for { select { // We watch the multiplexed ADS channel for incoming responses. - case res := <-sw.watches.responders[resource.AnyType].response: + case res := <-respChan: if res.GetRequest().GetTypeUrl() != typeURL { - if err := process(res); err != nil { + if err := sw.send(res); err != nil { return err } } @@ -63,9 +47,8 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe select { case <-s.ctx.Done(): return nil - // We only watch the multiplexed channel since all values will come through from process. - case res := <-sw.watches.responders[resource.AnyType].response: - if err := process(res); err != nil { + case res := <-respChan: + if err := sw.send(res); err != nil { return status.Errorf(codes.Unavailable, err.Error()) } case req, ok := <-reqCh: @@ -86,9 +69,6 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe req.Node = sw.node } - // Nonces can be reused across streams; we verify nonce only if nonce is not initialized. - nonce := req.GetResponseNonce() - // type URL is required for ADS but is implicit for xDS if defaultTypeURL == resource.AnyType { if req.GetTypeUrl() == "" { @@ -103,61 +83,46 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } typeURL := req.GetTypeUrl() - streamState, ok := sw.streamState[typeURL] - if !ok { - // Supports legacy wildcard mode - // Wildcard will be set to true if no resource is set - streamState = stream.NewSubscriptionState(len(req.ResourceNames) == 0, nil) - } - - // ToDo: track ACK through subscription state - if lastResponse, ok := sw.lastDiscoveryResponses[typeURL]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetACKedResources(lastResponse.resources) + var subscription stream.Subscription + w, ok := sw.watches.responders[typeURL] + if ok { + if w.nonce != "" && req.GetResponseNonce() != w.nonce { + // The request does not match the stream nonce, ignore it as per + // https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#resource-updates + // Ignore this request and wait for the next one + // This behavior is being discussed in https://github.com/envoyproxy/envoy/issues/10363 + // as it might create a race in edge cases, but it matches the current protocol defintion + s.opts.Logger.Debugf("[sotw ads] Skipping request as nonce is stale for %s", typeURL) + break } - } - - updateSubscriptionResources(req, &streamState) - // Use the multiplexed channel for new watches. - responder := sw.watches.responders[resource.AnyType].response - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() + // We found an existing watch + // Close it to ensure the Cache will not reply to it while we modify the subscription state + w.close() - // Only process if we have an existing watch otherwise go ahead and create. - if err := processAllExcept(typeURL); err != nil { - return err - } - - cancel, err := s.cache.CreateWatch(req, streamState, responder) - if err != nil { - return err - } - - sw.watches.addWatch(typeURL, &watch{ - cancel: cancel, - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - cancel, err := s.cache.CreateWatch(req, streamState, responder) - if err != nil { + // Only process if we have an existing watch otherwise go ahead and create. + if err := processAllExcept(typeURL); err != nil { return err } - sw.watches.addWatch(typeURL, &watch{ - cancel: cancel, - response: responder, - }) + subscription = w.sub + } else { + s.opts.Logger.Debugf("[sotw ads] New subscription for type %s and stream %d", typeURL, sw.ID) + subscription = stream.NewSubscription(len(req.ResourceNames) == 0, nil) + } + + subscription.SetResourceSubscription(req.GetResourceNames()) + + cancel, err := s.cache.CreateWatch(req, subscription, respChan) + if err != nil { + return err } - sw.streamState[req.TypeUrl] = streamState + sw.watches.addWatch(typeURL, &watch{ + cancel: cancel, + response: respChan, + sub: subscription, + }) } } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 8144e46d41..173c3f9a8b 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,12 +18,14 @@ package sotw import ( "context" "errors" + "fmt" "strconv" "sync/atomic" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -65,6 +67,13 @@ func WithOrderedADS() config.XDSOption { } } +// WithLogger configures the server logger. Defaults to no logging +func WithLogger(logger log.Logger) config.XDSOption { + return func(o *config.Opts) { + o.Logger = logger + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -88,23 +97,18 @@ type streamWrapper struct { callbacks Callbacks // callbacks for performing actions through stream lifecycle node *core.Node // registered xDS client - - // The below fields are used for tracking resource - // cache state and should be maintained per stream. - streamState map[string]stream.SubscriptionState - lastDiscoveryResponses map[string]lastDiscoveryResponse } // Send packages the necessary resources before sending on the gRPC stream, // and sets the current state of the world. -func (s *streamWrapper) send(resp cache.Response) (string, error) { +func (s *streamWrapper) send(resp cache.Response) error { if resp == nil { - return "", errors.New("missing response") + return errors.New("missing response") } out, err := resp.GetDiscoveryResponse() if err != nil { - return "", err + return err } // increment nonce and convert it to base10 @@ -112,24 +116,28 @@ func (s *streamWrapper) send(resp cache.Response) (string, error) { version, err := resp.GetVersion() if err != nil { - return "", err + return err } - lastResponse := lastDiscoveryResponse{ - nonce: out.GetNonce(), - resources: make(map[string]string), + w, ok := s.watches.responders[resp.GetRequest().GetTypeUrl()] + if !ok { + return fmt.Errorf("no current watch for %s", resp.GetRequest().GetTypeUrl()) } + + w.nonce = out.Nonce + // ToDo(valerian-roche): properly return the resources actually sent to the client + resources := make(map[string]string, len(resp.GetRequest().GetResourceNames())) for _, r := range resp.GetRequest().GetResourceNames() { - lastResponse.resources[r] = version + resources[r] = version } - s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse + w.sub.SetReturnedResources(resources) // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) } - return out.GetNonce(), s.stream.Send(out) + return s.stream.Send(out) } // Shutdown closes all open watches, and notifies API consumers the stream has closed. @@ -140,15 +148,6 @@ func (s *streamWrapper) shutdown() { } } -// Discovery response that is sent over GRPC stream. -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]string -} - // StreamHandler converts a blocking read call to channels and initiates stream processing func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { // a channel for receiving incoming requests diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index d781f663e6..43b3bedabe 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -7,6 +7,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // watches for all xDS resource types @@ -63,8 +64,11 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery // watch contains the necessary modifiable data for receiving resource responses type watch struct { cancel func() - nonce string response chan cache.Response + + sub stream.Subscription + // Nonce of the latest response sent for this type + nonce string } // close cancels an open watch diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 87b7350be1..cea848601d 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -26,9 +26,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque node: &core.Node{}, // node may only be set on the first discovery request // a collection of stack allocated watches per request type. - watches: newWatches(), - streamState: make(map[string]stream.SubscriptionState), - lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), + watches: newWatches(), } // cleanup once our stream has ended. @@ -40,6 +38,22 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType && s.opts.Ordered { + // When using ADS we need to order responses. + // This is guaranteed in the xDS protocol specification + // as ADS is required to be eventually consistent. + // More details can be found here if interested: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations + + // Trigger a different code path specifically for ADS. + // We want resource ordering so things don't get sent before they should. + // This is a blocking call and will exit the process function + // on successful completion. + s.opts.Logger.Debugf("[sotw] Switching to ordered ADS implementation for stream %d", sw.ID) + return s.processADS(&sw, reqCh, defaultTypeURL) + } + // do an initial recompute so we can load the first 2 channels: // <-reqCh // s.ctx.Done() @@ -76,88 +90,58 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque req.Node = sw.node } - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() - // type URL is required for ADS but is implicit for xDS if defaultTypeURL == resource.AnyType { if req.GetTypeUrl() == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - - // When using ADS we need to order responses. - // This is guaranteed in the xDS protocol specification - // as ADS is required to be eventually consistent. - // More details can be found here if interested: - // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations - if s.opts.Ordered { - // send our first request on the stream again so it doesn't get - // lost in processing on the new control loop - // There's a risk (albeit very limited) that we'd end up handling requests in the wrong order here. - // If envoy is using ADS for endpoints, and clusters are added in short sequence, - // the following request might include a new cluster and be discarded as the previous one will be handled after. - go func() { - reqCh <- req - }() - - // Trigger a different code path specifically for ADS. - // We want resource ordering so things don't get sent before they should. - // This is a blocking call and will exit the process function - // on successful completion. - return s.processADS(&sw, reqCh, defaultTypeURL) - } } else if req.GetTypeUrl() == "" { req.TypeUrl = defaultTypeURL } - streamState := sw.streamState[req.TypeUrl] - if s.callbacks != nil { if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil { return err } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - streamState.SetACKedResources(lastResponse.resources) + typeURL := req.GetTypeUrl() + var subscription stream.Subscription + w, ok := sw.watches.responders[typeURL] + if ok { + if w.nonce != "" && req.GetResponseNonce() != w.nonce { + // The request does not match the stream nonce, ignore it as per + // https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#resource-updates + // Ignore this request and wait for the next one + // This behavior is being discussed in https://github.com/envoyproxy/envoy/issues/10363 + // as it might create a race in edge cases, but it matches the current protocol defintion + s.opts.Logger.Debugf("[sotw ads] Skipping request as nonce is stale for %s", typeURL) + break } - } - updateSubscriptionResources(req, &streamState) + // We found an existing watch + // Close it to ensure the Cache will not reply to it while we modify the subscription state + w.close() - typeURL := req.GetTypeUrl() - responder := make(chan cache.Response, 1) - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - cancel, err := s.cache.CreateWatch(req, streamState, responder) - if err != nil { - return err - } - sw.watches.addWatch(typeURL, &watch{ - cancel: cancel, - response: responder, - }) - } + subscription = w.sub } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - cancel, err := s.cache.CreateWatch(req, streamState, responder) - if err != nil { - return err - } - sw.watches.addWatch(typeURL, &watch{ - cancel: cancel, - response: responder, - }) + s.opts.Logger.Debugf("[sotw] New subscription for type %s and stream %d", typeURL, sw.ID) + subscription = stream.NewSubscription(len(req.ResourceNames) == 0, nil) } - sw.streamState[req.TypeUrl] = streamState + subscription.SetResourceSubscription(req.GetResourceNames()) + + responder := make(chan cache.Response, 1) + + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + return err + } + sw.watches.addWatch(typeURL, &watch{ + cancel: cancel, + response: responder, + sub: subscription, + }) // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) @@ -169,45 +153,10 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } res := value.Interface().(cache.Response) - nonce, err := sw.send(res) + err := sw.send(res) if err != nil { return err } - - sw.watches.responders[res.GetRequest().GetTypeUrl()].nonce = nonce } } } - -// updateSubscriptionResources provides a normalized view of resources to be used in Cache -// It is also implementing the new behavior of wildcard as described in -// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return -func updateSubscriptionResources(req *discovery.DiscoveryRequest, subscriptionState *stream.SubscriptionState) { - subscribedResources := make(map[string]struct{}, len(req.ResourceNames)) - explicitWildcard := false - for _, resource := range req.ResourceNames { - if resource == "*" { - explicitWildcard = true - } else { - subscribedResources[resource] = struct{}{} - } - } - - if subscriptionState.IsWildcard() && len(req.ResourceNames) == 0 && len(subscriptionState.GetSubscribedResources()) == 0 { - // We were wildcard and no resource has been subscribed - // Legacy wildcard mode states that we remain in wildcard mode - subscriptionState.SetWildcard(true) - } else if explicitWildcard { - // Explicit subscription to wildcard - // Documentation states that we should no longer allow to fallback to the previous case - // and no longer setting wildcard would no longer subscribe to anything - // For now we ignore this case and will not support unsubscribing in this case - subscriptionState.SetWildcard(true) - } else { - // The subscription is currently not wildcard, or there are resources or have been resources subscribed to - // This is no longer the legacy wildcard case as described by the specification - subscriptionState.SetWildcard(false) - } - subscriptionState.SetSubscribedResources(subscribedResources) - -} diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go index 84da9211cc..a7ecb1c572 100644 --- a/pkg/server/stream/v3/subscription.go +++ b/pkg/server/stream/v3/subscription.go @@ -1,73 +1,145 @@ package stream -// SubscriptionState stores the server view of a given type subscription in a stream. -type SubscriptionState struct { +const ( + explicitWildcard = "*" +) + +// Subscription stores the server view of a given type subscription in a stream. +type Subscription struct { // wildcard indicates if the subscription currently has a wildcard watch. wildcard bool + // allowLegacyWildcard indicates that the stream never provided any resource + // and is de facto wildcard. + // As soon as a resource or an explicit subscription to wildcard is provided, + // this flag will be set to false + allowLegacyWildcard bool + // subscribedResourceNames provides the resources explicitly requested by the client // This list might be non-empty even when set as wildcard. subscribedResourceNames map[string]struct{} - // ackedResources contains the resources acknowledged by the client and the acknowledged versions. - ackedResources map[string]string + // returnedResources contains the resources acknowledged by the client and the acknowledged versions. + returnedResources map[string]string } -// NewSubscriptionState initializes a stream state. -func NewSubscriptionState(wildcard bool, initialResourceVersions map[string]string) SubscriptionState { - state := SubscriptionState{ +// NewSubscription initializes a subscription state. +func NewSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription { + state := Subscription{ wildcard: wildcard, + allowLegacyWildcard: wildcard, subscribedResourceNames: map[string]struct{}{}, - ackedResources: initialResourceVersions, + returnedResources: initialResourceVersions, } if initialResourceVersions == nil { - state.ackedResources = make(map[string]string) + state.returnedResources = make(map[string]string) } return state } -// GetSubscribedResources returns the list of resources currently explicitly subscribed to -// If the request is set to wildcard it may be empty -// Currently populated only when using delta-xds -func (s SubscriptionState) GetSubscribedResources() map[string]struct{} { - return s.subscribedResourceNames -} +// SetResourceSubscription updates the subscribed resources (including the wildcard state) +// based on the full state of subscribed resources provided in the request +// Used in sotw subscriptions +// Behavior is based on +// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return +func (s *Subscription) SetResourceSubscription(subscribed []string) { + if s.allowLegacyWildcard { + if len(subscribed) == 0 { + // We were wildcard based on legacy behavior and still don't request any resource + // The watch remains wildcard + return + } else { + // A resource was provided (might be an explicit wildcard) + // Documentation states that we should no longer allow to fallback to the previous case + // and no longer setting wildcard would no longer subscribe to anything + s.allowLegacyWildcard = false + } + } -// SetSubscribedResources is setting the list of resources currently explicitly subscribed to -// It is decorrelated from the wildcard state of the stream -// Currently used only when using delta-xds -func (s *SubscriptionState) SetSubscribedResources(subscribedResourceNames map[string]struct{}) { - s.subscribedResourceNames = subscribedResourceNames -} + subscribedResources := make(map[string]struct{}, len(subscribed)) + explicitWildcardSet := false + for _, resource := range subscribed { + if resource == explicitWildcard { + explicitWildcardSet = true + } else { + subscribedResources[resource] = struct{}{} + } + } -// GetACKedResources returns the list of resources acknowledged by the client -// and their acknowledged version -func (s SubscriptionState) GetACKedResources() map[string]string { - return s.ackedResources + // Explicit subscription to wildcard as we are not in legacy wildcard behavior + s.wildcard = explicitWildcardSet + s.subscribedResourceNames = subscribedResources } -// SetACKedResources sets a list of resource versions currently known by the client -// The cache can use this state to compute resources added/updated/deleted -func (s *SubscriptionState) SetACKedResources(resourceVersions map[string]string) { - s.ackedResources = resourceVersions +// UpdateResourceSubscriptions updates the subscribed resources (including the wildcard state) +// based on newly subscribed or unsubscribed resources +// Used in delta subscriptions +func (s *Subscription) UpdateResourceSubscriptions(subscribed []string, unsubscribed []string) { + // Handles legacy wildcard behavior first to exit if we are still in this behavior + if s.allowLegacyWildcard { + // The protocol (as of v1.29.0) only references subscribed as triggering + // exiting legacy wildcard behavior, so we currently not check unsubscribed + if len(subscribed) == 0 { + // We were wildcard based on legacy behavior and still don't request any resource + // The watch remains wildcard + return + } else { + // A resource was provided (might be an explicit wildcard) + // Documentation states that we should no longer allow to fallback to the previous case + // and no longer setting wildcard would no longer subscribe to anything + // The watch does remain wildcard if not explicitly unsubscribed (from the example in + // https://www.envoyproxy.io/docs/envoy/v1.29.0/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return) + s.allowLegacyWildcard = false + } + } + + // Handle subscriptions first + for _, resource := range subscribed { + if resource == explicitWildcard { + s.wildcard = true + continue + } + s.subscribedResourceNames[resource] = struct{}{} + } + + // Then unsubscriptions + for _, resource := range unsubscribed { + if resource == explicitWildcard { + s.wildcard = false + continue + } + if _, ok := s.subscribedResourceNames[resource]; ok && s.wildcard { + // The XDS protocol states that: + // * if a watch is currently wildcard + // * a resource is explicitly unsubscribed by name + // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) + // or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated + // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: + // * detect the version change, and return the resource (as an update) + // * detect the resource deletion, and set it as removed in the response + s.returnedResources[resource] = "" + } + delete(s.subscribedResourceNames, resource) + } } -// SetWildcard will set the subscription to return all known resources -func (s *SubscriptionState) SetWildcard(wildcard bool) { - s.wildcard = wildcard +// SubscribedResources returns the list of resources currently explicitly subscribed to +// If the request is set to wildcard it may be empty +func (s Subscription) SubscribedResources() map[string]struct{} { + return s.subscribedResourceNames } // IsWildcard returns whether or not the subscription currently has a wildcard watch -func (s SubscriptionState) IsWildcard() bool { +func (s Subscription) IsWildcard() bool { return s.wildcard } // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. // If the request is wildcard, it will always return true, // otherwise it will compare the provided resources to the list of resources currently subscribed -func (s SubscriptionState) WatchesResources(resourceNames map[string]struct{}) bool { +func (s Subscription) WatchesResources(resourceNames map[string]struct{}) bool { if s.wildcard { return true } @@ -78,3 +150,15 @@ func (s SubscriptionState) WatchesResources(resourceNames map[string]struct{}) b } return false } + +// ReturnedResources returns the list of resources returned to the client +// and their version +func (s Subscription) ReturnedResources() map[string]string { + return s.returnedResources +} + +// SetReturnedResources sets a list of resource versions currently known by the client +// The cache can use this state to compute resources added/updated/deleted +func (s *Subscription) SetReturnedResources(resourceVersions map[string]string) { + s.returnedResources = resourceVersions +} diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 95e8a3dacc..7215287143 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -20,7 +20,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.SubscriptionState, out chan cache.DeltaResponse) (func(), error) { +func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state cache.Subscription, out chan cache.DeltaResponse) (func(), error) { config.deltaCounts[req.GetTypeUrl()] = config.deltaCounts[req.GetTypeUrl()] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there @@ -37,7 +37,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { case state.IsWildcard(): - if len(state.GetACKedResources()) == 0 { + if len(state.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -46,24 +46,24 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetACKedResources()[name] + prevVersion, found := state.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetACKedResources() { + for name := range state.ReturnedResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResources())) + nextVersionMap = make(map[string]string, len(state.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResources() { - prevVersion, found := state.GetACKedResources()[name] + for name := range state.SubscribedResources() { + prevVersion, found := state.ReturnedResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 3604f88525..ed0f530ff6 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -49,7 +49,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ cache.SubscriptionState, out chan cache.Response) (func(), error) { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ cache.Subscription, out chan cache.Response) (func(), error) { typ := req.GetTypeUrl() config.counts[typ] = config.counts[typ] + 1 if len(config.responses[typ]) > 0 {