From 03a927e004c2a38983bb5c32759a41f24184ea82 Mon Sep 17 00:00:00 2001 From: rishabh_mittal Date: Fri, 20 Dec 2024 13:29:43 -0800 Subject: [PATCH 1/4] retry stale read as stale if leader is not accessible Signed-off-by: rishabh_mittal --- internal/locate/region_request3_test.go | 3 --- internal/locate/region_request_state_test.go | 22 ++++++++++++-------- internal/locate/replica_selector.go | 7 +++++-- internal/locate/replica_selector_test.go | 8 +++---- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index eec6e1eaa..8898fea00 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1412,9 +1412,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{}) s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { - if req.StaleRead && addr == follower.addr { - return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil - } return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ Value: []byte(addr), }}, nil diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index a5b7280e4..441cd683f 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -17,6 +17,8 @@ package locate import ( "context" "fmt" + "reflect" + "runtime" "strconv" "strings" "sync/atomic" @@ -322,7 +324,7 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessFollowerRead, + leaderSuccessReadType: SuccessStaleRead, followerRegionValid: true, followerAsyncReload: util.Some(false), followerSuccessReplica: []string{"z2"}, @@ -333,7 +335,7 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessFollowerRead, + leaderSuccessReadType: SuccessStaleRead, followerRegionValid: true, followerAsyncReload: util.None[bool](), followerSuccessReplica: []string{"z2"}, @@ -344,7 +346,7 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessFollowerRead, + leaderSuccessReadType: SuccessStaleRead, followerRegionValid: true, followerAsyncReload: util.None[bool](), followerSuccessReplica: []string{"z2"}, @@ -432,11 +434,11 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessFollowerRead, + leaderSuccessReadType: SuccessStaleRead, followerRegionValid: true, followerAsyncReload: util.Some(true), followerSuccessReplica: []string{"z2", "z3"}, - followerSuccessReadType: SuccessFollowerRead, + followerSuccessReadType: SuccessStaleRead, }, { do: leaderDown, @@ -445,11 +447,11 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z3"}, - leaderSuccessReadType: SuccessFollowerRead, + leaderSuccessReadType: SuccessStaleRead, followerRegionValid: true, followerAsyncReload: util.Some(true), followerSuccessReplica: []string{"z3"}, - followerSuccessReadType: SuccessFollowerRead, + followerSuccessReadType: SuccessStaleRead, }, { do: leaderDown, @@ -458,11 +460,11 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z3"}, - leaderSuccessReadType: SuccessFollowerRead, + leaderSuccessReadType: SuccessStaleRead, followerRegionValid: true, followerAsyncReload: util.Some(true), followerSuccessReplica: []string{"z3"}, - followerSuccessReadType: SuccessFollowerRead, + followerSuccessReadType: SuccessStaleRead, }, } tests := []func(*testRegionCacheStaleReadSuite, *RegionCacheTestCase){ @@ -562,6 +564,8 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon } } } else { + funcName := runtime.FuncForPC(reflect.ValueOf(r.do).Pointer()).Name() + fmt.Printf("Function name of r.do(): %s\n", funcName) s.Equal(r.followerSuccessReadType, successReadType) for _, z := range r.followerSuccessReplica { if z == successZone { diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index 901592ca3..a086b0a65 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -307,8 +307,11 @@ func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelector) *replica { func (s *ReplicaSelectMixedStrategy) canSendReplicaRead(selector *replicaSelector) bool { replicas := selector.replicas replica := replicas[s.leaderIdx] - if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) || replica.hasFlag(serverIsBusyFlag) { - // don't overwhelm the leader if it is busy + liveness := replica.store.getLivenessState() + epochStale := replica.isEpochStale() + + if liveness != reachable || epochStale || replica.hasFlag(deadlineErrUsingConfTimeoutFlag) || replica.hasFlag(serverIsBusyFlag) { + // don't overwhelm the leader is busy, timeout, not accessible or stale epoch. return false } return true diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 372172952..ac7f567bf 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -898,8 +898,8 @@ func TestReplicaReadAccessPathByCase2(t *testing.T) { expect: &accessPathResult{ accessPath: []string{ "{addr: store1, replica-read: false, stale-read: true}", - "{addr: store2, replica-read: true, stale-read: false}", - "{addr: store3, replica-read: true, stale-read: false}"}, + "{addr: store2, replica-read: false, stale-read: true}", + "{addr: store3, replica-read: false, stale-read: true}"}, respErr: "", respRegionError: fakeEpochNotMatch, backoffCnt: 3, @@ -2125,7 +2125,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { accessPath: []string{ "{addr: store2, replica-read: false, stale-read: true}", "{addr: store1, replica-read: false, stale-read: false}", - "{addr: store3, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: false, stale-read: true}", }, respErr: "", respRegionError: nil, @@ -2144,7 +2144,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { beforeRun: func() { /* don't resetStoreState */ }, expect: &accessPathResult{ accessPath: []string{ - "{addr: store3, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: false, stale-read: true}", }, respErr: "", respRegionError: fakeEpochNotMatch, From b8beeb90f53bc4973857ad8d56802dd8e88a8616 Mon Sep 17 00:00:00 2001 From: rishabh_mittal Date: Fri, 20 Dec 2024 14:13:17 -0800 Subject: [PATCH 2/4] try replica read if leader is exhausted Signed-off-by: rishabh_mittal --- internal/locate/region_request_state_test.go | 18 ++++++++++-------- internal/locate/replica_selector.go | 7 +++---- internal/locate/replica_selector_test.go | 6 +++--- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index 441cd683f..58ac94563 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -324,7 +324,7 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessStaleRead, + leaderSuccessReadType: SuccessFollowerRead, followerRegionValid: true, followerAsyncReload: util.Some(false), followerSuccessReplica: []string{"z2"}, @@ -346,7 +346,7 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessStaleRead, + leaderSuccessReadType: SuccessFollowerRead, followerRegionValid: true, followerAsyncReload: util.None[bool](), followerSuccessReplica: []string{"z2"}, @@ -434,11 +434,11 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z2", "z3"}, - leaderSuccessReadType: SuccessStaleRead, + leaderSuccessReadType: SuccessFollowerRead, followerRegionValid: true, followerAsyncReload: util.Some(true), followerSuccessReplica: []string{"z2", "z3"}, - followerSuccessReadType: SuccessStaleRead, + followerSuccessReadType: SuccessFollowerRead, }, { do: leaderDown, @@ -447,11 +447,11 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z3"}, - leaderSuccessReadType: SuccessStaleRead, + leaderSuccessReadType: SuccessFollowerRead, followerRegionValid: true, followerAsyncReload: util.Some(true), followerSuccessReplica: []string{"z3"}, - followerSuccessReadType: SuccessStaleRead, + followerSuccessReadType: SuccessFollowerRead, }, { do: leaderDown, @@ -460,11 +460,11 @@ func TestRegionCacheStaleRead(t *testing.T) { leaderRegionValid: true, leaderAsyncReload: util.Some(true), leaderSuccessReplica: []string{"z3"}, - leaderSuccessReadType: SuccessStaleRead, + leaderSuccessReadType: SuccessFollowerRead, followerRegionValid: true, followerAsyncReload: util.Some(true), followerSuccessReplica: []string{"z3"}, - followerSuccessReadType: SuccessStaleRead, + followerSuccessReadType: SuccessFollowerRead, }, } tests := []func(*testRegionCacheStaleReadSuite, *RegionCacheTestCase){ @@ -556,6 +556,8 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon _, successZone, successReadType := s.extractResp(resp) find := false if leaderZone { + funcName := runtime.FuncForPC(reflect.ValueOf(r.do).Pointer()).Name() + fmt.Printf("Function name of r.do(): %s\n", funcName) s.Equal(r.leaderSuccessReadType, successReadType, msg) for _, z := range r.leaderSuccessReplica { if z == successZone { diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index a086b0a65..e90a52b89 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -307,11 +307,10 @@ func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelector) *replica { func (s *ReplicaSelectMixedStrategy) canSendReplicaRead(selector *replicaSelector) bool { replicas := selector.replicas replica := replicas[s.leaderIdx] - liveness := replica.store.getLivenessState() - epochStale := replica.isEpochStale() - if liveness != reachable || epochStale || replica.hasFlag(deadlineErrUsingConfTimeoutFlag) || replica.hasFlag(serverIsBusyFlag) { - // don't overwhelm the leader is busy, timeout, not accessible or stale epoch. + maxAttempt := 1 + if !replica.isExhausted(maxAttempt, 0) || replica.hasFlag(deadlineErrUsingConfTimeoutFlag) || replica.hasFlag(serverIsBusyFlag) { + // don't do the replica read if leader is not exhausted or leader has timeout or server busy error. return false } return true diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index ac7f567bf..980a4cf3a 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -898,8 +898,8 @@ func TestReplicaReadAccessPathByCase2(t *testing.T) { expect: &accessPathResult{ accessPath: []string{ "{addr: store1, replica-read: false, stale-read: true}", - "{addr: store2, replica-read: false, stale-read: true}", - "{addr: store3, replica-read: false, stale-read: true}"}, + "{addr: store2, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: true, stale-read: false}"}, respErr: "", respRegionError: fakeEpochNotMatch, backoffCnt: 3, @@ -2125,7 +2125,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { accessPath: []string{ "{addr: store2, replica-read: false, stale-read: true}", "{addr: store1, replica-read: false, stale-read: false}", - "{addr: store3, replica-read: false, stale-read: true}", + "{addr: store3, replica-read: true, stale-read: false}", }, respErr: "", respRegionError: nil, From ade4add6202f5579a626a577bd4472f999a624ab Mon Sep 17 00:00:00 2001 From: mittalrishabh Date: Fri, 20 Dec 2024 14:15:34 -0800 Subject: [PATCH 3/4] Update region_request_state_test.go Signed-off-by: rishabh_mittal --- internal/locate/region_request_state_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index 58ac94563..efc26cf53 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -17,8 +17,6 @@ package locate import ( "context" "fmt" - "reflect" - "runtime" "strconv" "strings" "sync/atomic" From 53cd393455adb71134375bda908f2dd66f236ad8 Mon Sep 17 00:00:00 2001 From: mittalrishabh Date: Fri, 20 Dec 2024 14:16:05 -0800 Subject: [PATCH 4/4] Update region_request_state_test.go Signed-off-by: rishabh_mittal --- internal/locate/region_request_state_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index efc26cf53..96b5932f6 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -554,8 +554,6 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon _, successZone, successReadType := s.extractResp(resp) find := false if leaderZone { - funcName := runtime.FuncForPC(reflect.ValueOf(r.do).Pointer()).Name() - fmt.Printf("Function name of r.do(): %s\n", funcName) s.Equal(r.leaderSuccessReadType, successReadType, msg) for _, z := range r.leaderSuccessReplica { if z == successZone { @@ -564,8 +562,6 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon } } } else { - funcName := runtime.FuncForPC(reflect.ValueOf(r.do).Pointer()).Name() - fmt.Printf("Function name of r.do(): %s\n", funcName) s.Equal(r.followerSuccessReadType, successReadType) for _, z := range r.followerSuccessReplica { if z == successZone {