Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Remove unused selector parameter #228

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error)
}

//nolint:revive // DataSynchronizer method.
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) {
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}) {
pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)

ticker := newTickerWithInitialTick(pp.pollInterval)
Expand Down
6 changes: 3 additions & 3 deletions internal/datasourcev2/streaming_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func (sp *StreamProcessor) IsInitialized() bool {
}

//nolint:revive // DataSynchronizer method.
func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) {
func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}) {
sp.loggers.Info("Starting LaunchDarkly streaming connection")
go sp.subscribe(closeWhenReady, selector)
go sp.subscribe(closeWhenReady)
}

//nolint:gocyclo
Expand Down Expand Up @@ -304,7 +304,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
}
}

func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) {
func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
path := endpoints.AddPath(sp.cfg.URI, endpoints.StreamingRequestPath)
req, reqErr := http.NewRequest("GET", path, nil)
if reqErr != nil {
Expand Down
17 changes: 7 additions & 10 deletions internal/datasystem/fdv2_datasystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/internal"
Expand Down Expand Up @@ -159,15 +157,15 @@ func (f *FDv2) hasDataSources() bool {
}

func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) {
selector := f.runInitializers(ctx, closeWhenReady)
f.runInitializers(ctx, closeWhenReady)

if f.hasDataSources() && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() {
f.launchTask(func() {
f.runPersistentStoreOutageRecovery(ctx, f.dataStoreStatusProvider.AddStatusListener())
})
}

f.runSynchronizers(ctx, closeWhenReady, selector)
f.runSynchronizers(ctx, closeWhenReady)
}

func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <-chan interfaces.DataStoreStatus) {
Expand All @@ -189,12 +187,12 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <-
}
}

func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) fdv2proto.Selector {
func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) {
for _, initializer := range f.initializers {
f.loggers.Infof("Attempting to initialize via %s", initializer.Name())
basis, err := initializer.Fetch(ctx)
if errors.Is(err, context.Canceled) {
return fdv2proto.NoSelector()
return
}
if err != nil {
f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err)
Expand All @@ -205,12 +203,11 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}
f.readyOnce.Do(func() {
close(closeWhenReady)
})
return basis.Selector
return
}
return fdv2proto.NoSelector()
}

func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}, selector fdv2proto.Selector) {
func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) {
// If the SDK was configured with no synchronizer, then (assuming no initializer succeeded), we should
// trigger the ready signal to let the call to MakeClient unblock immediately.
if f.primarySync == nil {
Expand All @@ -224,7 +221,7 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{
// Instead, create a "proxy" channel just for the data source; if that is closed, we close the real one
// using the sync.Once.
ready := make(chan struct{})
f.primarySync.Sync(ready, selector)
f.primarySync.Sync(ready)

for {
select {
Expand Down
5 changes: 2 additions & 3 deletions subsystems/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ type DataInitializer interface {
// DataSynchronizer represents a component capable of obtaining a Basis and subsequent delta updates asynchronously.
type DataSynchronizer interface {
DataInitializer
// Sync tells the data synchronizer to begin synchronizing data, starting from an optional fdv2proto.Selector.
// The selector may be nil indicating that a full Basis should be fetched.
Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector)
// Sync tells the data synchronizer to begin synchronizing data.
Sync(closeWhenReady chan<- struct{})
// IsInitialized returns true if the data source has successfully initialized at some point.
//
// Once this is true, it should remain true even if a problem occurs later.
Expand Down
Loading