diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index cfe1ba2e964..34c7178c350 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -100,9 +100,10 @@ func SetTabletPickerRetryDelay(delay time.Duration) { } type TabletPickerOptions struct { - CellPreference string - TabletOrder string - IncludeNonServingTablets bool + CellPreference string + TabletOrder string + IncludeNonServingTablets bool + ExcludeTabletsWithMaxReplicationLag time.Duration } func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) { @@ -356,8 +357,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table if len(candidates) == 0 { // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", - tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, maxReplicationLag: %v, sleeping for %.3f seconds.", + tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, tp.options.ExcludeTabletsWithMaxReplicationLag, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { case <-ctx.Done(): @@ -471,8 +472,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { if shr != nil && (shr.Serving || tp.options.IncludeNonServingTablets) && - shr.RealtimeStats != nil && - shr.RealtimeStats.HealthError == "" { + (shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" && + (tabletInfo.Tablet.Type == topodatapb.TabletType_PRIMARY /* lag is not relevant */ || + (tp.options.ExcludeTabletsWithMaxReplicationLag == 0 /* not set */ || + shr.RealtimeStats.ReplicationLagSeconds <= uint32(tp.options.ExcludeTabletsWithMaxReplicationLag.Seconds())))) { return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 27c4d8bf7b1..b44ae9adbd1 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -685,6 +685,61 @@ func TestPickNonServingTablets(t *testing.T) { assert.True(t, picked3) } +// TestPickNonLaggingTablets validates that lagging tablets are excluded when the +// ExcludeTabletsWithMaxReplicationLag option is set. +func TestPickNonLaggingTablets(t *testing.T) { + ctx := utils.LeakCheckContext(t) + cells := []string{"cell1"} + defaultCell := cells[0] + tabletTypes := "replica" + options := TabletPickerOptions{ + ExcludeTabletsWithMaxReplicationLag: lowReplicationLag.Default(), + } + replLag := options.ExcludeTabletsWithMaxReplicationLag + (5 * time.Second) + te := newPickerTestEnv(t, ctx, cells) + + // Tablet should not be selected as we only want replicas. + primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, defaultCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Tablet should not be selected as it is lagging. + laggingReplicaTablet := addTabletWithLag(ctx, te, 200, topodatapb.TabletType_REPLICA, defaultCell, true, true, uint32(replLag.Seconds())) + defer deleteTablet(t, te, laggingReplicaTablet) + + // Tablet should be selected because it's a non-lagging replica. + nonLaggingReplicaTablet := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, defaultCell, true, true) + defer deleteTablet(t, te, nonLaggingReplicaTablet) + + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(ctx, te.topoServ, cells, defaultCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + var pickedPrimary, pickedLaggingReplica, pickedNonLaggingReplica int + for i := 0; i < numTestIterations; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if proto.Equal(tablet, primaryTablet) { + pickedPrimary++ + } + if proto.Equal(tablet, laggingReplicaTablet) { + pickedLaggingReplica++ + } + if proto.Equal(tablet, nonLaggingReplicaTablet) { + pickedNonLaggingReplica++ + } + } + require.Zero(t, pickedPrimary) + require.Zero(t, pickedLaggingReplica) + require.Equal(t, numTestIterations, pickedNonLaggingReplica) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -720,6 +775,10 @@ func newPickerTestEnv(t *testing.T, ctx context.Context, cells []string, extraCe } func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool) *topodatapb.Tablet { + return addTabletWithLag(ctx, te, id, tabletType, cell, serving, healthy, 0) +} + +func addTabletWithLag(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool, replLagSecs uint32) *topodatapb.Tablet { tablet := &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: cell, @@ -748,6 +807,7 @@ func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topoda if healthy { shr.RealtimeStats.HealthError = "" } + shr.RealtimeStats.ReplicationLagSeconds = replLagSecs _ = createFixedHealthConn(tablet, shr) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 2a48fa9fc6d..ada1ddb131c 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -194,6 +194,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta tabletPickerOptions: discovery.TabletPickerOptions{ CellPreference: flags.GetCellPreference(), TabletOrder: flags.GetTabletOrder(), + // This is NOT configurable via the API because we check the + // discovery.GetLowReplicationLag().Seconds() value in the tablet + // health stream. + ExcludeTabletsWithMaxReplicationLag: discovery.GetLowReplicationLag(), }, flags: flags, }