Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dr-autosync: add recover timeout #6295

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@
type drAutoSyncStatus struct {
State string `json:"state,omitempty"`
StateID uint64 `json:"state_id,omitempty"`
AsyncStartTime *time.Time `json:"async_start,omitempty"`
RecoverStartTime *time.Time `json:"recover_start,omitempty"`
TotalRegions int `json:"total_regions,omitempty"`
SyncedRegions int `json:"synced_regions,omitempty"`
Expand Down Expand Up @@ -262,7 +263,8 @@
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
}
dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores}
now := time.Now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to pick it to 6.5 and other branches?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need cherry-pick 6.5. i have added the label

dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now}
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
Expand All @@ -272,6 +274,15 @@
return nil
}

func (m *ModeManager) drDurationSinceAsyncStart() time.Duration {
m.RLock()
defer m.RUnlock()
if m.drAutoSync.AsyncStartTime == nil {
return 0

Check warning on line 281 in pkg/replication/replication_mode.go

View check run for this annotation

Codecov / codecov/patch

pkg/replication/replication_mode.go#L281

Added line #L281 was not covered by tests
}
return time.Since(*m.drAutoSync.AsyncStartTime)
}

func (m *ModeManager) drSwitchToSyncRecover() error {
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -477,7 +488,7 @@
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateAsync:
if canSync {
if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this config only used to switch from async to syncRecover?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes!

m.drSwitchToSyncRecover()
break
}
Expand Down
60 changes: 41 additions & 19 deletions pkg/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package replication

import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
Expand Down Expand Up @@ -159,6 +160,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator {
}
}

func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) {
type status struct {
State string `json:"state"`
StateID uint64 `json:"state_id"`
AvailableStores []uint64 `json:"available_stores"`
}
var s status
err := json.Unmarshal([]byte(data), &s)
require.NoError(t, err)
require.Equal(t, state, s.State)
require.Equal(t, stateID, s.StateID)
require.Equal(t, availableStores, s.AvailableStores)
}

func TestStateSwitch(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -190,7 +205,7 @@ func TestStateSwitch(t *testing.T) {
stateID := rep.drAutoSync.StateID
re.NotEqual(uint64(0), stateID)
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)
assertStateIDUpdate := func() {
re.NotEqual(stateID, rep.drAutoSync.StateID)
stateID = rep.drAutoSync.StateID
Expand All @@ -207,7 +222,7 @@ func TestStateSwitch(t *testing.T) {
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4})

re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit())
conf.DRAutoSync.PauseRegionSplit = true
Expand All @@ -218,7 +233,7 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4})

// add new store in dr zone.
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
Expand Down Expand Up @@ -268,18 +283,19 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()

rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4})
setStoreState(cluster, "down", "up", "up", "up", "down", "down")
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4})
setStoreState(cluster, "up", "down", "up", "up", "down", "down")
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 3, 4})

// async_wait -> async
rep.tickUpdateState()
Expand All @@ -291,26 +307,32 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4})

// async -> async
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
rep.tickUpdateState()
// store 2 won't be available before it syncs status.
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4})
syncStoreStatus(1, 2, 3, 4)
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4})

// async -> sync_recover
setStoreState(cluster, "up", "up", "up", "up", "up", "up")
rep.tickUpdateState()
re.Equal(drStateSyncRecover, rep.drGetState())
assertStateIDUpdate()

rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5})
rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour)
rep.tickUpdateState()
re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout

rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0)
setStoreState(cluster, "down", "up", "up", "up", "up", "up")
rep.tickUpdateState()
re.Equal(drStateSyncRecover, rep.drGetState())
Expand Down Expand Up @@ -387,27 +409,27 @@ func TestReplicateState(t *testing.T) {
stateID := rep.drAutoSync.StateID
// replicate after initialized
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)

// repliate state to new member
replicator.memberIDs = append(replicator.memberIDs, 2, 3)
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2])
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3])
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
assertLastData(t, replicator.lastData[3], "sync", stateID, nil)

// inject error
replicator.errors[2] = errors.New("failed to persist")
rep.tickUpdateState() // switch async_wait since there is only one zone
newStateID := rep.drAutoSync.StateID
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1])
re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2])
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3])
assertLastData(t, replicator.lastData[1], "async_wait", newStateID, []uint64{1, 2})
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
assertLastData(t, replicator.lastData[3], "async_wait", newStateID, []uint64{1, 2})

// clear error, replicate to node 2 next time
delete(replicator.errors, 2)
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2])
assertLastData(t, replicator.lastData[2], "async_wait", newStateID, []uint64{1, 2})
}

func TestAsynctimeout(t *testing.T) {
Expand Down Expand Up @@ -637,7 +659,7 @@ func TestComplexPlacementRules(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4, 5, 6})

// reset to sync
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up")
Expand Down Expand Up @@ -698,7 +720,7 @@ func TestComplexPlacementRules2(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4})
}

func TestComplexPlacementRules3(t *testing.T) {
Expand Down Expand Up @@ -737,7 +759,7 @@ func TestComplexPlacementRules3(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4})
}

func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo {
Expand Down
15 changes: 8 additions & 7 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,13 +831,14 @@ func NormalizeReplicationMode(m string) string {

// DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers.
type DRAutoSyncReplicationConfig struct {
LabelKey string `toml:"label-key" json:"label-key"`
Primary string `toml:"primary" json:"primary"`
DR string `toml:"dr" json:"dr"`
PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"`
DRReplicas int `toml:"dr-replicas" json:"dr-replicas"`
WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
LabelKey string `toml:"label-key" json:"label-key"`
Primary string `toml:"primary" json:"primary"`
DR string `toml:"dr" json:"dr"`
PrimaryReplicas int `toml:"primary-replicas" json:"primary-replicas"`
DRReplicas int `toml:"dr-replicas" json:"dr-replicas"`
WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"`
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
}

func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
Expand Down
Loading