Skip to content

Commit

Permalink
Linearcache should respond immediately for the first wildcard delta r…
Browse files Browse the repository at this point in the history
…equest (#927)

Commit 185b42f ("Respond immediately for the first wildcard request
in a delta stream, If the corresponding snapshot exists and the response
is empty") fixed an issue that the server doesn't response with anything
when the resource for the client was empty which could block client's
initialization. However, it's not applied to the linear cache. Hence,
clients now see different behaviors with servers using different cache
and would be blocked on initialization with linear cache but not with
snapshot cache.

The commit unifies the behavior of the two caches.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn authored May 8, 2024
1 parent a0c04b2 commit 622985f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
4 changes: 3 additions & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe
})

// Only send a response if there were changes
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
// We want to respond immediately for the first wildcard request in a stream, even if the response is empty
// otherwise, envoy won't complete initialization
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) {
if cache.log != nil {
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard())
Expand Down
16 changes: 13 additions & 3 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,17 @@ func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourc
}
}

func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) {
func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) DeltaResponse {
t.Helper()
var r DeltaResponse
select {
case r = <-ch:
case <-time.After(5 * time.Second):
t.Error("timeout waiting for delta response")
return
return nil
}
validateDeltaResponse(t, r, resources, deleted)
return r
}

func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) {
Expand Down Expand Up @@ -463,11 +464,20 @@ func TestLinearDeltaWildcard(t *testing.T) {
state1 := stream.NewStreamState(true, map[string]string{})
w1 := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1)
mustBlockDelta(t, w1)
if r1 := verifyDeltaResponse(t, w1, nil, nil); r1 != nil {
state1.SetResourceVersions(r1.GetNextVersionMap())
}
state2 := stream.NewStreamState(true, map[string]string{})
w2 := make(chan DeltaResponse, 1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2)
if r2 := verifyDeltaResponse(t, w2, nil, nil); r2 != nil {
state2.SetResourceVersions(r2.GetNextVersionMap())
}

c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1)
mustBlockDelta(t, w1)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2)
mustBlockDelta(t, w2)
checkDeltaWatchCount(t, c, 2)

a := &endpoint.ClusterLoadAssignment{ClusterName: "a"}
Expand Down

0 comments on commit 622985f

Please sign in to comment.