From 17df32e5cb68dddd160bdf7796de8b0c1c1e766f Mon Sep 17 00:00:00 2001 From: John Murret Date: Thu, 9 May 2024 12:55:13 -0600 Subject: [PATCH] NET-9084 - add tests to peering endpoint and blockingquery package to assert blocking works properly. (#21078) --- agent/blockingquery/blockingquery.go | 6 + agent/blockingquery/blockingquery_test.go | 337 ++++++++++++++++++++- agent/blockingquery/mock_FSMServer.go | 122 ++++++++ agent/blockingquery/mock_RequestOptions.go | 94 ++++++ agent/blockingquery/mock_ResponseMeta.go | 62 ++++ agent/consul/rpc_test.go | 216 +------------ agent/peering_endpoint_test.go | 41 +++ 7 files changed, 662 insertions(+), 216 deletions(-) create mode 100644 agent/blockingquery/mock_FSMServer.go create mode 100644 agent/blockingquery/mock_RequestOptions.go create mode 100644 agent/blockingquery/mock_ResponseMeta.go diff --git a/agent/blockingquery/blockingquery.go b/agent/blockingquery/blockingquery.go index 3e073a1ffab2..32f170cc1ce8 100644 --- a/agent/blockingquery/blockingquery.go +++ b/agent/blockingquery/blockingquery.go @@ -28,6 +28,8 @@ type QueryFn func(memdb.WatchSet, *state.Store) error // RequestOptions are options used by Server.blockingQuery to modify the // behaviour of the query operation, or to populate response metadata. +// +//go:generate mockery --name RequestOptions --inpackage type RequestOptions interface { GetToken() string GetMinQueryIndex() uint64 @@ -37,6 +39,8 @@ type RequestOptions interface { // ResponseMeta is an interface used to populate the response struct // with metadata about the query and the state of the server. +// +//go:generate mockery --name ResponseMeta --inpackage type ResponseMeta interface { SetLastContact(time.Duration) SetKnownLeader(bool) @@ -47,6 +51,8 @@ type ResponseMeta interface { // FSMServer is interface into the stateful components of a Consul server, such // as memdb or raft leadership. +// +//go:generate mockery --name FSMServer --inpackage type FSMServer interface { ConsistentRead() error DecrementBlockingQueries() uint64 diff --git a/agent/blockingquery/blockingquery_test.go b/agent/blockingquery/blockingquery_test.go index 5861ed399164..494cc41b42ad 100644 --- a/agent/blockingquery/blockingquery_test.go +++ b/agent/blockingquery/blockingquery_test.go @@ -3,5 +3,338 @@ package blockingquery -// TODO: move tests from the consul package, rpc_test.go, TestServer_blockingQuery -// here using mock for FSMServer w/ structs.QueryOptions and structs.QueryOptions +import ( + "fmt" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestServer_blockingQuery(t *testing.T) { + t.Parallel() + getFSM := func(t *testing.T, additionalCfgFunc func(mockFSM *MockFSMServer)) *MockFSMServer { + fsm := NewMockFSMServer(t) + testCh := make(chan struct{}) + tombstoneGC, err := state.NewTombstoneGC(time.Second, time.Second) + require.NoError(t, err) + store := state.NewStateStore(tombstoneGC) + fsm.On("GetShutdownChannel").Return(testCh) + fsm.On("GetState").Return(store) + fsm.On("SetQueryMeta", mock.Anything, mock.Anything).Return(nil) + if additionalCfgFunc != nil { + additionalCfgFunc(fsm) + } + return fsm + } + + getOpts := func(t *testing.T, additionalCfgFunc func(options *MockRequestOptions)) *MockRequestOptions { + requestOpts := NewMockRequestOptions(t) + requestOpts.On("GetRequireConsistent").Return(false) + requestOpts.On("GetToken").Return("fake-token") + if additionalCfgFunc != nil { + additionalCfgFunc(requestOpts) + } + return requestOpts + } + + getMeta := func(t *testing.T, additionalCfgFunc func(mockMeta *MockResponseMeta)) *MockResponseMeta { + meta := NewMockResponseMeta(t) + if additionalCfgFunc != nil { + additionalCfgFunc(meta) + } + return meta + } + + // Perform a non-blocking query. Note that it's significant that the meta has + // a zero index in response - the implied opts.MinQueryIndex is also zero but + // this should not block still. + t.Run("non-blocking query", func(t *testing.T) { + var calls int + fn := func(_ memdb.WatchSet, _ *state.Store) error { + calls++ + return nil + } + err := Query(getFSM(t, nil), getOpts(t, func(mockOpts *MockRequestOptions) { + mockOpts.On("GetMinQueryIndex").Return(uint64(0)) + }), getMeta(t, nil), fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) + + // Perform a blocking query that gets woken up and loops around once. + t.Run("blocking query - single loop", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(1)) + options.On("GetMaxQueryTime").Return(1*time.Second, nil) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(1)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + + var calls int + fn := func(ws memdb.WatchSet, _ *state.Store) error { + if calls == 0 { + meta.On("GetIndex").Return(uint64(3)) + + fakeCh := make(chan struct{}) + close(fakeCh) + ws.Add(fakeCh) + } else { + meta.On("GetIndex").Return(uint64(4)) + } + calls++ + return nil + } + err := Query(fsm, opts, meta, fn) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) + + // Perform a blocking query that returns a zero index from blocking func (e.g. + // no state yet). This should still return an empty response immediately, but + // with index of 1 and then block on the next attempt. In one sense zero index + // is not really a valid response from a state method that is not an error but + // in practice a lot of state store operations do return it unless they + // explicitly special checks to turn 0 into 1. Often this is not caught or + // covered by tests but eventually when hit in the wild causes blocking + // clients to busy loop and burn CPU. This test ensure that blockingQuery + // systematically does the right thing to prevent future bugs like that. + t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(0)) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(1)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + var calls int + fn := func(ws memdb.WatchSet, _ *state.Store) error { + if opts.GetMinQueryIndex() > 0 { + // If client requested blocking, block forever. This is simulating + // waiting for the watched resource to be initialized/written to giving + // it a non-zero index. Note the timeout on the query options is relied + // on to stop the test taking forever. + fakeCh := make(chan struct{}) + ws.Add(fakeCh) + } + meta.On("GetIndex").Return(uint64(0)) + calls++ + return nil + } + err := Query(fsm, opts, meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + require.Equal(t, uint64(1), meta.GetIndex(), + "expect fake index of 1 to force client to block on next update") + + // Simulate client making next request + opts = getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(1)) + options.On("GetMaxQueryTime").Return(20*time.Millisecond, nil) + }) + + // This time we should block even though the func returns index 0 still + t0 := time.Now() + require.NoError(t, Query(fsm, opts, meta, fn)) + t1 := time.Now() + require.Equal(t, 2, calls) + require.Equal(t, uint64(1), meta.GetIndex(), + "expect fake index of 1 to force client to block on next update") + require.True(t, t1.Sub(t0) > 20*time.Millisecond, + "should have actually blocked waiting for timeout") + + }) + + // Perform a query that blocks and gets interrupted when the state store + // is abandoned. + t.Run("blocking query interrupted by abandonCh", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(3)) + options.On("GetMaxQueryTime").Return(20*time.Millisecond, nil) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(1)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + + var calls int + fn := func(_ memdb.WatchSet, _ *state.Store) error { + if calls == 0 { + meta.On("GetIndex").Return(uint64(1)) + + fsm.GetState().Abandon() + } + calls++ + return nil + } + err := Query(fsm, opts, meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) + + t.Run("non-blocking query for item that does not exist", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(3)) + options.On("GetMaxQueryTime").Return(20*time.Millisecond, nil) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(1)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + calls := 0 + fn := func(_ memdb.WatchSet, _ *state.Store) error { + calls++ + return ErrNotFound + } + + err := Query(fsm, opts, meta, fn) + require.NoError(t, err) + require.Equal(t, 1, calls) + }) + + t.Run("blocking query for item that does not exist", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(3)) + options.On("GetMaxQueryTime").Return(100*time.Millisecond, nil) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(1)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + calls := 0 + fn := func(ws memdb.WatchSet, _ *state.Store) error { + calls++ + if calls == 1 { + meta.On("GetIndex").Return(uint64(3)) + + ch := make(chan struct{}) + close(ch) + ws.Add(ch) + return ErrNotFound + } + meta.On("GetIndex").Return(uint64(5)) + return ErrNotFound + } + + err := Query(fsm, opts, meta, fn) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) + + t.Run("blocking query for item that existed and is removed", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(3)) + // this query taks 1.002 sceonds locally so setting the timeout to 2 seconds + options.On("GetMaxQueryTime").Return(2*time.Second, nil) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(3)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + calls := 0 + fn := func(ws memdb.WatchSet, _ *state.Store) error { + calls++ + if calls == 1 { + + ch := make(chan struct{}) + close(ch) + ws.Add(ch) + return nil + } + meta = getMeta(t, func(mockMeta *MockResponseMeta) { + meta.On("GetIndex").Return(uint64(5)) + }) + return ErrNotFound + } + + start := time.Now() + require.NoError(t, Query(fsm, opts, meta, fn)) + queryDuration := time.Since(start) + maxQueryDuration, err := opts.GetMaxQueryTime() + require.NoError(t, err) + require.True(t, queryDuration < maxQueryDuration, fmt.Sprintf("query timed out - queryDuration: %v, maxQueryDuration: %v", queryDuration, maxQueryDuration)) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) + + t.Run("blocking query for non-existent item that is created", func(t *testing.T) { + opts := getOpts(t, func(options *MockRequestOptions) { + options.On("GetMinQueryIndex").Return(uint64(3)) + // this query taks 1.002 sceonds locally so setting the timeout to 2 seconds + options.On("GetMaxQueryTime").Return(2*time.Second, nil) + }) + + meta := getMeta(t, func(mockMeta *MockResponseMeta) { + mockMeta.On("GetIndex").Return(uint64(3)) + }) + + fsm := getFSM(t, func(mockFSM *MockFSMServer) { + mockFSM.On("RPCQueryTimeout", mock.Anything).Return(1 * time.Second) + mockFSM.On("IncrementBlockingQueries").Return(uint64(1)) + mockFSM.On("DecrementBlockingQueries").Return(uint64(1)) + }) + calls := 0 + fn := func(ws memdb.WatchSet, _ *state.Store) error { + calls++ + if calls == 1 { + ch := make(chan struct{}) + close(ch) + ws.Add(ch) + return ErrNotFound + } + meta = getMeta(t, func(mockMeta *MockResponseMeta) { + meta.On("GetIndex").Return(uint64(5)) + }) + return nil + } + + start := time.Now() + require.NoError(t, Query(fsm, opts, meta, fn)) + queryDuration := time.Since(start) + maxQueryDuration, err := opts.GetMaxQueryTime() + require.NoError(t, err) + require.True(t, queryDuration < maxQueryDuration, fmt.Sprintf("query timed out - queryDuration: %v, maxQueryDuration: %v", queryDuration, maxQueryDuration)) + require.NoError(t, err) + require.Equal(t, 2, calls) + }) +} diff --git a/agent/blockingquery/mock_FSMServer.go b/agent/blockingquery/mock_FSMServer.go new file mode 100644 index 000000000000..1e62f391b8dd --- /dev/null +++ b/agent/blockingquery/mock_FSMServer.go @@ -0,0 +1,122 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package blockingquery + +import ( + time "time" + + state "github.com/hashicorp/consul/agent/consul/state" + mock "github.com/stretchr/testify/mock" +) + +// MockFSMServer is an autogenerated mock type for the FSMServer type +type MockFSMServer struct { + mock.Mock +} + +// ConsistentRead provides a mock function with given fields: +func (_m *MockFSMServer) ConsistentRead() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DecrementBlockingQueries provides a mock function with given fields: +func (_m *MockFSMServer) DecrementBlockingQueries() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// GetShutdownChannel provides a mock function with given fields: +func (_m *MockFSMServer) GetShutdownChannel() chan struct{} { + ret := _m.Called() + + var r0 chan struct{} + if rf, ok := ret.Get(0).(func() chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(chan struct{}) + } + } + + return r0 +} + +// GetState provides a mock function with given fields: +func (_m *MockFSMServer) GetState() *state.Store { + ret := _m.Called() + + var r0 *state.Store + if rf, ok := ret.Get(0).(func() *state.Store); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*state.Store) + } + } + + return r0 +} + +// IncrementBlockingQueries provides a mock function with given fields: +func (_m *MockFSMServer) IncrementBlockingQueries() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// RPCQueryTimeout provides a mock function with given fields: _a0 +func (_m *MockFSMServer) RPCQueryTimeout(_a0 time.Duration) time.Duration { + ret := _m.Called(_a0) + + var r0 time.Duration + if rf, ok := ret.Get(0).(func(time.Duration) time.Duration); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// SetQueryMeta provides a mock function with given fields: _a0, _a1 +func (_m *MockFSMServer) SetQueryMeta(_a0 ResponseMeta, _a1 string) { + _m.Called(_a0, _a1) +} + +// NewMockFSMServer creates a new instance of MockFSMServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockFSMServer(t interface { + mock.TestingT + Cleanup(func()) +}) *MockFSMServer { + mock := &MockFSMServer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/blockingquery/mock_RequestOptions.go b/agent/blockingquery/mock_RequestOptions.go new file mode 100644 index 000000000000..7e57c86b36de --- /dev/null +++ b/agent/blockingquery/mock_RequestOptions.go @@ -0,0 +1,94 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package blockingquery + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// MockRequestOptions is an autogenerated mock type for the RequestOptions type +type MockRequestOptions struct { + mock.Mock +} + +// GetMaxQueryTime provides a mock function with given fields: +func (_m *MockRequestOptions) GetMaxQueryTime() (time.Duration, error) { + ret := _m.Called() + + var r0 time.Duration + var r1 error + if rf, ok := ret.Get(0).(func() (time.Duration, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMinQueryIndex provides a mock function with given fields: +func (_m *MockRequestOptions) GetMinQueryIndex() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// GetRequireConsistent provides a mock function with given fields: +func (_m *MockRequestOptions) GetRequireConsistent() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// GetToken provides a mock function with given fields: +func (_m *MockRequestOptions) GetToken() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// NewMockRequestOptions creates a new instance of MockRequestOptions. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockRequestOptions(t interface { + mock.TestingT + Cleanup(func()) +}) *MockRequestOptions { + mock := &MockRequestOptions{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/blockingquery/mock_ResponseMeta.go b/agent/blockingquery/mock_ResponseMeta.go new file mode 100644 index 000000000000..c038b4bcd5a4 --- /dev/null +++ b/agent/blockingquery/mock_ResponseMeta.go @@ -0,0 +1,62 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package blockingquery + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// MockResponseMeta is an autogenerated mock type for the ResponseMeta type +type MockResponseMeta struct { + mock.Mock +} + +// GetIndex provides a mock function with given fields: +func (_m *MockResponseMeta) GetIndex() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// SetIndex provides a mock function with given fields: _a0 +func (_m *MockResponseMeta) SetIndex(_a0 uint64) { + _m.Called(_a0) +} + +// SetKnownLeader provides a mock function with given fields: _a0 +func (_m *MockResponseMeta) SetKnownLeader(_a0 bool) { + _m.Called(_a0) +} + +// SetLastContact provides a mock function with given fields: _a0 +func (_m *MockResponseMeta) SetLastContact(_a0 time.Duration) { + _m.Called(_a0) +} + +// SetResultsFilteredByACLs provides a mock function with given fields: _a0 +func (_m *MockResponseMeta) SetResultsFilteredByACLs(_a0 bool) { + _m.Called(_a0) +} + +// NewMockResponseMeta creates a new instance of MockResponseMeta. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockResponseMeta(t interface { + mock.TestingT + Cleanup(func()) +}) *MockResponseMeta { + mock := &MockResponseMeta{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 39351c98ca92..11dbeef5db54 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -24,7 +24,6 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/raft" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -232,136 +231,12 @@ func (m *MockSink) Close() error { return nil } +// TestServer_blockingQuery tests authenticated and unauthenticated calls. The +// other blocking query tests reside in blockingquery_test.go in the blockingquery package. func TestServer_blockingQuery(t *testing.T) { t.Parallel() _, s := testServerWithConfig(t) - // Perform a non-blocking query. Note that it's significant that the meta has - // a zero index in response - the implied opts.MinQueryIndex is also zero but - // this should not block still. - t.Run("non-blocking query", func(t *testing.T) { - var opts structs.QueryOptions - var meta structs.QueryMeta - var calls int - fn := func(_ memdb.WatchSet, _ *state.Store) error { - calls++ - return nil - } - err := s.blockingQuery(&opts, &meta, fn) - require.NoError(t, err) - require.Equal(t, 1, calls) - }) - - // Perform a blocking query that gets woken up and loops around once. - t.Run("blocking query - single loop", func(t *testing.T) { - opts := structs.QueryOptions{ - MinQueryIndex: 3, - } - var meta structs.QueryMeta - var calls int - fn := func(ws memdb.WatchSet, _ *state.Store) error { - if calls == 0 { - meta.Index = 3 - - fakeCh := make(chan struct{}) - close(fakeCh) - ws.Add(fakeCh) - } else { - meta.Index = 4 - } - calls++ - return nil - } - err := s.blockingQuery(&opts, &meta, fn) - require.NoError(t, err) - require.Equal(t, 2, calls) - }) - - // Perform a blocking query that returns a zero index from blocking func (e.g. - // no state yet). This should still return an empty response immediately, but - // with index of 1 and then block on the next attempt. In one sense zero index - // is not really a valid response from a state method that is not an error but - // in practice a lot of state store operations do return it unless they - // explicitly special checks to turn 0 into 1. Often this is not caught or - // covered by tests but eventually when hit in the wild causes blocking - // clients to busy loop and burn CPU. This test ensure that blockingQuery - // systematically does the right thing to prevent future bugs like that. - t.Run("blocking query with 0 modifyIndex from state func", func(t *testing.T) { - opts := structs.QueryOptions{ - MinQueryIndex: 0, - } - var meta structs.QueryMeta - var calls int - fn := func(ws memdb.WatchSet, _ *state.Store) error { - if opts.MinQueryIndex > 0 { - // If client requested blocking, block forever. This is simulating - // waiting for the watched resource to be initialized/written to giving - // it a non-zero index. Note the timeout on the query options is relied - // on to stop the test taking forever. - fakeCh := make(chan struct{}) - ws.Add(fakeCh) - } - meta.Index = 0 - calls++ - return nil - } - require.NoError(t, s.blockingQuery(&opts, &meta, fn)) - assert.Equal(t, 1, calls) - assert.Equal(t, uint64(1), meta.Index, - "expect fake index of 1 to force client to block on next update") - - // Simulate client making next request - opts.MinQueryIndex = 1 - opts.MaxQueryTime = 20 * time.Millisecond // Don't wait too long - - // This time we should block even though the func returns index 0 still - t0 := time.Now() - require.NoError(t, s.blockingQuery(&opts, &meta, fn)) - t1 := time.Now() - assert.Equal(t, 2, calls) - assert.Equal(t, uint64(1), meta.Index, - "expect fake index of 1 to force client to block on next update") - assert.True(t, t1.Sub(t0) > 20*time.Millisecond, - "should have actually blocked waiting for timeout") - - }) - - // Perform a query that blocks and gets interrupted when the state store - // is abandoned. - t.Run("blocking query interrupted by abandonCh", func(t *testing.T) { - opts := structs.QueryOptions{ - MinQueryIndex: 3, - } - var meta structs.QueryMeta - var calls int - fn := func(_ memdb.WatchSet, _ *state.Store) error { - if calls == 0 { - meta.Index = 3 - - snap, err := s.fsm.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } - defer snap.Release() - - buf := bytes.NewBuffer(nil) - sink := &MockSink{buf, false} - if err := snap.Persist(sink); err != nil { - t.Fatalf("err: %v", err) - } - - if err := s.fsm.Restore(sink); err != nil { - t.Fatalf("err: %v", err) - } - } - calls++ - return nil - } - err := s.blockingQuery(&opts, &meta, fn) - require.NoError(t, err) - require.Equal(t, 1, calls) - }) - t.Run("ResultsFilteredByACLs is reset for unauthenticated calls", func(t *testing.T) { opts := structs.QueryOptions{ Token: "", @@ -394,93 +269,6 @@ func TestServer_blockingQuery(t *testing.T) { require.NoError(t, err) require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls") }) - - t.Run("non-blocking query for item that does not exist", func(t *testing.T) { - opts := structs.QueryOptions{} - meta := structs.QueryMeta{} - calls := 0 - fn := func(_ memdb.WatchSet, _ *state.Store) error { - calls++ - return errNotFound - } - - err := s.blockingQuery(&opts, &meta, fn) - require.NoError(t, err) - require.Equal(t, 1, calls) - }) - - t.Run("blocking query for item that does not exist", func(t *testing.T) { - opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond} - meta := structs.QueryMeta{} - calls := 0 - fn := func(ws memdb.WatchSet, _ *state.Store) error { - calls++ - if calls == 1 { - meta.Index = 3 - - ch := make(chan struct{}) - close(ch) - ws.Add(ch) - return errNotFound - } - meta.Index = 5 - return errNotFound - } - - err := s.blockingQuery(&opts, &meta, fn) - require.NoError(t, err) - require.Equal(t, 2, calls) - }) - - t.Run("blocking query for item that existed and is removed", func(t *testing.T) { - opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond} - meta := structs.QueryMeta{} - calls := 0 - fn := func(ws memdb.WatchSet, _ *state.Store) error { - calls++ - if calls == 1 { - meta.Index = 3 - - ch := make(chan struct{}) - close(ch) - ws.Add(ch) - return nil - } - meta.Index = 5 - return errNotFound - } - - start := time.Now() - err := s.blockingQuery(&opts, &meta, fn) - require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out") - require.NoError(t, err) - require.Equal(t, 2, calls) - }) - - t.Run("blocking query for non-existent item that is created", func(t *testing.T) { - opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond} - meta := structs.QueryMeta{} - calls := 0 - fn := func(ws memdb.WatchSet, _ *state.Store) error { - calls++ - if calls == 1 { - meta.Index = 3 - - ch := make(chan struct{}) - close(ch) - ws.Add(ch) - return errNotFound - } - meta.Index = 5 - return nil - } - - start := time.Now() - err := s.blockingQuery(&opts, &meta, fn) - require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out") - require.NoError(t, err) - require.Equal(t, 2, calls) - }) } func TestRPC_ReadyForConsistentReads(t *testing.T) { diff --git a/agent/peering_endpoint_test.go b/agent/peering_endpoint_test.go index ba3b704b8b5a..925033f3342f 100644 --- a/agent/peering_endpoint_test.go +++ b/agent/peering_endpoint_test.go @@ -556,6 +556,8 @@ func TestHTTP_Peering_Read(t *testing.T) { _, err = a.rpcClientPeering.PeeringWrite(ctx, bar) require.NoError(t, err) + var lastIndex uint64 + t.Run("return foo", func(t *testing.T) { req, err := http.NewRequest("GET", "/v1/peering/foo", nil) require.NoError(t, err) @@ -578,6 +580,8 @@ func TestHTTP_Peering_Read(t *testing.T) { require.Equal(t, 0, len(apiResp.StreamStatus.ImportedServices)) require.Equal(t, 0, len(apiResp.StreamStatus.ExportedServices)) + + lastIndex = getIndex(t, resp) }) t.Run("not found", func(t *testing.T) { @@ -588,6 +592,43 @@ func TestHTTP_Peering_Read(t *testing.T) { require.Equal(t, http.StatusNotFound, resp.Code) require.Equal(t, "Peering not found for \"baz\"", resp.Body.String()) }) + + const timeout = 5 * time.Second + t.Run("read blocking query result", func(t *testing.T) { + var ( + // out and resp are not safe to read until reading from errCh + out api.Peering + resp = httptest.NewRecorder() + errCh = make(chan error, 1) + ) + go func() { + url := fmt.Sprintf("/v1/peering/foo?index=%d&wait=%s", lastIndex, timeout) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + errCh <- err + return + } + + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + err = json.NewDecoder(resp.Body).Decode(&out) + errCh <- err + }() + + time.Sleep(200 * time.Millisecond) + + // update peering + foo.Peering.Meta["spooky-key"] = "boo!" + _, err = a.rpcClientPeering.PeeringWrite(ctx, foo) + require.NoError(t, err) + + if err := <-errCh; err != nil { + require.NoError(t, err) + } + + require.Equal(t, "boo!", out.Meta["spooky-key"]) + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) + }) } func TestHTTP_Peering_Delete(t *testing.T) {