Skip to content

Commit

Permalink
use envoy xDS server for featuretests (#6255)
Browse files Browse the repository at this point in the history
Updates #2134.

Signed-off-by: Steve Kriss <[email protected]>
  • Loading branch information
skriss authored Mar 12, 2024
1 parent 035fa05 commit bc301a1
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 50 deletions.
14 changes: 7 additions & 7 deletions internal/featuretests/v3/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
core_v1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -1097,9 +1098,6 @@ func TestUnreferencedService(t *testing.T) {
rh, c, done := setup(t)
defer done()

// Equals(...) only checks resources, so explicitly
// check version & nonce here and subsequently.

// This service which is added should cause a DAG rebuild
s1 := fixture.NewService("kuard").
WithPorts(core_v1.ServicePort{Port: 80, TargetPort: intstr.FromString("8080")})
Expand Down Expand Up @@ -1141,7 +1139,8 @@ func TestUnreferencedService(t *testing.T) {
),
TypeUrl: clusterType,
})
res.assertEqualVersion(t, "1")
vers := res.VersionInfo

// This service which is added should not cause a DAG rebuild
s2 := fixture.NewService("kuard-notreferenced").
WithPorts(core_v1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)})
Expand All @@ -1154,7 +1153,8 @@ func TestUnreferencedService(t *testing.T) {
),
TypeUrl: clusterType,
})
res.assertEqualVersion(t, "1")
assert.Equal(t, vers, res.VersionInfo)

// verifying that deleting a Service that is not referenced by an HTTPProxy,
// does not trigger a rebuild
rh.OnDelete(s2)
Expand All @@ -1165,11 +1165,11 @@ func TestUnreferencedService(t *testing.T) {
),
TypeUrl: clusterType,
})
res.assertEqualVersion(t, "1")
assert.Equal(t, vers, res.VersionInfo)

// verifying that deleting a Service that is referenced by an HTTPProxy,
// triggers a rebuild
rh.OnDelete(s1)
res = c.Request(clusterType)
res.assertEqualVersion(t, "2")
assert.NotEqual(t, vers, res.VersionInfo)
}
8 changes: 2 additions & 6 deletions internal/featuretests/v3/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,8 @@ func TestEndpointFilter(t *testing.T) {
),
})

c.Request(endpointType, "default/kuard/bar").Equals(&envoy_service_discovery_v3.DiscoveryResponse{
TypeUrl: endpointType,
Resources: resources(t,
envoy_v3.ClusterLoadAssignment("default/kuard/bar"),
),
})
// Nonexistent endpoint shouldn't return anything.
c.Request(endpointType, "default/kuard/bar").Equals(&envoy_service_discovery_v3.DiscoveryResponse{})
}

// issue 602, test that an update from N endpoints
Expand Down
19 changes: 9 additions & 10 deletions internal/featuretests/v3/featuretests.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
envoy_service_route_v3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
envoy_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -109,6 +110,9 @@ func setup(t *testing.T, opts ...any) (ResourceEventHandlerWrapper, *Contour, fu
}
}

snapshotHandler := xdscache_v3.NewSnapshotHandler(resources, log)
et.SetObserver(snapshotHandler)

registry := prometheus.NewRegistry()

builder := &dag.Builder{
Expand Down Expand Up @@ -150,7 +154,7 @@ func setup(t *testing.T, opts ...any) (ResourceEventHandlerWrapper, *Contour, fu
HoldoffMaxDelay: time.Duration(rand.Intn(500)) * time.Millisecond,
Observer: contour.NewRebuildMetricsObserver(
metrics.NewMetrics(registry),
dag.ComposeObservers(xdscache.ObserversOf(resources)...),
dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
),
Builder: builder,
}, func() bool { return true })
Expand All @@ -159,7 +163,7 @@ func setup(t *testing.T, opts ...any) (ResourceEventHandlerWrapper, *Contour, fu
require.NoError(t, err)

srv := xds.NewServer(registry)
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), srv)
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(context.Background(), snapshotHandler.GetCache(), contour_xds_v3.NewRequestLoggingCallbacks(log)), srv)

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -478,15 +482,10 @@ type Response struct {
func (r *Response) Equals(want *envoy_service_discovery_v3.DiscoveryResponse) *Contour {
r.Helper()

sort.Slice(want.Resources, func(i, j int) bool { return string(want.Resources[i].Value) < string(want.Resources[j].Value) })
sort.Slice(r.Resources, func(i, j int) bool { return string(r.Resources[i].Value) < string(r.Resources[j].Value) })

protobuf.RequireEqual(r.T, want.Resources, r.DiscoveryResponse.Resources)

return r.Contour
}

// Equals(...) only checks resources, so explicitly
// check version & nonce here and subsequently.
func (r *Response) assertEqualVersion(t *testing.T, expected string) {
t.Helper()
assert.Equal(t, expected, r.VersionInfo, "got unexpected VersionInfo")
assert.Equal(t, expected, r.Nonce, "got unexpected Nonce")
}
18 changes: 5 additions & 13 deletions internal/featuretests/v3/secrets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ func TestSDSShouldNotIncrementVersionNumberForUnrelatedSecret(t *testing.T) {
rh, c, done := setup(t)
defer done()

assertEqualVersion := func(t *testing.T, expected string, r *Response) {
t.Helper()
assert.Equal(t, expected, r.VersionInfo, "got unexpected VersionInfo")
assert.Equal(t, expected, r.Nonce, "got unexpected Nonce")
}

svc1 := fixture.NewService("backend").
WithPorts(core_v1.ServicePort{Name: "http", Port: 80})

Expand Down Expand Up @@ -131,17 +125,15 @@ func TestSDSShouldNotIncrementVersionNumberForUnrelatedSecret(t *testing.T) {
res.Equals(&envoy_service_discovery_v3.DiscoveryResponse{
Resources: resources(t, secret(s1)),
})
// Equals(...) only checks resources, so explicitly
// check version & nonce here and subsequently.
assertEqualVersion(t, "2", res)
vers := res.VersionInfo

// verify that requesting the same resource without change
// does not bump the current version_info.
res = c.Request(secretType)
res.Equals(&envoy_service_discovery_v3.DiscoveryResponse{
Resources: resources(t, secret(s1)),
})
assertEqualVersion(t, "2", res)
assert.Equal(t, vers, res.VersionInfo)

// s2 is not referenced by any active ingress object.
s2 := &core_v1.Secret{
Expand All @@ -158,7 +150,7 @@ func TestSDSShouldNotIncrementVersionNumberForUnrelatedSecret(t *testing.T) {
res.Equals(&envoy_service_discovery_v3.DiscoveryResponse{
Resources: resources(t, secret(s1)),
})
assertEqualVersion(t, "2", res)
assert.Equal(t, vers, res.VersionInfo)

// Verify that deleting an unreferenced secret does not
// bump the current version_info.
Expand All @@ -167,14 +159,14 @@ func TestSDSShouldNotIncrementVersionNumberForUnrelatedSecret(t *testing.T) {
res.Equals(&envoy_service_discovery_v3.DiscoveryResponse{
Resources: resources(t, secret(s1)),
})
assertEqualVersion(t, "2", res)
assert.Equal(t, vers, res.VersionInfo)

// Verify that deleting a referenced secret does
// bump the current version_info.
rh.OnDelete(s1)
res = c.Request(secretType)
res.Equals(&envoy_service_discovery_v3.DiscoveryResponse{})
assertEqualVersion(t, "3", res)
assert.NotEqual(t, vers, res.VersionInfo)
}

// issue 1169, an invalid certificate should not be
Expand Down
32 changes: 18 additions & 14 deletions internal/xdscache/v3/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,7 @@ func NewSnapshotHandler(resources []xdscache.ResourceCache, log logrus.FieldLogg
edsCache = envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, log.WithField("context", "edsCache"))

mux = &envoy_cache_v3.MuxCache{
Caches: map[string]envoy_cache_v3.Cache{
envoy_resource_v3.ListenerType: defaultCache,
envoy_resource_v3.ClusterType: defaultCache,
envoy_resource_v3.RouteType: defaultCache,
envoy_resource_v3.SecretType: defaultCache,
envoy_resource_v3.RuntimeType: defaultCache,
envoy_resource_v3.EndpointType: edsCache,
},
Caches: map[string]envoy_cache_v3.Cache{},
Classify: func(req *envoy_service_discovery_v3.DiscoveryRequest) string {
return req.GetTypeUrl()
},
Expand All @@ -64,6 +57,14 @@ func NewSnapshotHandler(resources []xdscache.ResourceCache, log logrus.FieldLogg
}
)

for _, resourceCache := range resources {
if typeURL := resourceCache.TypeURL(); typeURL == envoy_resource_v3.EndpointType {
mux.Caches[typeURL] = edsCache
} else {
mux.Caches[typeURL] = defaultCache
}
}

sh := &SnapshotHandler{
resources: parseResources(resources),
defaultCache: defaultCache,
Expand Down Expand Up @@ -114,12 +115,15 @@ func (s *SnapshotHandler) OnChange(*dag.DAG) {
version := uuid.NewString()

// Convert caches to envoy xDS Resources.
resources := map[envoy_resource_v3.Type][]envoy_types.Resource{
envoy_resource_v3.ClusterType: asResources(s.resources[envoy_resource_v3.ClusterType].Contents()),
envoy_resource_v3.RouteType: asResources(s.resources[envoy_resource_v3.RouteType].Contents()),
envoy_resource_v3.ListenerType: asResources(s.resources[envoy_resource_v3.ListenerType].Contents()),
envoy_resource_v3.SecretType: asResources(s.resources[envoy_resource_v3.SecretType].Contents()),
envoy_resource_v3.RuntimeType: asResources(s.resources[envoy_resource_v3.RuntimeType].Contents()),
resources := map[envoy_resource_v3.Type][]envoy_types.Resource{}

for resourceType, resourceCache := range s.resources {
// Endpoints use their own cache.
if resourceType == envoy_resource_v3.EndpointType {
continue
}

resources[resourceType] = asResources(resourceCache.Contents())
}

snapshot, err := envoy_cache_v3.NewSnapshot(version, resources)
Expand Down

0 comments on commit bc301a1

Please sign in to comment.