Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/vt/discovery: configurable logger #17846

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -188,8 +189,10 @@ func FilteringKeyspaces() bool {
return len(KeyspacesToWatch) > 0
}

type KeyspaceShardTabletType string
type tabletAliasString string
type (
KeyspaceShardTabletType string
tabletAliasString string
)

// HealthCheck declares what the TabletGateway needs from the HealthCheck
type HealthCheck interface {
Expand Down Expand Up @@ -299,6 +302,9 @@ type HealthCheckImpl struct {
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// options contains optional settings used to modify HealthCheckImpl
// behavior.
options Options
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -350,9 +356,9 @@ func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

func NewHealthCheck(
ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter, opts ...Option,
) *HealthCheckImpl {
hc := &HealthCheckImpl{
ts: topoServer,
cell: localCell,
Expand All @@ -364,19 +370,23 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
options: withOptions(opts...),
}

hc.logger().Infof("loading tablets for cells: %v", cellsToWatch)

var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
}

for _, c := range cells {
log.Infof("Setting up healthcheck for cell: %v", c)
hc.logger().Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, opts...))
}

hc.topoWatchers = topoWatchers
Expand All @@ -401,7 +411,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
return
}

log.Infof("Adding tablet to healthcheck: %v", tablet)
hc.logger().Infof("Adding tablet to healthcheck: %v", tablet)
hc.mu.Lock()
defer hc.mu.Unlock()
if hc.healthByAlias == nil {
Expand All @@ -419,14 +429,15 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
cancelFunc: cancelFunc,
Tablet: tablet,
Target: target,
logger: hc.logger(),
}

// add to our datastore
key := KeyFromTarget(target)
tabletAlias := topoproto.TabletAliasString(tablet.Alias)
if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok {
// We should not add a tablet that we already have
log.Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias)
hc.logger().Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias)
return
}
hc.healthByAlias[tabletAliasString(tabletAlias)] = thc
Expand Down Expand Up @@ -454,7 +465,7 @@ func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) {
}

func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
log.Infof("Removing tablet from healthcheck: %v", tablet)
hc.logger().Infof("Removing tablet from healthcheck: %v", tablet)
hc.mu.Lock()
defer hc.mu.Unlock()

Expand Down Expand Up @@ -497,7 +508,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
// delete from authoritative map
th, ok := hc.healthByAlias[tabletAlias]
if !ok {
log.Infof("We have no health data for tablet: %v, it might have been deleted already", tablet)
hc.logger().Infof("We have no health data for tablet: %v, it might have been deleted already", tablet)
return
}
// Calling this will end the context associated with th.checkConn,
Expand All @@ -516,7 +527,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
// so that we're not racing to update it and in effect re-adding a copy of the
// tablet record that was deleted
if _, ok := hc.healthByAlias[tabletAlias]; !ok {
log.Infof("Tablet %v has been deleted, skipping health update", th.Tablet)
hc.logger().Infof("Tablet %v has been deleted, skipping health update", th.Tablet)
return
}

Expand All @@ -541,7 +552,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
// causing an interruption where no primary is assigned to the shard.
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
hc.logger().Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
Expand All @@ -567,7 +578,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
// We already have one up server, see if we
// need to replace it.
if th.PrimaryTermStartTime < hc.healthy[targetKey][0].PrimaryTermStartTime {
log.Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ",
hc.logger().Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ",
topoproto.TabletAliasString(th.Tablet.Alias),
topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard),
topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias),
Expand Down Expand Up @@ -604,7 +615,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ

isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_PRIMARY
if isNewPrimary {
log.Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
hc.logger().Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
hcPrimaryPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1)
}

Expand Down Expand Up @@ -656,7 +667,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
default:
// If the channel is full, we drop the message.
hcChannelFullCounter.Add(1)
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
hc.logger().Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
}
}
}
Expand Down Expand Up @@ -837,7 +848,7 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
timer.Stop()
for _, target := range targets {
if target != nil {
log.Infof("couldn't find tablets for target: %v", target)
hc.logger().Infof("couldn't find tablets for target: %v", target)
}
}
return ctx.Err()
Expand Down Expand Up @@ -966,7 +977,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
if err != nil {
// Error logged
if _, err := w.Write([]byte(err.Error())); err != nil {
log.Errorf("write to buffer error failed: %v", err)
hc.logger().Errorf("write to buffer error failed: %v", err)
}

return
Expand All @@ -977,7 +988,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {

// Error logged
if _, err := w.Write(buf.Bytes()); err != nil {
log.Errorf("write to buffer bytes failed: %v", err)
hc.logger().Errorf("write to buffer bytes failed: %v", err)
}
}

Expand Down Expand Up @@ -1018,6 +1029,11 @@ func (hc *HealthCheckImpl) stateChecksum() int64 {
return int64(crc32.ChecksumIEEE(buf.Bytes()))
}

// logger exposes the logutil.Logger stored in the healthcheck's options, so
// that every log statement of HealthCheckImpl goes through the same logger.
func (hc *HealthCheckImpl) logger() logutil.Logger {
	configured := hc.options.logger
	return configured
}

// TabletToMapKey creates a key to the map from tablet's host and ports.
// It should only be used in discovery and related module.
func TabletToMapKey(tablet *topodata.Tablet) string {
Expand Down
67 changes: 67 additions & 0 deletions go/vt/discovery/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2025 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"vitess.io/vitess/go/vt/logutil"
)

// Options configures a discovery component. Options are set by the Option
// values passed to the component constructors.
type Options struct {
	// logger is the logutil.Logger the component writes its log messages to.
	logger logutil.Logger
}

// Option configures how we perform certain operations. Each Option modifies
// the Options value it is applied to.
type Option interface {
	// apply mutates the given Options in place.
	apply(*Options)
}

// funcOption wraps a function that modifies options into an implementation of
// the Option interface.
type funcOption struct {
	// f is the wrapped function; it is invoked with the Options to modify
	// whenever the option is applied.
	f func(*Options)
}

// defaultOptions builds the Options used when the caller supplies no Option
// values: the logger is the one returned by logutil.NewConsoleLogger.
func defaultOptions() Options {
	var defaults Options
	defaults.logger = logutil.NewConsoleLogger()
	return defaults
}

// withOptions starts from defaultOptions and applies every given Option to it
// in argument order, so a later Option overrides an earlier one that touches
// the same setting. The resulting Options value is returned.
func withOptions(dos ...Option) Options {
	resolved := defaultOptions()
	for i := range dos {
		dos[i].apply(&resolved)
	}
	return resolved
}

// apply implements Option by handing the target Options to the wrapped
// function.
func (fo *funcOption) apply(target *Options) {
	modify := fo.f
	modify(target)
}

// newFuncOption returns a funcOption whose apply method calls f.
func newFuncOption(f func(*Options)) *funcOption {
	wrapped := new(funcOption)
	wrapped.f = f
	return wrapped
}

// WithLogger accepts a custom logger to use in a discovery component. If this
// option is not provided then the default system logger will be used.
//
// A nil logger is ignored and the previously configured logger (the default,
// unless an earlier option replaced it) stays in effect. Storing nil would
// make every later log call on the component panic.
//
// NOTE(review): only an untyped nil is detected; an interface value wrapping a
// nil pointer is still stored as-is.
func WithLogger(l logutil.Logger) Option {
	return newFuncOption(func(o *Options) {
		if l == nil {
			// Keep whatever logger is already configured.
			return
		}
		o.logger = l
	})
}
10 changes: 6 additions & 4 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -71,6 +71,8 @@ type tabletHealthCheck struct {
// possibly delete both these
loggedServingState bool
lastResponseTimestamp time.Time // timestamp of the last healthcheck response
// logger is used to log messages.
logger logutil.Logger
}

// String is defined because we want to print a []*tabletHealthCheck array nicely.
Expand Down Expand Up @@ -107,7 +109,7 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
if !thc.loggedServingState || (serving != thc.Serving) {
// Emit the log from a separate goroutine to avoid holding
// the th lock while logging is happening
log.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
thc.logger.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
topotools.TabletIdent(thc.Tablet),
thc.Serving,
serving,
Expand Down Expand Up @@ -294,7 +296,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
// the healthcheck cache again via the topology watcher.
// WARNING: Under no other circumstances should we be deleting the tablet here.
if strings.Contains(err.Error(), "health stats mismatch") {
log.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet)
thc.logger.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet)
hc.deleteTablet(thc.Tablet)
return
}
Expand Down Expand Up @@ -331,7 +333,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}

func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) {
log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
thc.logger.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
thc.setServingState(false, err.Error())
thc.LastError = err
_ = thc.Conn.Close(ctx)
Expand Down
Loading
Loading