diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index d81837ad..0f19e86d 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -90,7 +90,7 @@ func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) } //nolint:revive // DataSynchronizer method. -func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) { +func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) { pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval) ticker := newTickerWithInitialTick(pp.pollInterval) diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 2b573947..23698c13 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -126,9 +126,9 @@ func (sp *StreamProcessor) IsInitialized() bool { } //nolint:revive // DataSynchronizer method. -func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) { +func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}) { sp.loggers.Info("Starting LaunchDarkly streaming connection") - go sp.subscribe(closeWhenReady, selector) + go sp.subscribe(closeWhenReady) } //nolint:gocyclo @@ -304,7 +304,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } } -func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) { +func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) { path := endpoints.AddPath(sp.cfg.URI, endpoints.StreamingRequestPath) req, reqErr := http.NewRequest("GET", path, nil) if reqErr != nil { diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 77e57005..281f6da1 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -6,8 +6,6 @@ import ( "sync" "time" - "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-server-sdk/v7/interfaces" "github.com/launchdarkly/go-server-sdk/v7/internal" @@ -159,7 +157,7 @@ func (f *FDv2) hasDataSources() bool { } func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { - selector := f.runInitializers(ctx, closeWhenReady) + f.runInitializers(ctx, closeWhenReady) if f.hasDataSources() && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() { f.launchTask(func() { @@ -167,7 +165,7 @@ func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { }) } - f.runSynchronizers(ctx, closeWhenReady, selector) + f.runSynchronizers(ctx, closeWhenReady) } func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <-chan interfaces.DataStoreStatus) { @@ -189,12 +187,12 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <- } } -func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) fdv2proto.Selector { +func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) { for _, initializer := range f.initializers { f.loggers.Infof("Attempting to initialize via %s", initializer.Name()) basis, err := initializer.Fetch(ctx) if errors.Is(err, context.Canceled) { - return fdv2proto.NoSelector() + return } if err != nil { f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err) @@ -205,12 +203,11 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} f.readyOnce.Do(func() { close(closeWhenReady) }) - return basis.Selector + return } - return fdv2proto.NoSelector() } -func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}, selector fdv2proto.Selector) { +func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) { // If the SDK was configured with no synchronizer, then (assuming no initializer succeeded), we should // trigger the ready signal to let the call to MakeClient unblock immediately. if f.primarySync == nil { @@ -224,7 +221,7 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{ // Instead, create a "proxy" channel just for the data source; if that is closed, we close the real one // using the sync.Once. ready := make(chan struct{}) - f.primarySync.Sync(ready, selector) + f.primarySync.Sync(ready) for { select { diff --git a/subsystems/data_source.go b/subsystems/data_source.go index f2f1bc34..c9f878b8 100644 --- a/subsystems/data_source.go +++ b/subsystems/data_source.go @@ -48,9 +48,8 @@ type DataInitializer interface { // DataSynchronizer represents a component capable of obtaining a Basis and subsequent delta updates asynchronously. type DataSynchronizer interface { DataInitializer - // Sync tells the data synchronizer to begin synchronizing data, starting from an optional fdv2proto.Selector. - // The selector may be nil indicating that a full Basis should be fetched. - Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) + // Sync tells the data synchronizer to begin synchronizing data. + Sync(closeWhenReady chan<- struct{}) // IsInitialized returns true if the data source has successfully initialized at some point. // // Once this is true, it should remain true even if a problem occurs later.