Skip to content

Commit 66dce37

Browse files
Rename ClientState to SubscriptionState. Ensure WatchesResources is also compatible with sotw
Signed-off-by: Valerian Roche <[email protected]>
1 parent 57978c5 commit 66dce37

File tree

18 files changed

+413
-271
lines changed

18 files changed

+413
-271
lines changed

pkg/cache/v3/cache.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@ type Request = discovery.DiscoveryRequest
3636
// DeltaRequest is an alias for the delta discovery request type.
3737
type DeltaRequest = discovery.DeltaDiscoveryRequest
3838

39-
// ClientState provides additional data on the client knowledge for the type matching the request
39+
// SubscriptionState provides additional data on the client knowledge for the type matching the request
4040
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
4141
// Though the methods may return mutable parts of the state for performance reasons,
4242
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
43-
type ClientState interface {
44-
// GetKnownResources returns the list of resources the clients has ACKed and their associated version.
43+
type SubscriptionState interface {
44+
// GetKnownResources returns a list of resources that the client has ACK'd and their associated version.
4545
// The versions are:
4646
// - delta protocol: version of the specific resource set in the response
4747
// - sotw protocol: version of the global response when the resource was last ACKed
@@ -53,8 +53,15 @@ type ClientState interface {
5353
GetSubscribedResources() map[string]struct{}
5454

5555
// IsWildcard returns whether the client has a wildcard watch.
56-
// This considers subtilities related to the current migration of wildcard definition within the protocol.
56+
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
57+
// More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
5758
IsWildcard() bool
59+
60+
// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
61+
// It is currently only applicable to delta-xds.
62+
// If the request is wildcard, it will always return true,
63+
// otherwise it will compare the provided resources to the list of resources currently subscribed
64+
WatchesResources(resourceNames map[string]struct{}) bool
5865
}
5966

6067
// ConfigWatcher requests watches for configuration resources by a node, last
@@ -74,7 +81,7 @@ type ConfigWatcher interface {
7481
//
7582
// Cancel is an optional function to release resources in the producer. If
7683
// provided, the consumer may call this function multiple times.
77-
CreateWatch(*Request, ClientState, chan Response) (cancel func())
84+
CreateWatch(*Request, SubscriptionState, chan Response) (cancel func(), err error)
7885

7986
// CreateDeltaWatch returns a new open incremental xDS watch.
8087
// This is the entrypoint to propagate configuration changes the
@@ -86,7 +93,7 @@ type ConfigWatcher interface {
8693
//
8794
// Cancel is an optional function to release resources in the producer. If
8895
// provided, the consumer may call this function multiple times.
89-
CreateDeltaWatch(*DeltaRequest, ClientState, chan DeltaResponse) (cancel func())
96+
CreateDeltaWatch(*DeltaRequest, SubscriptionState, chan DeltaResponse) (cancel func(), err error)
9097
}
9198

9299
// ConfigFetcher fetches configuration resources from cache

pkg/cache/v3/delta.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type resourceContainer struct {
2727
systemVersion string
2828
}
2929

30-
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state ClientState, resources resourceContainer) *RawDeltaResponse {
30+
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state SubscriptionState, resources resourceContainer) *RawDeltaResponse {
3131
// variables to build our response with
3232
var nextVersionMap map[string]string
3333
var filtered []types.Resource

pkg/cache/v3/delta_test.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
3535
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
3636
for _, typ := range testTypes {
3737
watches[typ] = make(chan cache.DeltaResponse, 1)
38-
state := stream.NewStreamState(true, nil)
39-
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
38+
state := stream.NewSubscriptionState(true, nil)
39+
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
4040
Node: &core.Node{
4141
Id: "node",
4242
},
4343
TypeUrl: typ,
4444
ResourceNamesSubscribe: names[typ],
4545
}, state, watches[typ])
46+
require.NoError(t, err)
4647
}
4748

4849
if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
@@ -68,17 +69,18 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
6869
// all resources as well as individual resource removals
6970
for _, typ := range testTypes {
7071
watches[typ] = make(chan cache.DeltaResponse, 1)
71-
state := stream.NewStreamState(false, versionMap[typ])
72+
state := stream.NewSubscriptionState(false, versionMap[typ])
7273
for resource := range versionMap[typ] {
7374
state.GetSubscribedResources()[resource] = struct{}{}
7475
}
75-
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
76+
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
7677
Node: &core.Node{
7778
Id: "node",
7879
},
7980
TypeUrl: typ,
8081
ResourceNamesSubscribe: names[typ],
8182
}, state, watches[typ])
83+
require.NoError(t, err)
8284
}
8385

8486
if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
@@ -111,21 +113,22 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
111113
func TestDeltaRemoveResources(t *testing.T) {
112114
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
113115
watches := make(map[string]chan cache.DeltaResponse)
114-
streams := make(map[string]*stream.StreamState)
116+
streams := make(map[string]*stream.SubscriptionState)
115117

116118
// At this stage the cache is empty, so a watch is opened
117119
for _, typ := range testTypes {
118120
watches[typ] = make(chan cache.DeltaResponse, 1)
119-
state := stream.NewStreamState(true, make(map[string]string))
121+
state := stream.NewSubscriptionState(true, make(map[string]string))
120122
streams[typ] = &state
121123
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
122124
// functionality. This means we should receive all resources back without requesting a subscription by name.
123-
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
125+
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
124126
Node: &core.Node{
125127
Id: "node",
126128
},
127129
TypeUrl: typ,
128130
}, *streams[typ], watches[typ])
131+
require.NoError(t, err)
129132
}
130133

131134
snapshot := fixture.snapshot()
@@ -141,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) {
141144
case out := <-watches[typ]:
142145
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
143146
nextVersionMap := out.GetNextVersionMap()
144-
streams[typ].SetResourceVersions(nextVersionMap)
147+
streams[typ].SetKnownResources(nextVersionMap)
145148
case <-time.After(time.Second):
146149
require.Fail(t, "failed to receive a snapshot response")
147150
}
@@ -152,13 +155,14 @@ func TestDeltaRemoveResources(t *testing.T) {
152155
// test the removal of certain resources from a partial snapshot
153156
for _, typ := range testTypes {
154157
watches[typ] = make(chan cache.DeltaResponse, 1)
155-
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
158+
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
156159
Node: &core.Node{
157160
Id: "node",
158161
},
159162
TypeUrl: typ,
160163
ResponseNonce: "nonce",
161164
}, *streams[typ], watches[typ])
165+
require.NoError(t, err)
162166
}
163167

164168
assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version")
@@ -202,14 +206,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
202206
t.Fatalf("snapshot failed: %s", err)
203207
}
204208
} else {
205-
state := stream.NewStreamState(false, make(map[string]string))
206-
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
209+
state := stream.NewSubscriptionState(false, make(map[string]string))
210+
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
207211
Node: &core.Node{
208212
Id: id,
209213
},
210214
TypeUrl: rsrc.EndpointType,
211215
ResourceNamesSubscribe: []string{clusterName},
212216
}, state, responses)
217+
require.NoError(t, err)
213218

214219
defer cancel()
215220
}
@@ -225,21 +230,22 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
225230

226231
// Create a non-buffered channel that will block sends.
227232
watchCh := make(chan cache.DeltaResponse)
228-
state := stream.NewStreamState(false, nil)
233+
state := stream.NewSubscriptionState(false, nil)
229234
state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
230-
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
235+
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
231236
Node: &core.Node{
232237
Id: key,
233238
},
234239
TypeUrl: rsrc.EndpointType,
235240
ResourceNamesSubscribe: names[rsrc.EndpointType],
236241
}, state, watchCh)
242+
require.NoError(t, err)
237243

238244
// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
239245
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
240246
defer cancel()
241247

242-
err := c.SetSnapshot(ctx, key, fixture.snapshot())
248+
err = c.SetSnapshot(ctx, key, fixture.snapshot())
243249
assert.EqualError(t, err, context.Canceled.Error())
244250

245251
// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
@@ -269,14 +275,15 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
269275
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
270276
for _, typ := range testTypes {
271277
responses := make(chan cache.DeltaResponse, 1)
272-
state := stream.NewStreamState(false, make(map[string]string))
273-
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
278+
state := stream.NewSubscriptionState(false, make(map[string]string))
279+
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
274280
Node: &core.Node{
275281
Id: key,
276282
},
277283
TypeUrl: typ,
278284
ResourceNamesSubscribe: names[typ],
279285
}, state, responses)
286+
require.NoError(t, err)
280287

281288
// Cancel the watch
282289
cancel()

pkg/cache/v3/linear.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package cache
1717
import (
1818
"context"
1919
"errors"
20+
"fmt"
2021
"strconv"
2122
"strings"
2223
"sync"
@@ -163,19 +164,19 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
163164
}
164165

165166
for id, watch := range cache.deltaWatches {
166-
if !watch.WatchesResources(modified) {
167+
if !watch.subscriptionState.WatchesResources(modified) {
167168
continue
168169
}
169170

170-
res := cache.respondDelta(watch.Request, watch.Response, watch.clientState)
171+
res := cache.respondDelta(watch.Request, watch.Response, watch.subscriptionState)
171172
if res != nil {
172173
delete(cache.deltaWatches, id)
173174
}
174175
}
175176
}
176177
}
177178

178-
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState ClientState) *RawDeltaResponse {
179+
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) *RawDeltaResponse {
179180
resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{
180181
resourceMap: cache.resources,
181182
versionMap: cache.versionMap,
@@ -297,10 +298,10 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
297298
return resources
298299
}
299300

300-
func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState, value chan Response) func() {
301+
func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, value chan Response) (func(), error) {
301302
if request.TypeUrl != cache.typeURL {
302303
value <- nil
303-
return nil
304+
return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL)
304305
}
305306
// If the version is not up to date, check whether any requested resource has
306307
// been updated between the last version and the current version. This avoids the problem
@@ -336,7 +337,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
336337
}
337338
if stale {
338339
cache.respond(value, staleResources)
339-
return nil
340+
return nil, nil
340341
}
341342
// Create open watches since versions are up to date.
342343
if len(request.ResourceNames) == 0 {
@@ -345,7 +346,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
345346
cache.mu.Lock()
346347
defer cache.mu.Unlock()
347348
delete(cache.watchAll, value)
348-
}
349+
}, nil
349350
}
350351
for _, name := range request.ResourceNames {
351352
set, exists := cache.watches[name]
@@ -367,10 +368,10 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
367368
delete(cache.watches, name)
368369
}
369370
}
370-
}
371+
}, nil
371372
}
372373

373-
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState ClientState, value chan DeltaResponse) func() {
374+
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) {
374375
cache.mu.Lock()
375376
defer cache.mu.Unlock()
376377

@@ -398,12 +399,12 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState Cl
398399
cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion())
399400
}
400401

401-
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, clientState: clientState}
402+
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState}
402403

403-
return cache.cancelDeltaWatch(watchID)
404+
return cache.cancelDeltaWatch(watchID), nil
404405
}
405406

406-
return nil
407+
return nil, nil
407408
}
408409

409410
func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {

0 commit comments

Comments
 (0)