From b686692b02573256234823956941316870f5d0fd Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 10 Apr 2023 17:31:46 +0800 Subject: [PATCH] dr-autosync: add recover timeout Signed-off-by: disksing --- server/config/config.go | 15 +++++---- server/replication/replication_mode.go | 15 +++++++-- server/replication/replication_mode_test.go | 37 ++++++++++++++++----- 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 420222c2f23..bd2b7394f19 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1369,13 +1369,14 @@ func NormalizeReplicationMode(m string) string { // DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers. type DRAutoSyncReplicationConfig struct { - LabelKey string `toml:"label-key" json:"label-key"` - Primary string `toml:"primary" json:"primary"` - DR string `toml:"dr" json:"dr"` - PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"` - DRReplicas int `toml:"dr-replicas" json:"dr-replicas"` - WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"` - PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` + LabelKey string `toml:"label-key" json:"label-key"` + Primary string `toml:"primary" json:"primary"` + DR string `toml:"dr" json:"dr"` + PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"` + DRReplicas int `toml:"dr-replicas" json:"dr-replicas"` + WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"` + WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"` + PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"` } func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index 52978c78fa9..a088d38ddcd 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -211,6 +211,7 @@ const ( type drAutoSyncStatus struct { State string `json:"state,omitempty"` StateID uint64 `json:"state_id,omitempty"` + AsyncStartTime *time.Time `json:"async_start,omitempty"` RecoverStartTime *time.Time `json:"recover_start,omitempty"` TotalRegions int `json:"total_regions,omitempty"` SyncedRegions int `json:"synced_regions,omitempty"` @@ -262,7 +263,8 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } - dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores} + now := time.Now() + dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now} m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) @@ -273,6 +275,15 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { return nil } +func (m *ModeManager) drDurationSinceAsyncStart() time.Duration { + m.RLock() + defer m.RUnlock() + if m.drAutoSync.AsyncStartTime == nil { + return 0 + } + return time.Since(*m.drAutoSync.AsyncStartTime) +} + func (m *ModeManager) drSwitchToSyncRecover() error { m.Lock() defer m.Unlock() @@ -461,7 +472,7 @@ func (m *ModeManager) tickDR() { m.drSwitchToAsync(stores[primaryUp]) } case drStateAsync: - if canSync { + if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration { m.drSwitchToSyncRecover() break } diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 499c40b0412..755df968420 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -16,6 +16,7 @@ package replication import ( "context" + "encoding/json" "errors" "fmt" "testing" @@ -158,6 +159,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator { } } +func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) { + type status struct { + State string `json:"state"` + StateID uint64 `json:"state_id"` + AvailableStores []uint64 `json:"available_stores"` + } + var s status + err := json.Unmarshal([]byte(data), &s) + require.NoError(t, err) + require.Equal(t, state, s.State) + require.Equal(t, stateID, s.StateID) + require.Equal(t, availableStores, s.AvailableStores) +} + func TestStateSwitch(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -185,7 +200,7 @@ func TestStateSwitch(t *testing.T) { re.Equal(drStateSync, rep.drGetState()) stateID := rep.drAutoSync.StateID re.NotEqual(uint64(0), stateID) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) assertStateIDUpdate := func() { re.NotEqual(stateID, rep.drAutoSync.StateID) stateID = rep.drAutoSync.StateID @@ -201,7 +216,7 @@ func TestStateSwitch(t *testing.T) { rep.tickDR() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) conf.DRAutoSync.PauseRegionSplit = true @@ -211,7 +226,7 @@ func TestStateSwitch(t *testing.T) { syncStoreStatus(1, 2, 3, 4) rep.tickDR() assertStateIDUpdate() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // add new store in dr zone. cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) @@ -257,11 +272,11 @@ func TestStateSwitch(t *testing.T) { rep.tickDR() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) setStoreState(cluster, "down", "up", "up", "up", "down", "up") rep.tickDR() assertStateIDUpdate() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4}) setStoreState(cluster, "up", "down", "up", "up", "down", "up") rep.tickDR() assertStateIDUpdate() @@ -276,24 +291,30 @@ func TestStateSwitch(t *testing.T) { syncStoreStatus(4) rep.tickDR() assertStateIDUpdate() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) // async -> async setStoreState(cluster, "up", "up", "up", "up", "down", "up") rep.tickDR() // store 2 won't be available before it syncs status. - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) syncStoreStatus(1, 2, 3, 4) rep.tickDR() assertStateIDUpdate() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // async -> sync_recover setStoreState(cluster, "up", "up", "up", "up", "up", "up") rep.tickDR() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() + rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour) + rep.tickDR() + re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout + + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0) setStoreState(cluster, "down", "up", "up", "up", "up", "up") rep.tickDR() re.Equal(drStateSyncRecover, rep.drGetState())