Skip to content

Commit 5943db4

Browse files
Aligned naming for subscription. Simplify sotw server by removing unneeded part from ads/non-ads
1 parent cefeb71 commit 5943db4

19 files changed

+388
-431
lines changed

pkg/cache/v3/cache.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,21 @@ type Request = discovery.DiscoveryRequest
3636
// DeltaRequest is an alias for the delta discovery request type.
3737
type DeltaRequest = discovery.DeltaDiscoveryRequest
3838

39-
// SubscriptionState stores the server view of the client state for a given resource type.
39+
// Subscription stores the server view of the client state for a given resource type.
4040
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
4141
// Though the methods may return mutable parts of the state for performance reasons,
4242
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
43-
type SubscriptionState interface {
44-
// GetACKedResources returns a list of resources that the client has ACK'd and their associated version.
43+
type Subscription interface {
44+
// ReturnedResources returns a list of resources that clients have ACK'd and their associated version.
4545
// The versions are:
4646
// - delta protocol: version of the specific resource set in the response
4747
// - sotw protocol: version of the global response when the resource was last ACKed
48-
GetACKedResources() map[string]string
48+
ReturnedResources() map[string]string
4949

50-
// GetSubscribedResources returns the list of resources currently subscribed to by the client for the type.
50+
// SubscribedResources returns the list of resources currently subscribed to by the client for the type.
5151
// For delta it keeps track of subscription updates across requests
5252
// For sotw it is a normalized view of the last request resources
53-
GetSubscribedResources() map[string]struct{}
53+
SubscribedResources() map[string]struct{}
5454

5555
// IsWildcard returns whether the client has a wildcard watch.
5656
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
@@ -81,7 +81,7 @@ type ConfigWatcher interface {
8181
//
8282
// Cancel is an optional function to release resources in the producer. If
8383
// provided, the consumer may call this function multiple times.
84-
CreateWatch(*Request, SubscriptionState, chan Response) (cancel func(), err error)
84+
CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error)
8585

8686
// CreateDeltaWatch returns a new open incremental xDS watch.
8787
// This is the entrypoint to propagate configuration changes the
@@ -93,7 +93,7 @@ type ConfigWatcher interface {
9393
//
9494
// Cancel is an optional function to release resources in the producer. If
9595
// provided, the consumer may call this function multiple times.
96-
CreateDeltaWatch(*DeltaRequest, SubscriptionState, chan DeltaResponse) (cancel func(), err error)
96+
CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error)
9797
}
9898

9999
// ConfigFetcher fetches configuration resources from cache

pkg/cache/v3/delta.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type resourceContainer struct {
2727
systemVersion string
2828
}
2929

30-
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state SubscriptionState, resources resourceContainer) *RawDeltaResponse {
30+
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscription, resources resourceContainer) *RawDeltaResponse {
3131
// variables to build our response with
3232
var nextVersionMap map[string]string
3333
var filtered []types.Resource
@@ -36,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
3636
// If we are handling a wildcard request, we want to respond with all resources
3737
switch {
3838
case state.IsWildcard():
39-
if len(state.GetACKedResources()) == 0 {
39+
if len(state.ReturnedResources()) == 0 {
4040
filtered = make([]types.Resource, 0, len(resources.resourceMap))
4141
}
4242
nextVersionMap = make(map[string]string, len(resources.resourceMap))
@@ -45,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
4545
// we can just set it here to be used for comparison later
4646
version := resources.versionMap[name]
4747
nextVersionMap[name] = version
48-
prevVersion, found := state.GetACKedResources()[name]
48+
prevVersion, found := state.ReturnedResources()[name]
4949
if !found || (prevVersion != version) {
5050
filtered = append(filtered, r)
5151
}
5252
}
5353

5454
// Compute resources for removal
5555
// The resource version can be set to "" here to trigger a removal even if never returned before
56-
for name := range state.GetACKedResources() {
56+
for name := range state.ReturnedResources() {
5757
if _, ok := resources.resourceMap[name]; !ok {
5858
toRemove = append(toRemove, name)
5959
}
6060
}
6161
default:
62-
nextVersionMap = make(map[string]string, len(state.GetSubscribedResources()))
62+
nextVersionMap = make(map[string]string, len(state.SubscribedResources()))
6363
// state.GetResourceVersions() may include resources no longer subscribed
6464
// In the current code this gets silently cleaned when updating the version map
65-
for name := range state.GetSubscribedResources() {
66-
prevVersion, found := state.GetACKedResources()[name]
65+
for name := range state.SubscribedResources() {
66+
prevVersion, found := state.ReturnedResources()[name]
6767
if r, ok := resources.resourceMap[name]; ok {
6868
nextVersion := resources.versionMap[name]
6969
if prevVersion != nextVersion {

pkg/cache/v3/delta_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
3535
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
3636
for _, typ := range testTypes {
3737
watches[typ] = make(chan cache.DeltaResponse, 1)
38-
state := stream.NewSubscriptionState(true, nil)
38+
state := stream.NewSubscription(true, nil)
3939
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
4040
Node: &core.Node{
4141
Id: "node",
@@ -69,9 +69,9 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
6969
// all resources as well as individual resource removals
7070
for _, typ := range testTypes {
7171
watches[typ] = make(chan cache.DeltaResponse, 1)
72-
state := stream.NewSubscriptionState(false, versionMap[typ])
72+
state := stream.NewSubscription(false, versionMap[typ])
7373
for resource := range versionMap[typ] {
74-
state.GetSubscribedResources()[resource] = struct{}{}
74+
state.SubscribedResources()[resource] = struct{}{}
7575
}
7676
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
7777
Node: &core.Node{
@@ -113,21 +113,21 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
113113
func TestDeltaRemoveResources(t *testing.T) {
114114
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
115115
watches := make(map[string]chan cache.DeltaResponse)
116-
streams := make(map[string]*stream.SubscriptionState)
116+
subs := make(map[string]*stream.Subscription)
117117

118118
// At this stage the cache is empty, so a watch is opened
119119
for _, typ := range testTypes {
120120
watches[typ] = make(chan cache.DeltaResponse, 1)
121-
state := stream.NewSubscriptionState(true, make(map[string]string))
122-
streams[typ] = &state
121+
state := stream.NewSubscription(true, make(map[string]string))
122+
subs[typ] = &state
123123
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
124124
// functionality. This means we should receive all resources back without requesting a subscription by name.
125125
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
126126
Node: &core.Node{
127127
Id: "node",
128128
},
129129
TypeUrl: typ,
130-
}, *streams[typ], watches[typ])
130+
}, *subs[typ], watches[typ])
131131
require.NoError(t, err)
132132
}
133133

@@ -144,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) {
144144
case out := <-watches[typ]:
145145
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
146146
nextVersionMap := out.GetNextVersionMap()
147-
streams[typ].SetACKedResources(nextVersionMap)
147+
subs[typ].SetReturnedResources(nextVersionMap)
148148
case <-time.After(time.Second):
149149
require.Fail(t, "failed to receive a snapshot response")
150150
}
@@ -161,7 +161,7 @@ func TestDeltaRemoveResources(t *testing.T) {
161161
},
162162
TypeUrl: typ,
163163
ResponseNonce: "nonce",
164-
}, *streams[typ], watches[typ])
164+
}, *subs[typ], watches[typ])
165165
require.NoError(t, err)
166166
}
167167

@@ -181,7 +181,7 @@ func TestDeltaRemoveResources(t *testing.T) {
181181
assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources)
182182
nextVersionMap := out.GetNextVersionMap()
183183
// make sure the version maps are different since we no longer are tracking any endpoint resources
184-
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetACKedResources(), "versionMap for the endpoint resource type did not change")
184+
assert.NotEqual(t, nextVersionMap, subs[testTypes[0]].ReturnedResources(), "versionMap for the endpoint resource type did not change")
185185
case <-time.After(time.Second):
186186
assert.Fail(t, "failed to receive snapshot response")
187187
}
@@ -206,7 +206,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
206206
t.Fatalf("snapshot failed: %s", err)
207207
}
208208
} else {
209-
state := stream.NewSubscriptionState(false, make(map[string]string))
209+
state := stream.NewSubscription(false, make(map[string]string))
210210
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
211211
Node: &core.Node{
212212
Id: id,
@@ -230,7 +230,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
230230

231231
// Create a non-buffered channel that will block sends.
232232
watchCh := make(chan cache.DeltaResponse)
233-
state := stream.NewSubscriptionState(false, nil)
233+
state := stream.NewSubscription(false, nil)
234234
state.SetSubscribedResources(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
235235
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
236236
Node: &core.Node{
@@ -275,7 +275,7 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
275275
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
276276
for _, typ := range testTypes {
277277
responses := make(chan cache.DeltaResponse, 1)
278-
state := stream.NewSubscriptionState(false, make(map[string]string))
278+
state := stream.NewSubscription(false, make(map[string]string))
279279
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
280280
Node: &core.Node{
281281
Id: key,

pkg/cache/v3/linear.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
106106
versionMap: nil,
107107
version: 0,
108108
versionVector: make(map[string]uint64),
109+
log: log.NewDefaultLogger(),
109110
}
110111
for _, opt := range opts {
111112
opt(out)
@@ -164,31 +165,29 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
164165
}
165166

166167
for id, watch := range cache.deltaWatches {
167-
if !watch.subscriptionState.WatchesResources(modified) {
168+
if !watch.subscription.WatchesResources(modified) {
168169
continue
169170
}
170171

171-
res := cache.respondDelta(watch.Request, watch.Response, watch.subscriptionState)
172+
res := cache.respondDelta(watch.Request, watch.Response, watch.subscription)
172173
if res != nil {
173174
delete(cache.deltaWatches, id)
174175
}
175176
}
176177
}
177178
}
178179

179-
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, clientState SubscriptionState) *RawDeltaResponse {
180-
resp := createDeltaResponse(context.Background(), request, clientState, resourceContainer{
180+
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse {
181+
resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{
181182
resourceMap: cache.resources,
182183
versionMap: cache.versionMap,
183184
systemVersion: cache.getVersion(),
184185
})
185186

186187
// Only send a response if there were changes
187188
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
188-
if cache.log != nil {
189-
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
190-
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, clientState.IsWildcard())
191-
}
189+
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
190+
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard())
192191
value <- resp
193192
return resp
194193
}
@@ -298,7 +297,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
298297
return resources
299298
}
300299

301-
func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, value chan Response) (func(), error) {
300+
func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) {
302301
if request.GetTypeUrl() != cache.typeURL {
303302
value <- nil
304303
return nil, fmt.Errorf("request type %s does not match cache type %s", request.TypeUrl, cache.typeURL)
@@ -372,7 +371,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ SubscriptionState, val
372371
}, nil
373372
}
374373

375-
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState SubscriptionState, value chan DeltaResponse) (func(), error) {
374+
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) {
376375
cache.mu.Lock()
377376
defer cache.mu.Unlock()
378377

@@ -385,22 +384,20 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, clientState Su
385384
modified[name] = struct{}{}
386385
}
387386
err := cache.updateVersionMap(modified)
388-
if err != nil && cache.log != nil {
387+
if err != nil {
389388
cache.log.Errorf("failed to update version map: %v", err)
390389
}
391390
}
392-
response := cache.respondDelta(request, value, clientState)
391+
response := cache.respondDelta(request, value, sub)
393392

394393
// if respondDelta returns nil this means that there is no change in any resource version
395394
// create a new watch accordingly
396395
if response == nil {
397396
watchID := cache.nextDeltaWatchID()
398-
if cache.log != nil {
399-
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
400-
cache.typeURL, clientState.GetSubscribedResources(), cache.getVersion())
401-
}
397+
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
398+
cache.typeURL, sub.SubscribedResources(), cache.getVersion())
402399

403-
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscriptionState: clientState}
400+
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub}
404401

405402
return cache.cancelDeltaWatch(watchID), nil
406403
}

0 commit comments

Comments
 (0)