Skip to content

Commit

Permalink
fix: Send updated selector on streaming reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 committed Dec 18, 2024
1 parent ed311e0 commit 2eb8d86
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
21 changes: 13 additions & 8 deletions internal/datasourcev2/streaming_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,6 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<

func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) {
path := endpoints.AddPath(sp.cfg.URI, endpoints.StreamingRequestPath)
if selector.IsDefined() {
path = path + "?basis=" + selector.State()
}
req, reqErr := http.NewRequest("GET", path, nil)
if reqErr != nil {
sp.loggers.Errorf(
Expand All @@ -324,11 +321,7 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, selector fd
close(closeWhenReady)
return
}
if sp.cfg.FilterKey != "" {
req.URL.RawQuery = url.Values{
"filter": {sp.cfg.FilterKey},
}.Encode()
}

if sp.headers != nil {
req.Header = maps.Clone(sp.headers)
}
Expand Down Expand Up @@ -384,6 +377,18 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, selector fd
}

stream, err := es.SubscribeWithRequestAndOptions(req,
es.StreamOptionDynamicQueryParams(func() url.Values {

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldai Linux, Go 1.18

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldai Linux, Go 1.23

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldai Linux, Go 1.23

undefined: es.StreamOptionDynamicQueryParams) (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldai Linux, Go 1.22

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldai Linux, Go 1.22

undefined: es.StreamOptionDynamicQueryParams) (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.18 / Benchmarks

undefined: eventsource.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldotel Linux, Go 1.18

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldotel Linux, Go 1.22

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldotel Linux, Go 1.22

undefined: es.StreamOptionDynamicQueryParams) (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.18 / Unit Tests and Coverage

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldotel Linux, Go 1.23

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / ldotel Linux, Go 1.23

undefined: es.StreamOptionDynamicQueryParams) (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.22 / Unit Tests and Coverage

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.22 / Unit Tests and Coverage

undefined: es.StreamOptionDynamicQueryParams) (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.23 / Unit Tests and Coverage

undefined: es.StreamOptionDynamicQueryParams (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.23 / Unit Tests and Coverage

undefined: es.StreamOptionDynamicQueryParams) (typecheck)

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.22 / Contract Tests

undefined: es.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.18 / Contract Tests

undefined: eventsource.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.23 / Contract Tests

undefined: es.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Windows, Go 1.18

undefined: eventsource.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Windows, Go 1.22

undefined: es.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Windows, Go 1.23

undefined: es.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.22 / Benchmarks

undefined: es.StreamOptionDynamicQueryParams

Check failure on line 380 in internal/datasourcev2/streaming_data_source.go

View workflow job for this annotation

GitHub Actions / Linux, Go 1.23 / Benchmarks

undefined: es.StreamOptionDynamicQueryParams
values := url.Values{}
if selector := sp.dataDestination.Selector(); selector.IsDefined() {
values.Set("basis", selector.State())
}

if sp.cfg.FilterKey != "" {
values.Set("filter", sp.cfg.FilterKey)
}

return values
}),
es.StreamOptionHTTPClient(sp.client),
es.StreamOptionReadTimeout(streamReadTimeout),
es.StreamOptionInitialRetry(initialRetryDelay),
Expand Down
24 changes: 21 additions & 3 deletions internal/sharedtest/mocks/mock_data_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type MockDataDestination struct {
Statuses chan interfaces.DataSourceStatus
dataStoreStatusProvider *mockDataStoreStatusProvider
lastStatus interfaces.DataSourceStatus
lastKnownSelector fdv2proto.Selector
lock sync.Mutex
}

Expand All @@ -40,11 +41,18 @@ func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination
DataStore: dataStore,
Statuses: make(chan interfaces.DataSourceStatus, 10),
dataStoreStatusProvider: dataStoreStatusProvider,
lastKnownSelector: fdv2proto.NoSelector(),
}
}

func (d *MockDataDestination) Selector() fdv2proto.Selector {
d.lock.Lock()
defer d.lock.Unlock()
return d.lastKnownSelector
}

// SetBasis in this test implementation, delegates to d.DataStore.CapturedUpdates.
func (d *MockDataDestination) SetBasis(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) {
func (d *MockDataDestination) SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, _ bool) {
// For now, the selector is ignored. When the data sources start making use of it, it should be
// stored so that assertions can be made.

Expand All @@ -56,11 +64,17 @@ func (d *MockDataDestination) SetBasis(events []fdv2proto.Change, _ fdv2proto.Se
for _, coll := range collections {
AssertNotNil(coll.Kind)
}
_ = d.DataStore.Init(toposort.Sort(collections))

if err := d.DataStore.Init(toposort.Sort(collections)); err == nil {
d.lock.Lock()
d.lastKnownSelector = selector
d.lock.Unlock()
}

}

// ApplyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates.
func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) {
func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, _ bool) {
// For now, the selector is ignored. When the data sources start making use of it, it should be
// stored so that assertions can be made.

Expand All @@ -80,6 +94,10 @@ func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Change, _ fdv2proto.
}
}
}

d.lock.Lock()
d.lastKnownSelector = selector
d.lock.Unlock()
}

// UpdateStatus in this test implementation, pushes a value onto the Statuses channel.
Expand Down
3 changes: 3 additions & 0 deletions subsystems/data_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
// Do not use it.
// You have been warned.
type DataDestination interface {
// Selector returns the last known selector for the data store.
Selector() fdv2proto.Selector

// SetBasis defines a new basis for the data store. This means the store must
// be emptied of any existing data before applying the events. This operation should be
// atomic with respect to any other operations that modify the store.
Expand Down

0 comments on commit 2eb8d86

Please sign in to comment.