diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index e556e54a74..eb6c90c4a8 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -245,6 +245,11 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState // * detect the version change, and return the resource (as an update) // * detect the resource deletion, and set it as removed in the response streamState.GetKnownResources()[resource] = "" + } else { + // Clean-up the state version for this resource. + // This addresses https://github.com/envoyproxy/go-control-plane/issues/583, where a resource unsubscribed then subscribed again + // is not sent again while envoy expects it. + delete(streamState.GetKnownResources(), resource) } delete(sv, resource) } diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 66ddc5bd1a..d0f6bec212 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" @@ -467,7 +468,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { }, } - validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) { + validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) string { t.Helper() select { case response := <-replies: @@ -480,8 +481,10 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { assert.ElementsMatch(t, names, expectedResources) assert.ElementsMatch(t, response.RemovedResources, expectedRemovedResources) } + return response.Nonce case <-time.After(1 * time.Second): - t.Fatalf("got no response") + require.Fail(t, "got no response") + return "" } } @@ -530,6 +533,41 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { }) + t.Run("unsubscribed resources are sent again if re-subscribed later on and the version has not changed", func(t *testing.T) { + resp := makeMockDeltaStream(t) + defer close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + go func() { + err := s.DeltaAggregatedResources(resp) + assert.NoError(t, err) + }() + + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResourceNamesSubscribe: []string{"endpoints0", "endpoints1"}, + } + nonce := validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1"}, nil) + + // Unsubscribe from endpoints0 + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResponseNonce: nonce, + ResourceNamesUnsubscribe: []string{"endpoints0"}, + } + // No reply is expected here + + // Subscribe again to endpoints0 + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: rsrc.EndpointType, + ResponseNonce: nonce, + ResourceNamesSubscribe: []string{"endpoints0"}, + } + validateResponse(t, resp.sent, []string{"endpoints0"}, nil) + }) + t.Run("* subscribtion/unsubscription support", func(t *testing.T) { resp := makeMockDeltaStream(t) defer close(resp.recv)