From ec295eb2c9961122faf057afc88540454d55d02d Mon Sep 17 00:00:00 2001 From: Alec Holmes Date: Wed, 26 May 2021 16:01:51 -0400 Subject: [PATCH 1/2] fixed integration test for ads Signed-off-by: Alec Holmes --- pkg/server/v3/delta_test.go | 79 +++++++++++++++++++++++++++++++++ pkg/server/v3/server.go | 4 +- sample/bootstrap-delta-ads.yaml | 16 +------ 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 3fe805e6a2..74e84a52ce 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -379,3 +379,82 @@ func TestDeltaAggregatedHandlers(t *testing.T) { } } } + +func TestDeltaAggregateRequestType(t *testing.T) { + config := makeMockConfigWatcher() + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + resp := makeMockDeltaStream(t) + resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node} + if err := s.DeltaAggregatedResources(resp); err == nil { + t.Error("DeltaAggregatedResources() => got nil, want an error") + } +} + +func TestDeltaCancellations(t *testing.T) { + config := makeMockConfigWatcher() + resp := makeMockDeltaStream(t) + for _, typ := range testTypes { + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: typ, + } + } + close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + if err := s.DeltaAggregatedResources(resp); err != nil { + t.Errorf("DeltaAggregatedResources() => got %v, want no error", err) + } + if config.watches != 0 { + t.Errorf("Expect all watches cancelled, got %q", config.watches) + } +} + +func TestDeltaOpaqueRequestsChannelMuxing(t *testing.T) { + config := makeMockConfigWatcher() + resp := makeMockDeltaStream(t) + for i := 0; i < 10; i++ { + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: fmt.Sprintf("%s%d", opaqueType, i%2), + // each subsequent request is assumed to supercede the previous request + ResourceNamesSubscribe: []string{fmt.Sprintf("%d", i)}, + } + } + close(resp.recv) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + if err := s.DeltaAggregatedResources(resp); err != nil { + t.Errorf("DeltaAggregatedResources() => got %v, want no error", err) + } + if config.watches != 0 { + t.Errorf("Expect all watches cancelled, got %q", config.watches) + } +} + +func TestDeltaCallbackError(t *testing.T) { + for _, typ := range testTypes { + t.Run(typ, func(t *testing.T) { + config := makeMockConfigWatcher() + config.deltaResponses = makeDeltaResponses() + + s := server.NewServer(context.Background(), config, server.CallbackFuncs{ + DeltaStreamOpenFunc: func(ctx context.Context, i int64, s string) error { + return errors.New("stream open error") + }, + }) + + // make a request + resp := makeMockDeltaStream(t) + resp.recv <- &discovery.DeltaDiscoveryRequest{ + Node: node, + TypeUrl: typ, + } + + // check that response fails since stream open returns error + if err := s.DeltaAggregatedResources(resp); err == nil { + t.Error("Stream() => got no error, want error") + } + + close(resp.recv) + }) + } +} diff --git a/pkg/server/v3/server.go b/pkg/server/v3/server.go index 92807ef722..b0b21602a1 100644 --- a/pkg/server/v3/server.go +++ b/pkg/server/v3/server.go @@ -95,7 +95,7 @@ func (c CallbackFuncs) OnStreamClosed(streamID int64) { // OnDeltaStreamOpen invokes DeltaStreamOpenFunc. func (c CallbackFuncs) OnDeltaStreamOpen(ctx context.Context, streamID int64, typeURL string) error { - if c.StreamOpenFunc != nil { + if c.DeltaStreamOpenFunc != nil { return c.DeltaStreamOpenFunc(ctx, streamID, typeURL) } @@ -104,7 +104,7 @@ func (c CallbackFuncs) OnDeltaStreamOpen(ctx context.Context, streamID int64, ty // OnDeltaStreamClosed invokes DeltaStreamClosedFunc. func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64) { - if c.StreamClosedFunc != nil { + if c.DeltaStreamClosedFunc != nil { c.DeltaStreamClosedFunc(streamID) } } diff --git a/sample/bootstrap-delta-ads.yaml b/sample/bootstrap-delta-ads.yaml index 785da1df14..62e9f8e9c9 100644 --- a/sample/bootstrap-delta-ads.yaml +++ b/sample/bootstrap-delta-ads.yaml @@ -35,18 +35,4 @@ static_resources: address: 127.0.0.1 port_value: 18000 http2_protocol_options: {} - name: xds_cluster -layered_runtime: - layers: - - name: runtime-0 - rtds_layer: - rtds_config: - resource_api_version: V3 - ads: {} - name: runtime-0 - - name: runtime-1 - rtds_layer: - rtds_config: - resource_api_version: V3 - ads: {} - name: runtime-1 + name: xds_cluster \ No newline at end of file From 988ee8fb918cccd35e5177ce3fd29a6d44ed245a Mon Sep 17 00:00:00 2001 From: Alec Holmes Date: Thu, 27 May 2021 15:37:02 -0400 Subject: [PATCH 2/2] removed comment Signed-off-by: Alec Holmes --- pkg/server/v3/delta_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 74e84a52ce..94d1229c25 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -414,9 +414,8 @@ func TestDeltaOpaqueRequestsChannelMuxing(t *testing.T) { resp := makeMockDeltaStream(t) for i := 0; i < 10; i++ { resp.recv <- &discovery.DeltaDiscoveryRequest{ - Node: node, - TypeUrl: fmt.Sprintf("%s%d", opaqueType, i%2), - // each subsequent request is assumed to supercede the previous request + Node: node, + TypeUrl: fmt.Sprintf("%s%d", opaqueType, i%2), ResourceNamesSubscribe: []string{fmt.Sprintf("%d", i)}, } }