From bb30d44c7e1a218712714685a6c00b383bc23186 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Fri, 21 Feb 2025 16:40:09 -0500 Subject: [PATCH 1/2] go/vt/discovery: configurable logger Signed-off-by: Max Englander --- go/vt/discovery/discovery_options.go | 67 ++++++++++++++++++++++++++ go/vt/discovery/healthcheck.go | 54 +++++++++++++-------- go/vt/discovery/tablet_health_check.go | 10 ++-- go/vt/discovery/topology_watcher.go | 40 +++++++++++---- 4 files changed, 139 insertions(+), 32 deletions(-) create mode 100644 go/vt/discovery/discovery_options.go diff --git a/go/vt/discovery/discovery_options.go b/go/vt/discovery/discovery_options.go new file mode 100644 index 00000000000..c5a2b1762d3 --- /dev/null +++ b/go/vt/discovery/discovery_options.go @@ -0,0 +1,67 @@ +/* +Copyright 2025 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "vitess.io/vitess/go/vt/logutil" +) + +// discoveryOptions configure a discovery components. discoveryOptions are set +// by the DiscoveryOption values passed to the component constructors. +type discoveryOptions struct { + logger logutil.Logger +} + +// DiscoveryOption configures how we perform certain operations. +type DiscoveryOption interface { + apply(*discoveryOptions) +} + +// funcDiscoveryOption wraps a function that modifies discoveryOptions into +// an implementation of the DiscoveryOption interface. +type funcDiscoveryOption struct { + f func(*discoveryOptions) +} + +func defaultOptions() discoveryOptions { + return discoveryOptions{ + logger: logutil.NewConsoleLogger(), + } +} + +func withOptions(dos ...DiscoveryOption) discoveryOptions { + os := defaultOptions() + for _, do := range dos { + do.apply(&os) + } + return os +} + +func (fhco *funcDiscoveryOption) apply(dos *discoveryOptions) { + fhco.f(dos) +} + +func newFuncDiscoveryOption(f func(*discoveryOptions)) *funcDiscoveryOption { + return &funcDiscoveryOption{ + f: f, + } +} + +// WithLogger accepts a custom logger to use in a discovery component. If this +// option is not provided then the default system logger will be used. +func WithLogger(l logutil.Logger) DiscoveryOption { + return newFuncDiscoveryOption(func(o *discoveryOptions) { + o.logger = l + }) +} diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5734749b167..55e49a6bc64 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -52,6 +52,7 @@ import ( "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -188,8 +189,10 @@ func FilteringKeyspaces() bool { return len(KeyspacesToWatch) > 0 } -type KeyspaceShardTabletType string -type tabletAliasString string +type ( + KeyspaceShardTabletType string + tabletAliasString string +) // HealthCheck declares what the TabletGateway needs from the HealthCheck type HealthCheck interface { @@ -299,6 +302,9 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} + // options contains optional settings used to modify HealthCheckImpl + // behavior. + options discoveryOptions } // NewVTGateHealthCheckFilters returns healthcheck filters for vtgate. @@ -350,9 +356,9 @@ func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) { // filters. // // Is one or more filters to apply when determining what tablets we want to stream healthchecks from. -func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl { - log.Infof("loading tablets for cells: %v", cellsToWatch) - +func NewHealthCheck( + ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter, opts ...DiscoveryOption, +) *HealthCheckImpl { hc := &HealthCheckImpl{ ts: topoServer, cell: localCell, @@ -364,7 +370,11 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur subscribers: make(map[chan *TabletHealth]struct{}), cellAliases: make(map[string]string), loadTabletsTrigger: make(chan struct{}, 1), + options: withOptions(opts...), } + + hc.logger().Infof("loading tablets for cells: %v", cellsToWatch) + var topoWatchers []*TopologyWatcher cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { @@ -372,11 +382,11 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } for _, c := range cells { - log.Infof("Setting up healthcheck for cell: %v", c) + hc.logger().Infof("Setting up healthcheck for cell: %v", c) if c == "" { continue } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, opts...)) } hc.topoWatchers = topoWatchers @@ -401,7 +411,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { return } - log.Infof("Adding tablet to healthcheck: %v", tablet) + hc.logger().Infof("Adding tablet to healthcheck: %v", tablet) hc.mu.Lock() defer hc.mu.Unlock() if hc.healthByAlias == nil { @@ -419,6 +429,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { cancelFunc: cancelFunc, Tablet: tablet, Target: target, + logger: hc.logger(), } // add to our datastore @@ -426,7 +437,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { tabletAlias := topoproto.TabletAliasString(tablet.Alias) if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok { // We should not add a tablet that we already have - log.Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias) + hc.logger().Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias) return } hc.healthByAlias[tabletAliasString(tabletAlias)] = thc @@ -454,7 +465,7 @@ func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) { } func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { - log.Infof("Removing tablet from healthcheck: %v", tablet) + hc.logger().Infof("Removing tablet from healthcheck: %v", tablet) hc.mu.Lock() defer hc.mu.Unlock() @@ -497,7 +508,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] if !ok { - log.Infof("We have no health data for tablet: %v, it might have been deleted already", tablet) + hc.logger().Infof("We have no health data for tablet: %v, it might have been deleted already", tablet) return } // Calling this will end the context associated with th.checkConn, @@ -516,7 +527,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // so that we're not racing to update it and in effect re-adding a copy of the // tablet record that was deleted if _, ok := hc.healthByAlias[tabletAlias]; !ok { - log.Infof("Tablet %v has been deleted, skipping health update", th.Tablet) + hc.logger().Infof("Tablet %v has been deleted, skipping health update", th.Tablet) return } @@ -541,7 +552,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // causing an interruption where no primary is assigned to the shard. if prevTarget.TabletType == topodata.TabletType_PRIMARY { if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 { - log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet)) + hc.logger().Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet)) // We want to trigger a loadTablets call, but if the channel is not empty // then a trigger is already scheduled, we don't need to trigger another one. // This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994. @@ -567,7 +578,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // We already have one up server, see if we // need to replace it. if th.PrimaryTermStartTime < hc.healthy[targetKey][0].PrimaryTermStartTime { - log.Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ", + hc.logger().Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ", topoproto.TabletAliasString(th.Tablet.Alias), topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard), topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias), @@ -604,7 +615,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_PRIMARY if isNewPrimary { - log.Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType) + hc.logger().Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType) hcPrimaryPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1) } @@ -656,7 +667,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { default: // If the channel is full, we drop the message. hcChannelFullCounter.Add(1) - log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet)) + hc.logger().Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet)) } } } @@ -837,7 +848,7 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query. timer.Stop() for _, target := range targets { if target != nil { - log.Infof("couldn't find tablets for target: %v", target) + hc.logger().Infof("couldn't find tablets for target: %v", target) } } return ctx.Err() @@ -966,7 +977,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) { if err != nil { // Error logged if _, err := w.Write([]byte(err.Error())); err != nil { - log.Errorf("write to buffer error failed: %v", err) + hc.logger().Errorf("write to buffer error failed: %v", err) } return @@ -977,7 +988,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) { // Error logged if _, err := w.Write(buf.Bytes()); err != nil { - log.Errorf("write to buffer bytes failed: %v", err) + hc.logger().Errorf("write to buffer bytes failed: %v", err) } } @@ -1018,6 +1029,11 @@ func (hc *HealthCheckImpl) stateChecksum() int64 { return int64(crc32.ChecksumIEEE(buf.Bytes())) } +// logger returns the logutil.Logger used by the healthcheck. +func (hc *HealthCheckImpl) logger() logutil.Logger { + return hc.options.logger +} + // TabletToMapKey creates a key to the map from tablet's host and ports. // It should only be used in discovery and related module. func TabletToMapKey(tablet *topodata.Tablet) string { diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index ecadeefdf78..4c6e569cfc5 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -25,7 +25,7 @@ import ( "time" "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" @@ -71,6 +71,8 @@ type tabletHealthCheck struct { // possibly delete both these loggedServingState bool lastResponseTimestamp time.Time // timestamp of the last healthcheck response + // logger is used to log messages. + logger logutil.Logger } // String is defined because we want to print a []*tabletHealthCheck array nicely. @@ -107,7 +109,7 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { if !thc.loggedServingState || (serving != thc.Serving) { // Emit the log from a separate goroutine to avoid holding // the th lock while logging is happening - log.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s", + thc.logger.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s", topotools.TabletIdent(thc.Tablet), thc.Serving, serving, @@ -294,7 +296,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { // the healthcheck cache again via the topology watcher. // WARNING: Under no other circumstances should we be deleting the tablet here. if strings.Contains(err.Error(), "health stats mismatch") { - log.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet) + thc.logger.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet) hc.deleteTablet(thc.Tablet) return } @@ -331,7 +333,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { } func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) { - log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err) + thc.logger.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err) thc.setServingState(false, err.Error()) thc.LastError = err _ = thc.Conn.Close(ctx) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index d1e358e1aa5..279cfb9146f 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -29,7 +29,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -84,11 +84,16 @@ type TopologyWatcher struct { firstLoadDone bool // firstLoadChan is closed when the initial load of topology data is complete. firstLoadChan chan struct{} + // options contains optional settings used to modify HealthCheckImpl + // behavior. + options discoveryOptions } // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher { +func NewTopologyWatcher( + ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, opts ...DiscoveryOption, +) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, @@ -97,6 +102,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, tablets: make(map[string]*tabletInfo), + options: withOptions(opts...), } tw.firstLoadChan = make(chan struct{}) @@ -147,10 +153,10 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1) // If we get a partial result error, we just log it and process the tablets that we did manage to fetch. if topo.IsErrType(err, topo.PartialResult) { - log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err) + tw.logger().Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err) partialResult = true } else { // For all other errors, just return. - log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err) + tw.logger().Errorf("error getting tablets for cell: %v: %v", tw.cell, err) return } } @@ -243,7 +249,6 @@ func (tw *TopologyWatcher) loadTablets() { } tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes()) tw.lastRefresh = time.Now() - } // RefreshLag returns the time since the last refresh. @@ -262,6 +267,11 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 { return tw.topoChecksum } +// logger returns the logutil.Logger used by the TopologyWatcher. +func (tw *TopologyWatcher) logger() logutil.Logger { + return tw.options.logger +} + // TabletFilter is an interface that can be given to a TopologyWatcher // to be applied as an additional filter on the list of tablets returned by its getTablets function. type TabletFilter interface { @@ -287,6 +297,9 @@ func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool { type FilterByShard struct { // filters is a map of keyspace to filters for shards filters map[string][]*filterShard + // options contains optional settings used to modify FilterByShard + // behavior. + options discoveryOptions } // filterShard describes a filter for a given shard or keyrange inside @@ -295,6 +308,7 @@ type filterShard struct { keyspace string shard string keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange + options discoveryOptions } // NewFilterByShard creates a new FilterByShard for use by a @@ -302,7 +316,7 @@ type filterShard struct { // can either be a shard name, or a keyrange. All tablets that match // at least one keyspace|shard tuple will be forwarded by the // TopologyWatcher to its consumer. -func NewFilterByShard(filters []string) (*FilterByShard, error) { +func NewFilterByShard(filters []string, opts ...DiscoveryOption) (*FilterByShard, error) { m := make(map[string][]*filterShard) for _, filter := range filters { parts := strings.Split(filter, "|") @@ -333,16 +347,19 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) { }) } - return &FilterByShard{ + fbs := &FilterByShard{ filters: m, - }, nil + options: withOptions(opts...), + } + + return fbs, nil } // IsIncluded returns true iff the tablet's keyspace and shard match what we have. func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool { canonical, kr, err := topo.ValidateShardName(tablet.Shard) if err != nil { - log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err) + fbs.logger().Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err) return false } @@ -359,6 +376,11 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool { return false } +// logger returns the logutil.Logger used by the FilterByShard. +func (fbs *FilterByShard) logger() logutil.Logger { + return fbs.options.logger +} + // FilterByKeyspace is a filter that filters tablets by keyspace. type FilterByKeyspace struct { keyspaces map[string]bool From 110f3b816ce9f3536d62893664b5585eef769a11 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 22 Feb 2025 13:15:34 -0500 Subject: [PATCH 2/2] cr: golang style Signed-off-by: Max Englander --- go/vt/discovery/healthcheck.go | 4 +-- .../{discovery_options.go => options.go} | 36 +++++++++---------- go/vt/discovery/topology_watcher.go | 10 +++--- 3 files changed, 25 insertions(+), 25 deletions(-) rename go/vt/discovery/{discovery_options.go => options.go} (52%) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 55e49a6bc64..69e70923c80 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -304,7 +304,7 @@ type HealthCheckImpl struct { loadTabletsTrigger chan struct{} // options contains optional settings used to modify HealthCheckImpl // behavior. - options discoveryOptions + options Options } // NewVTGateHealthCheckFilters returns healthcheck filters for vtgate. @@ -357,7 +357,7 @@ func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) { // // Is one or more filters to apply when determining what tablets we want to stream healthchecks from. func NewHealthCheck( - ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter, opts ...DiscoveryOption, + ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter, opts ...Option, ) *HealthCheckImpl { hc := &HealthCheckImpl{ ts: topoServer, diff --git a/go/vt/discovery/discovery_options.go b/go/vt/discovery/options.go similarity index 52% rename from go/vt/discovery/discovery_options.go rename to go/vt/discovery/options.go index c5a2b1762d3..19847908329 100644 --- a/go/vt/discovery/discovery_options.go +++ b/go/vt/discovery/options.go @@ -17,30 +17,30 @@ import ( "vitess.io/vitess/go/vt/logutil" ) -// discoveryOptions configure a discovery components. discoveryOptions are set -// by the DiscoveryOption values passed to the component constructors. -type discoveryOptions struct { +// Options configure a discovery components. Options are set by the Option +// values passed to the component constructors. +type Options struct { logger logutil.Logger } -// DiscoveryOption configures how we perform certain operations. -type DiscoveryOption interface { - apply(*discoveryOptions) +// Option configures how we perform certain operations. +type Option interface { + apply(*Options) } -// funcDiscoveryOption wraps a function that modifies discoveryOptions into -// an implementation of the DiscoveryOption interface. -type funcDiscoveryOption struct { - f func(*discoveryOptions) +// funcOption wraps a function that modifies options into an implementation of +// the Option interface. +type funcOption struct { + f func(*Options) } -func defaultOptions() discoveryOptions { - return discoveryOptions{ +func defaultOptions() Options { + return Options{ logger: logutil.NewConsoleLogger(), } } -func withOptions(dos ...DiscoveryOption) discoveryOptions { +func withOptions(dos ...Option) Options { os := defaultOptions() for _, do := range dos { do.apply(&os) @@ -48,20 +48,20 @@ func withOptions(dos ...DiscoveryOption) discoveryOptions { return os } -func (fhco *funcDiscoveryOption) apply(dos *discoveryOptions) { +func (fhco *funcOption) apply(dos *Options) { fhco.f(dos) } -func newFuncDiscoveryOption(f func(*discoveryOptions)) *funcDiscoveryOption { - return &funcDiscoveryOption{ +func newFuncOption(f func(*Options)) *funcOption { + return &funcOption{ f: f, } } // WithLogger accepts a custom logger to use in a discovery component. If this // option is not provided then the default system logger will be used. -func WithLogger(l logutil.Logger) DiscoveryOption { - return newFuncDiscoveryOption(func(o *discoveryOptions) { +func WithLogger(l logutil.Logger) Option { + return newFuncOption(func(o *Options) { o.logger = l }) } diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 279cfb9146f..506b76bf021 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -86,13 +86,13 @@ type TopologyWatcher struct { firstLoadChan chan struct{} // options contains optional settings used to modify HealthCheckImpl // behavior. - options discoveryOptions + options Options } // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and reloads them as needed. func NewTopologyWatcher( - ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, opts ...DiscoveryOption, + ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, opts ...Option, ) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, @@ -299,7 +299,7 @@ type FilterByShard struct { filters map[string][]*filterShard // options contains optional settings used to modify FilterByShard // behavior. - options discoveryOptions + options Options } // filterShard describes a filter for a given shard or keyrange inside @@ -308,7 +308,7 @@ type filterShard struct { keyspace string shard string keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange - options discoveryOptions + options Options } // NewFilterByShard creates a new FilterByShard for use by a @@ -316,7 +316,7 @@ type filterShard struct { // can either be a shard name, or a keyrange. All tablets that match // at least one keyspace|shard tuple will be forwarded by the // TopologyWatcher to its consumer. -func NewFilterByShard(filters []string, opts ...DiscoveryOption) (*FilterByShard, error) { +func NewFilterByShard(filters []string, opts ...Option) (*FilterByShard, error) { m := make(map[string][]*filterShard) for _, filter := range filters { parts := strings.Split(filter, "|")