From 261b2d06a3b5eeb274ab276efd9ebc2daa5cf4af Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 6 Dec 2023 21:15:23 +0800 Subject: [PATCH 01/18] allow pd follower handle region api Signed-off-by: Cabinfever_B --- go.mod | 12 +++++++----- go.sum | 28 ++++++++++++++++------------ internal/locate/region_cache.go | 11 ++++++----- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 87655fb89c..23e5b869b8 100644 --- a/go.mod +++ b/go.mod @@ -50,14 +50,16 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.9.1 // indirect - google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect + google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/tikv/pd/client => github.com/CabinfeverB/pd/client v0.0.0-20231206020402-01a983b1833b diff --git a/go.sum b/go.sum index 39e069e2c8..28b1ec8096 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,13 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/CabinfeverB/pd/client v0.0.0-20231206020402-01a983b1833b h1:mQkSZcqC/FOpOgehqdcTCqMQSTJVfVfJy6eHU3AOKwE= +github.com/CabinfeverB/pd/client v0.0.0-20231206020402-01a983b1833b/go.mod h1:wRct10Q+PjUL5IMBalxzK49MVpVHyT7OZVYzDKW13Rw= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g= +github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -112,8 +116,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg= -github.com/tikv/pd/client 
v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -162,8 +164,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -178,12 +180,12 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -204,15 +206,17 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA= -google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod 
h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI= +google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 h1:I6WNifs6pF9tNdSob2W24JtyxIYjzFB9qDlpUC76q+U= +google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405/go.mod h1:3WDQMjmJk36UQhjQ89emUzb1mdaHcPeeAh4SCBKznB4= google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b h1:CIC2YMXmIhYw6evmhPxBKJ4fmLbOFtXQN/GV3XOZR8k= google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:IBQ646DjkDkvUIsVq/cc03FUFQ9wbZu7yE396YcL870= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5 h1:FjbWL/mGfyRQNxjagfT1chiHL1569WEA/OGH0ZIzGcI= +google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5/go.mod h1:5av8LiY5jU2KRcrX+SHtvLHnaOpPJ7gzWStBurgHlqY= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 7305fe8203..5b58ce0f3c 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1099,7 +1099,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey r = c.searchCachedRegion(key, isEndKey) if r == nil { // load region when it is not exists or expired. - lr, err := c.loadRegion(bo, key, isEndKey) + lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle()) if err != nil { // no region data, return error if failure. return nil, err @@ -1631,7 +1631,7 @@ func filterUnavailablePeers(region *pd.Region) { // loadRegion loads region from pd client, and picks the first peer as leader. // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful // when processing in reverse order. 
-func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool) (*Region, error) { +func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) { ctx := bo.GetCtx() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context())) @@ -1641,6 +1641,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool) var backoffErr error searchPrev := false + opts = append(opts, pd.WithBuckets()) for { if backoffErr != nil { err := bo.Backoff(retry.BoPDRPC, backoffErr) @@ -1652,9 +1653,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool) var reg *pd.Region var err error if searchPrev { - reg, err = c.pdClient.GetPrevRegion(ctx, key, pd.WithBuckets()) + reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...) } else { - reg, err = c.pdClient.GetRegion(ctx, key, pd.WithBuckets()) + reg, err = c.pdClient.GetRegion(ctx, key, opts...) } metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds()) if err != nil { @@ -1806,7 +1807,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte, } } start := time.Now() - regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit) + regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, pd.WithAllowFollowerHandle()) metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds()) if err != nil { if apicodec.IsDecodeError(err) { From 0fd9d6632691ef2ab76d124fa31e6d03fb49a05d Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 14 Dec 2023 13:59:41 +0800 Subject: [PATCH 02/18] address comment Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 65 +++++++++++++++++++++----- internal/locate/region_cache_test.go | 2 + internal/mockstore/mocktikv/cluster.go | 14 +++++- 3 files changed, 69 insertions(+), 12 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 5b58ce0f3c..39ced34bb2 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -552,8 +552,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { - c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool { + return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) } // Close releases region cache's resource. @@ -1107,8 +1107,20 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r, true) + stale := c.insertRegionToCache(r, true) c.mu.Unlock() + // just retry once, it won't bring much overhead. + if stale { + lr, err = c.loadRegion(bo, key, isEndKey) + if err != nil { + // no region data, return error if failure. + return nil, err + } + r = lr + c.mu.Lock() + c.insertRegionToCache(r, true) + c.mu.Unlock() + } } else if r.checkNeedReloadAndMarkUpdated() { // load region when it be marked as need reload. lr, err := c.loadRegion(bo, key, isEndKey) @@ -1485,7 +1497,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // It should be protected by c.mu.l.Lock(). 
// if `invalidateOldRegion` is false, the old region cache should be still valid, // and it may still be used by some kv requests. -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { +// Moreover, it will return whether the region is fresh. +func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool { + newVer := cachedRegion.VerID() + latest, ok := mu.latestVersions[cachedRegion.VerID().id] + // There are two or more situations in which the region we got is stale. + // The first case is that the process of getting a region is concurrent. + // The stale region may be returned later due to network reasons. + // The second case is that the region may be obtained from the PD follower, + // and there is the synchronization time between the pd follower and the leader. + // So we should check the epoch. + if ok && (latest.GetVer() > newVer.GetVer() || latest.GetConfVer() > newVer.GetConfVer()) { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), + zap.Uint64("lastest-ver", latest.GetVer()), zap.Uint64("lastest-conf", latest.GetConfVer())) + return false + } + // Also check the intersecting regions. + intersectedRegions := mu.sorted.removeIntersecting(cachedRegion) + for _, region := range intersectedRegions { + if region.cachedRegion.meta.GetRegionEpoch().GetVersion() > newVer.GetVer() { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), + zap.Uint64("intersecting-ver", region.cachedRegion.meta.GetRegionEpoch().GetVersion())) + return false + } + } oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { store := cachedRegion.getStore() @@ -1513,21 +1550,27 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) { store.buckets = oldRegionStore.buckets } - mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) + // Only delete when IDs are different, because we will update right away. + if cachedRegion.VerID().id != oldRegion.VerID().id { + mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) + mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) + mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) + } } + // update related vars. mu.regions[cachedRegion.VerID()] = cachedRegion - newVer := cachedRegion.VerID() - latest, ok := mu.latestVersions[cachedRegion.VerID().id] + latest, ok = mu.latestVersions[cachedRegion.VerID().id] if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() { mu.latestVersions[cachedRegion.VerID().id] = newVer } // The intersecting regions in the cache are probably stale, clear them. - deleted := mu.sorted.removeIntersecting(cachedRegion) - for _, region := range deleted { + for _, region := range intersectedRegions { mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID()) } + return true +} -} // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, +// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, // it should be called with c.mu.RLock(), and the returned Region should not be // used after c.mu is RUnlock(). 
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful @@ -1555,7 +1598,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { } latestRegion, ok := c.mu.regions[ver] if !ok { - // should not happen + // Should not happen. If happned, maybe logutil.BgLogger().Warn("region version not found", zap.Uint64("regionID", regionID), zap.Stringer("version", &ver)) return nil diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index f583dc7cfb..6124009c6e 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1567,6 +1567,8 @@ func (s *testRegionCacheSuite) TestBuckets() { // update buckets if it's nil. cachedRegion.getStore().buckets = nil + // we should replace the version of `cacheRegion` because of stale. + s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1) s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version) s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion()) waitUpdateBuckets(defaultBuckets, []byte("a")) diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index e46b02d3da..7b5f1cc3f9 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID) } +// PutRegion adds or replaces a region. +func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) { + c.Lock() + defer c.Unlock() + + c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver) +} + // AddPeer adds a new Peer for the Region on the Store. func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) { c.Lock() @@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer { } } -func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region { +func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region { if len(storeIDs) != len(peerIDs) { panic("len(storeIDs) != len(peerIds)") } @@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) Peers: peers, RegionEpoch: &metapb.RegionEpoch{}, } + if len(epoch) == 2 { + meta.RegionEpoch.ConfVer = epoch[0] + meta.RegionEpoch.Version = epoch[1] + } return &Region{ Meta: meta, leader: leaderPeerID, From 51b7793e9dab1d20670e16a5a91ce19eac65d4ad Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 14 Dec 2023 15:50:58 +0800 Subject: [PATCH 03/18] address comment Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 39ced34bb2..2f9e4fbe3d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1497,7 +1497,7 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // It should be protected by c.mu.l.Lock(). // if `invalidateOldRegion` is false, the old region cache should be still valid, // and it may still be used by some kv requests. -// Moreover, it will return whether the region is fresh. 
+// Moreover, it will return whether the region is stale. func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool { newVer := cachedRegion.VerID() latest, ok := mu.latestVersions[cachedRegion.VerID().id] @@ -1511,7 +1511,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld logutil.BgLogger().Debug("get stale region", zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), zap.Uint64("lastest-ver", latest.GetVer()), zap.Uint64("lastest-conf", latest.GetConfVer())) - return false + return true } // Also check the intersecting regions. intersectedRegions := mu.sorted.removeIntersecting(cachedRegion) @@ -1520,7 +1520,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld logutil.BgLogger().Debug("get stale region", zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), zap.Uint64("intersecting-ver", region.cachedRegion.meta.GetRegionEpoch().GetVersion())) - return false + return true } } oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) @@ -1567,7 +1567,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld for _, region := range intersectedRegions { mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID()) } - return true + return false } // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, From b7f61d3dd49d070d7907a6151cd79b662137dcf2 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 14 Dec 2023 19:57:52 +0800 Subject: [PATCH 04/18] add test Signed-off-by: Cabinfever_B no update Signed-off-by: Cabinfever_B fix test Signed-off-by: Cabinfever_B fix test Signed-off-by: Cabinfever_B fix test Signed-off-by: Cabinfever_B --- go.mod | 12 ++-- go.sum | 28 ++++----- internal/locate/region_cache_test.go | 78 +++++++++++++++++++++++++- internal/mockstore/mocktikv/cluster.go | 4 ++ internal/mockstore/mocktikv/pd.go | 23 +++++++- 5 files changed, 120 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 23e5b869b8..87655fb89c 100644 --- a/go.mod +++ b/go.mod @@ -50,16 +50,14 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect - golang.org/x/net v0.18.0 // indirect - golang.org/x/sys v0.14.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.9.1 // indirect - google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect + google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/tikv/pd/client => github.com/CabinfeverB/pd/client v0.0.0-20231206020402-01a983b1833b diff --git a/go.sum b/go.sum index 28b1ec8096..39e069e2c8 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,9 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml 
v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/CabinfeverB/pd/client v0.0.0-20231206020402-01a983b1833b h1:mQkSZcqC/FOpOgehqdcTCqMQSTJVfVfJy6eHU3AOKwE= -github.com/CabinfeverB/pd/client v0.0.0-20231206020402-01a983b1833b/go.mod h1:wRct10Q+PjUL5IMBalxzK49MVpVHyT7OZVYzDKW13Rw= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g= -github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -116,6 +112,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg= +github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -164,8 +162,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -180,12 +178,12 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.14.0 
h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -206,17 +204,15 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 h1:I6WNifs6pF9tNdSob2W24JtyxIYjzFB9qDlpUC76q+U= -google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405/go.mod h1:3WDQMjmJk36UQhjQ89emUzb1mdaHcPeeAh4SCBKznB4= +google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA= +google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI= google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b h1:CIC2YMXmIhYw6evmhPxBKJ4fmLbOFtXQN/GV3XOZR8k= google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:IBQ646DjkDkvUIsVq/cc03FUFQ9wbZu7yE396YcL870= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= -google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5 h1:FjbWL/mGfyRQNxjagfT1chiHL1569WEA/OGH0ZIzGcI= -google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5/go.mod h1:5av8LiY5jU2KRcrX+SHtvLHnaOpPJ7gzWStBurgHlqY= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/internal/locate/region_cache_test.go 
b/internal/locate/region_cache_test.go index 6124009c6e..a1cca0cba7 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -55,6 +55,7 @@ import ( "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" pd "github.com/tikv/pd/client" + uatomic "go.uber.org/atomic" ) func TestRegionCache(t *testing.T) { @@ -88,7 +89,7 @@ func (s *testRegionCacheSuite) SetupTest() { s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) } -func (s *testRegionCacheSuite) TearDownTest() { +func (s *testRegionCacheSuite) TearDownSuite() { s.cache.Close() s.mvccStore.Close() } @@ -1835,3 +1836,78 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() { cancel() } + +func TestRegionCacheWithDelay(t *testing.T) { + suite.Run(t, new(testRegionCacheWithDelaySuite)) +} + +type testRegionCacheWithDelaySuite struct { + suite.Suite + mvccStore mocktikv.MVCCStore + cluster *mocktikv.Cluster + store uint64 // store1 is leader + region1 uint64 + bo *retry.Backoffer + + delay uatomic.Bool + delayCache *RegionCache + cache *RegionCache +} + +func (s *testRegionCacheWithDelaySuite) SetupTest() { + s.mvccStore = mocktikv.MustNewMVCCStore() + s.cluster = mocktikv.NewCluster(s.mvccStore) + storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1) + s.region1 = regionID + s.store = storeIDs[0] + pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.cache = NewRegionCache(pdCli) + pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.delayCache = NewRegionCache(pdCli2) + s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) +} + +func (s *testRegionCacheWithDelaySuite) TearDownSuite() { + s.cache.Close() + s.delayCache.Close() + s.mvccStore.Close() +} + +func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { + r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal(r1.meta, r2.meta) + + s.delay.Store(true) + var wg sync.WaitGroup + wg.Add(1) + go func() { + r2.invalidate(Other) + _, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + wg.Done() + }() + newPeersIDs := s.cluster.AllocIDs(1) + s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) + r1.invalidate(Other) + r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + + s.delay.Store(false) + r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + wg.Wait() + r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.EndKey) +} diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index 7b5f1cc3f9..57ab37f52f 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -37,6 +37,7 @@ package mocktikv import ( "bytes" "context" + "fmt" "math" "sort" "sync" @@ -540,6 +541,9 @@ func (c *Cluster) getEntriesGroupByRegions(mvccStore 
MVCCStore, start, end MvccK regionEntriesSlice := make([][]Pair, 0, count) quotient := len(pairs) / count remainder := len(pairs) % count + for i := range pairs { + fmt.Println(pairs[i]) + } i := 0 for i < len(pairs) { regionEntryCount := quotient diff --git a/internal/mockstore/mocktikv/pd.go b/internal/mockstore/mocktikv/pd.go index 833b914c0e..b2b01aa41a 100644 --- a/internal/mockstore/mocktikv/pd.go +++ b/internal/mockstore/mocktikv/pd.go @@ -61,6 +61,16 @@ var tsMu = struct { const defaultResourceGroupName = "default" +var _ pd.Client = (*pdClient)(nil) + +type MockPDOption func(*pdClient) + +func WithDelay(delay *atomic.Bool) MockPDOption { + return func(pc *pdClient) { + pc.delay = delay + } +} + type pdClient struct { cluster *Cluster // SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV. @@ -73,11 +83,13 @@ type pdClient struct { externalTimestamp atomic.Uint64 groups map[string]*rmpb.ResourceGroup + + delay *atomic.Bool } // NewPDClient creates a mock pd.Client that uses local timestamp and meta data // from a Cluster. -func NewPDClient(cluster *Cluster) pd.Client { +func NewPDClient(cluster *Cluster, ops ...MockPDOption) *pdClient { mockCli := &pdClient{ cluster: cluster, serviceSafePoints: make(map[string]uint64), @@ -97,6 +109,9 @@ func NewPDClient(cluster *Cluster) pd.Client { }, Priority: 8, } + for _, op := range ops { + op(mockCli) + } return mockCli } @@ -206,6 +221,12 @@ func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegi if len(opts) == 0 { buckets = nil } + if c.delay != nil && c.delay.Load() { + select { + case <-ctx.Done(): + case <-time.After(200 * time.Millisecond): + } + } return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil } From 525bebc0faf9c203803c0896e903372f3dc5ed23 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 14:24:13 +0800 Subject: [PATCH 05/18] fix test Signed-off-by: Cabinfever_B --- internal/locate/region_cache_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index a1cca0cba7..5d7fda844d 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1889,6 +1889,7 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { s.NoError(err) wg.Done() }() + time.Sleep(30 * time.Millisecond) newPeersIDs := s.cluster.AllocIDs(1) s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) r1.invalidate(Other) From 2ac762355e061d1a3f3abf38c02c5a94e71cdc44 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 14:31:07 +0800 Subject: [PATCH 06/18] fix test Signed-off-by: Cabinfever_B --- internal/locate/main_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/locate/main_test.go b/internal/locate/main_test.go index e60db8d2eb..e222adac87 100644 --- a/internal/locate/main_test.go +++ b/internal/locate/main_test.go @@ -23,6 +23,7 @@ import ( func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"), + goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb/util.(*BufferPool).drain"), } goleak.VerifyTestMain(m, opts...) 
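The core rule these patches introduce in insertRegionToCache is that a region fetched from PD -- possibly from a lagging follower once pd.WithAllowFollowerHandle() is passed -- must not overwrite a newer epoch already held by the cache. Below is a minimal, self-contained Go sketch of that comparison; regionEpoch and isStale are illustrative names, not client-go identifiers, and the real code additionally checks intersecting regions in the sorted B-tree and retries the load once when a stale insert is reported.

package main

import "fmt"

// regionEpoch mirrors the two counters compared in the patch: Ver changes on
// split/merge, ConfVer changes on peer-membership changes.
type regionEpoch struct {
	Ver     uint64
	ConfVer uint64
}

// isStale reports whether the incoming epoch is older than the latest cached
// one, i.e. the condition
//   latest.GetVer() > newVer.GetVer() || latest.GetConfVer() > newVer.GetConfVer()
// that insertRegionToCache uses to reject a stale region.
func isStale(latest, incoming regionEpoch) bool {
	return latest.Ver > incoming.Ver || latest.ConfVer > incoming.ConfVer
}

func main() {
	cached := regionEpoch{Ver: 5, ConfVer: 3}       // epoch already in the region cache
	fromFollower := regionEpoch{Ver: 4, ConfVer: 3} // follower has not yet synced the latest split
	if isStale(cached, fromFollower) {
		// findRegionByKey then retries loadRegion once instead of
		// caching the stale meta.
		fmt.Println("stale region from PD follower, retrying load")
	}
}
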
From 81f89473ed7128a5d1c6986889131343337152b7 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 14:35:44 +0800 Subject: [PATCH 07/18] fix test Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 2 +- internal/locate/region_cache_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 2f9e4fbe3d..319a0eaa5d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1598,7 +1598,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { } latestRegion, ok := c.mu.regions[ver] if !ok { - // Should not happen. If happned, maybe + // should not happen logutil.BgLogger().Warn("region version not found", zap.Uint64("regionID", regionID), zap.Stringer("version", &ver)) return nil diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 5d7fda844d..7e38247857 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1867,7 +1867,7 @@ func (s *testRegionCacheWithDelaySuite) SetupTest() { s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) } -func (s *testRegionCacheWithDelaySuite) TearDownSuite() { +func (s *testRegionCacheWithDelaySuite) TearDownTest() { s.cache.Close() s.delayCache.Close() s.mvccStore.Close() From 9253566654b07bca704f90c5a626d6c0e97626fd Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 14:38:49 +0800 Subject: [PATCH 08/18] fix test Signed-off-by: Cabinfever_B --- internal/locate/main_test.go | 1 - internal/locate/region_cache_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/locate/main_test.go b/internal/locate/main_test.go index e222adac87..e60db8d2eb 100644 --- a/internal/locate/main_test.go +++ b/internal/locate/main_test.go @@ -23,7 +23,6 @@ import ( func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"), - goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb/util.(*BufferPool).drain"), } goleak.VerifyTestMain(m, opts...) 
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 7e38247857..058619bdad 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -89,7 +89,7 @@ func (s *testRegionCacheSuite) SetupTest() { s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) } -func (s *testRegionCacheSuite) TearDownSuite() { +func (s *testRegionCacheSuite) TearDownTest() { s.cache.Close() s.mvccStore.Close() } From 4b78c6415718b6e88635627ce44faddc7243a99f Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 14:43:56 +0800 Subject: [PATCH 09/18] fix test Signed-off-by: Cabinfever_B --- internal/locate/region_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 058619bdad..43cbf128d9 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1885,7 +1885,7 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { wg.Add(1) go func() { r2.invalidate(Other) - _, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + _, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false) s.NoError(err) wg.Done() }() From 0817ad5e2695e9ee1ff029068ed4d11d84d65450 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 16:14:36 +0800 Subject: [PATCH 10/18] check epoch Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 46 ++++++++--- internal/locate/region_cache_test.go | 105 +++++++++++++++++++++++++ internal/mockstore/mocktikv/cluster.go | 14 +++- internal/mockstore/mocktikv/pd.go | 23 +++++- 4 files changed, 177 insertions(+), 11 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 7305fe8203..826602336e 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -552,8 +552,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { - c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool { + return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) } // Close releases region cache's resource. @@ -1485,7 +1485,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // It should be protected by c.mu.l.Lock(). // if `invalidateOldRegion` is false, the old region cache should be still valid, // and it may still be used by some kv requests. -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { +// Moreover, it will return whether the region is stale. +func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool { + newVer := cachedRegion.VerID() + latest, ok := mu.latestVersions[cachedRegion.VerID().id] + // There are two or more situations in which the region we got is stale. + // The first case is that the process of getting a region is concurrent. + // The stale region may be returned later due to network reasons. + // The second case is that the region may be obtained from the PD follower, + // and there is the synchronization time between the pd follower and the leader. + // So we should check the epoch. 
+ if ok && (latest.GetVer() > newVer.GetVer() || latest.GetConfVer() > newVer.GetConfVer()) { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), + zap.Uint64("lastest-ver", latest.GetVer()), zap.Uint64("lastest-conf", latest.GetConfVer())) + return true + } + // Also check the intersecting regions. + intersectedRegions := mu.sorted.removeIntersecting(cachedRegion) + for _, region := range intersectedRegions { + if region.cachedRegion.meta.GetRegionEpoch().GetVersion() > newVer.GetVer() { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), + zap.Uint64("intersecting-ver", region.cachedRegion.meta.GetRegionEpoch().GetVersion())) + return true + } + } oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { store := cachedRegion.getStore() @@ -1513,21 +1538,24 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) { store.buckets = oldRegionStore.buckets } - mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) + // Only delete when IDs are different, because we will update right away. + if cachedRegion.VerID().id != oldRegion.VerID().id { + mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) + } } + // update related vars. mu.regions[cachedRegion.VerID()] = cachedRegion - newVer := cachedRegion.VerID() - latest, ok := mu.latestVersions[cachedRegion.VerID().id] if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() { mu.latestVersions[cachedRegion.VerID().id] = newVer } // The intersecting regions in the cache are probably stale, clear them. - deleted := mu.sorted.removeIntersecting(cachedRegion) - for _, region := range deleted { + for _, region := range intersectedRegions { mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID()) } + return false +} -} // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, +// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, // it should be called with c.mu.RLock(), and the returned Region should not be // used after c.mu is RUnlock(). // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index f583dc7cfb..9ac665c814 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -55,6 +55,7 @@ import ( "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" pd "github.com/tikv/pd/client" + uatomic "go.uber.org/atomic" ) func TestRegionCache(t *testing.T) { @@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() { // update buckets if it's nil. cachedRegion.getStore().buckets = nil + // we should replace the version of `cacheRegion` because of stale. 
+ s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1) s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version) s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion()) waitUpdateBuckets(defaultBuckets, []byte("a")) @@ -1833,3 +1836,105 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() { cancel() } + +func TestRegionCacheWithDelay(t *testing.T) { + suite.Run(t, new(testRegionCacheWithDelaySuite)) +} + +type testRegionCacheWithDelaySuite struct { + suite.Suite + mvccStore mocktikv.MVCCStore + cluster *mocktikv.Cluster + store uint64 // store1 is leader + region1 uint64 + bo *retry.Backoffer + + delay uatomic.Bool + delayCache *RegionCache + cache *RegionCache +} + +func (s *testRegionCacheWithDelaySuite) SetupTest() { + s.mvccStore = mocktikv.MustNewMVCCStore() + s.cluster = mocktikv.NewCluster(s.mvccStore) + storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1) + s.region1 = regionID + s.store = storeIDs[0] + pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.cache = NewRegionCache(pdCli) + pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.delayCache = NewRegionCache(pdCli2) + s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) +} + +func (s *testRegionCacheWithDelaySuite) TearDownTest() { + s.cache.Close() + s.delayCache.Close() + s.mvccStore.Close() +} + +func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() { + r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + fakeRegion := &Region{ + meta: r.meta, + syncFlag: r.syncFlag, + lastAccess: r.lastAccess, + invalidReason: r.invalidReason, + } + fakeRegion.setStore(r.getStore().clone()) + + newPeersIDs := s.cluster.AllocIDs(1) + s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) + r.invalidate(Other) + r2, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal([]byte("b"), r2.EndKey()) + stale := s.cache.insertRegionToCache(fakeRegion, true) + s.True(stale) + r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal([]byte("b"), r3.EndKey()) +} + +func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { + r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal(r1.meta, r2.meta) + + // simulates network delay + s.delay.Store(true) + var wg sync.WaitGroup + wg.Add(1) + go func() { + r2.invalidate(Other) + _, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + wg.Done() + }() + time.Sleep(30 * time.Millisecond) + newPeersIDs := s.cluster.AllocIDs(1) + s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) + r1.invalidate(Other) + r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + + s.delay.Store(false) + r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + wg.Wait() + // the delay response is recived, but insert failed. 
+ r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.EndKey) +} diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index e46b02d3da..7b5f1cc3f9 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID) } +// PutRegion adds or replaces a region. +func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) { + c.Lock() + defer c.Unlock() + + c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver) +} + // AddPeer adds a new Peer for the Region on the Store. func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) { c.Lock() @@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer { } } -func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region { +func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region { if len(storeIDs) != len(peerIDs) { panic("len(storeIDs) != len(peerIds)") } @@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) Peers: peers, RegionEpoch: &metapb.RegionEpoch{}, } + if len(epoch) == 2 { + meta.RegionEpoch.ConfVer = epoch[0] + meta.RegionEpoch.Version = epoch[1] + } return &Region{ Meta: meta, leader: leaderPeerID, diff --git a/internal/mockstore/mocktikv/pd.go b/internal/mockstore/mocktikv/pd.go index 833b914c0e..b2b01aa41a 100644 --- a/internal/mockstore/mocktikv/pd.go +++ b/internal/mockstore/mocktikv/pd.go @@ -61,6 +61,16 @@ var tsMu = struct { const defaultResourceGroupName = "default" +var _ pd.Client = (*pdClient)(nil) + +type MockPDOption func(*pdClient) + +func WithDelay(delay *atomic.Bool) MockPDOption { + return func(pc *pdClient) { + pc.delay = delay + } +} + type pdClient struct { cluster *Cluster // SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV. @@ -73,11 +83,13 @@ type pdClient struct { externalTimestamp atomic.Uint64 groups map[string]*rmpb.ResourceGroup + + delay *atomic.Bool } // NewPDClient creates a mock pd.Client that uses local timestamp and meta data // from a Cluster. 
-func NewPDClient(cluster *Cluster) pd.Client { +func NewPDClient(cluster *Cluster, ops ...MockPDOption) *pdClient { mockCli := &pdClient{ cluster: cluster, serviceSafePoints: make(map[string]uint64), @@ -97,6 +109,9 @@ func NewPDClient(cluster *Cluster) pd.Client { }, Priority: 8, } + for _, op := range ops { + op(mockCli) + } return mockCli } @@ -206,6 +221,12 @@ func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegi if len(opts) == 0 { buckets = nil } + if c.delay != nil && c.delay.Load() { + select { + case <-ctx.Done(): + case <-time.After(200 * time.Millisecond): + } + } return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil } From 0d9bee8d9fc80f421530f9cc73d8cb81914d52f6 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 16:47:22 +0800 Subject: [PATCH 11/18] add benchmark test Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 12 ++++++++ internal/locate/region_cache_test.go | 46 +++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 826602336e..c046c8c35d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -543,6 +543,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { return c } +// only used fot test. +func newTestRegionCache() *RegionCache { + c := &RegionCache{} + c.storeMu.stores = make(map[uint64]*Store) + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.stores = make([]*Store, 0) + c.notifyCheckCh = make(chan struct{}, 1) + c.ctx, c.cancelFunc = context.WithCancel(context.Background()) + c.mu = *newRegionIndexMu(nil) + return c +} + // clear clears all cached data in the RegionCache. It's only used in tests. func (c *RegionCache) clear() { c.mu = *newRegionIndexMu(nil) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 9ac665c814..c2bf4caf14 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1930,7 +1930,7 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { s.NoError(err) s.Equal([]byte("b"), r.meta.StartKey) wg.Wait() - // the delay response is recived, but insert failed. + // the delay response is received, but insert failed. 
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) s.NoError(err) s.Equal([]byte("b"), r.meta.StartKey) @@ -1938,3 +1938,47 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { s.NoError(err) s.Equal([]byte("b"), r.meta.EndKey) } + +func generateKeyForSimulator(id int, keyLen int) []byte { + k := make([]byte, keyLen) + copy(k, fmt.Sprintf("%010d", id)) + return k +} + +func BenchmarkInsertRegionToCache(b *testing.B) { + b.StopTimer() + cache := newTestRegionCache() + r := &Region{ + meta: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{}, + }, + } + rs := ®ionStore{ + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + pendingTiFlashPeerStores: map[uint64]uint64{}, + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + } + r.setStore(rs) + b.StartTimer() + for i := 0; i < b.N; i++ { + newMeta := proto.Clone(r.meta).(*metapb.Region) + newMeta.Id = uint64(i + 1) + newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1)) + newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1)) + if i%2 == 0 { + newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56) + newMeta.EndKey = []byte("") + } else { + newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56) + newMeta.StartKey = []byte("") + } + region := &Region{ + meta: newMeta, + } + region.setStore(r.getStore()) + cache.insertRegionToCache(region, true) + } +} From cc64a5bc2775206a8bb432f7cc4be5d04b04c062 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 15 Dec 2023 17:30:07 +0800 Subject: [PATCH 12/18] add benchmark test2 Signed-off-by: Cabinfever_B --- internal/locate/region_cache_test.go | 30 ++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index c2bf4caf14..eda8dd31b2 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1982,3 +1982,33 @@ func BenchmarkInsertRegionToCache(b *testing.B) { cache.insertRegionToCache(region, true) } } + +func BenchmarkInsertRegionToCache2(b *testing.B) { + b.StopTimer() + cache := newTestRegionCache() + r := &Region{ + meta: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{}, + }, + } + rs := ®ionStore{ + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + pendingTiFlashPeerStores: map[uint64]uint64{}, + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + } + r.setStore(rs) + b.StartTimer() + for i := 0; i < b.N; i++ { + newMeta := proto.Clone(r.meta).(*metapb.Region) + newMeta.RegionEpoch.ConfVer = uint64(i + 1) + newMeta.RegionEpoch.Version = uint64(i + 1) + region := &Region{ + meta: newMeta, + } + region.setStore(r.getStore()) + cache.insertRegionToCache(region, true) + } +} From 3aae7a4b39ed440bea118ede1f46aa97215334ac Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 18 Dec 2023 09:38:39 +0800 Subject: [PATCH 13/18] address comment Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 319a0eaa5d..dcb3873cba 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1553,13 +1553,10 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld // Only delete when IDs are different, because we will update right away. 
 		if cachedRegion.VerID().id != oldRegion.VerID().id {
 			mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
-			mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
-			mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
 		}
 	}
 	// update related vars.
 	mu.regions[cachedRegion.VerID()] = cachedRegion
-	latest, ok = mu.latestVersions[cachedRegion.VerID().id]
 	if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
 		mu.latestVersions[cachedRegion.VerID().id] = newVer
 	}

From 6ab7dc16b117b75d63f89c84655601bbbb9e7c67 Mon Sep 17 00:00:00 2001
From: Cabinfever_B
Date: Mon, 18 Dec 2023 11:37:06 +0800
Subject: [PATCH 14/18] add test

Signed-off-by: Cabinfever_B
---
 internal/locate/region_cache_test.go | 34 ++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index eda8dd31b2..a3efd57e16 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -1939,6 +1939,40 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
 	s.Equal([]byte("b"), r.meta.EndKey)
 }
 
+func (s *testRegionCacheWithDelaySuite) TestFollowerGetStaleRegion() {
+	var delay uatomic.Bool
+	pdCli3 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
+	followerDelayCache := NewRegionCache(pdCli3)
+
+	delay.Store(true)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	var final *Region
+	go func() {
+		var err error
+		// followerDelayCache is empty now, so the lookup will go to the PD follower (which is delayed).
+		final, err = followerDelayCache.findRegionByKey(s.bo, []byte("z"), false)
+		s.NoError(err)
+		wg.Done()
+	}()
+	time.Sleep(30 * time.Millisecond)
+	delay.Store(false)
+	r, err := followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
+	s.NoError(err)
+	newPeersIDs := s.cluster.AllocIDs(1)
+	s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("z"), newPeersIDs, newPeersIDs[0])
+	r.invalidate(Other)
+	r, err = followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
+	s.NoError(err)
+	s.Equal([]byte("z"), r.meta.EndKey)
+
+	// no need to retry: by the time the delayed follower response arrives, it already reflects the split.
+	wg.Wait()
+	s.Equal([]byte("z"), final.meta.StartKey)
+
+	followerDelayCache.Close()
+}
+
 func generateKeyForSimulator(id int, keyLen int) []byte {
 	k := make([]byte, keyLen)
 	copy(k, fmt.Sprintf("%010d", id))

From bd595a4bcd507bf56a78a2eb32ae90b9c9a0de50 Mon Sep 17 00:00:00 2001
From: Cabinfever_B
Date: Mon, 18 Dec 2023 13:49:05 +0800
Subject: [PATCH 15/18] fix check

Signed-off-by: Cabinfever_B
---
 internal/locate/region_cache.go      | 11 +++--------
 internal/locate/region_cache_test.go | 17 +++++++++++++++--
 internal/locate/sorted_btree.go      | 17 +++++++++++++++--
 3 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index c046c8c35d..ede0941d0f 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -1514,14 +1514,9 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
 		return true
 	}
 	// Also check the intersecting regions.
- intersectedRegions := mu.sorted.removeIntersecting(cachedRegion) - for _, region := range intersectedRegions { - if region.cachedRegion.meta.GetRegionEpoch().GetVersion() > newVer.GetVer() { - logutil.BgLogger().Debug("get stale region", - zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), - zap.Uint64("intersecting-ver", region.cachedRegion.meta.GetRegionEpoch().GetVersion())) - return true - } + intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, &newVer) + if stale { + return true } oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index eda8dd31b2..693eba8f31 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1885,13 +1885,26 @@ func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() { fakeRegion.setStore(r.getStore().clone()) newPeersIDs := s.cluster.AllocIDs(1) + s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0]) + newPeersIDs = s.cluster.AllocIDs(1) s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) + r.invalidate(Other) - r2, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + r2, err := s.cache.findRegionByKey(s.bo, []byte("c"), false) + s.NoError(err) + s.Equal([]byte("c"), r2.StartKey()) + r2, err = s.cache.findRegionByKey(s.bo, []byte("b"), false) s.NoError(err) - s.Equal([]byte("b"), r2.EndKey()) + s.Equal([]byte("b"), r2.StartKey()) + stale := s.cache.insertRegionToCache(fakeRegion, true) s.True(stale) + + rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100) + s.NoError(err) + s.Greater(len(rs), 1) + s.NotEqual(rs[0].EndKey(), "") + r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) s.NoError(err) s.Equal([]byte("b"), r3.EndKey()) diff --git a/internal/locate/sorted_btree.go b/internal/locate/sorted_btree.go index 6165f7614e..0f28f5e7f5 100644 --- a/internal/locate/sorted_btree.go +++ b/internal/locate/sorted_btree.go @@ -38,6 +38,8 @@ import ( "bytes" "github.com/google/btree" + "github.com/tikv/client-go/v2/internal/logutil" + "go.uber.org/zap" ) // SortedRegions is a sorted btree. @@ -93,23 +95,34 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int) // removeIntersecting removes all items that have intersection with the key range of given region. // If the region itself is in the cache, it's not removed. -func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem { +func (s *SortedRegions) removeIntersecting(r *Region, verID *RegionVerID) ([]*btreeItem, bool) { var deleted []*btreeItem + var stale bool s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool { // Skip the item that is equal to the given region. 
if item.cachedRegion.VerID() == r.VerID() { return true } + if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()), + zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion())) + stale = true + return false + } if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 { return false } deleted = append(deleted, item) return true }) + if stale { + return nil, true + } for _, item := range deleted { s.b.Delete(item) } - return deleted + return deleted, false } // Clear removes all items from the btree. From 2ac4bc4932f27e9edae64842961fd0516c829b9f Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 18 Dec 2023 15:15:57 +0800 Subject: [PATCH 16/18] address comment Signed-off-by: Cabinfever_B --- internal/locate/region_cache.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index ede0941d0f..799c5c5a17 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -1500,17 +1500,17 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // Moreover, it will return whether the region is stale. func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool { newVer := cachedRegion.VerID() - latest, ok := mu.latestVersions[cachedRegion.VerID().id] + oldVer, ok := mu.latestVersions[cachedRegion.VerID().id] // There are two or more situations in which the region we got is stale. // The first case is that the process of getting a region is concurrent. // The stale region may be returned later due to network reasons. // The second case is that the region may be obtained from the PD follower, // and there is the synchronization time between the pd follower and the leader. // So we should check the epoch. - if ok && (latest.GetVer() > newVer.GetVer() || latest.GetConfVer() > newVer.GetConfVer()) { + if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) { logutil.BgLogger().Debug("get stale region", - zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()), - zap.Uint64("lastest-ver", latest.GetVer()), zap.Uint64("lastest-conf", latest.GetConfVer())) + zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()), + zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer())) return true } // Also check the intersecting regions. @@ -1552,7 +1552,7 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld } // update related vars. mu.regions[cachedRegion.VerID()] = cachedRegion - if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() { + if !ok || oldVer.GetVer() < newVer.GetVer() || oldVer.GetConfVer() < newVer.GetConfVer() { mu.latestVersions[cachedRegion.VerID().id] = newVer } // The intersecting regions in the cache are probably stale, clear them. 
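
As a side note on the mechanism the patches above converge on: a region descriptor obtained from a PD follower, or returned late by a concurrent lookup, must not overwrite the cache when the cached epoch is already ahead. Below is a minimal, self-contained sketch of that comparison; the epoch and isStale names are invented for illustration and are not client-go identifiers, and only the two counters of metapb.RegionEpoch (Version and ConfVer) are assumed.

package main

import "fmt"

// epoch mirrors the two counters of metapb.RegionEpoch that the region cache compares.
type epoch struct {
	version uint64 // bumped by split/merge
	confVer uint64 // bumped by peer/configuration changes
}

// isStale reports whether an incoming region descriptor is older than the cached one:
// if the cached epoch is ahead on either counter, the incoming data is rejected.
func isStale(incoming, cached epoch) bool {
	return cached.version > incoming.version || cached.confVer > incoming.confVer
}

func main() {
	cached := epoch{version: 5, confVer: 3}
	fmt.Println(isStale(epoch{version: 4, confVer: 3}, cached)) // true: the response raced with a split, reject it
	fmt.Println(isStale(epoch{version: 6, confVer: 3}, cached)) // false: the response reflects the split, accept it
}

In the diffs above, insertRegionToCache applies this check against the latestVersions map, while removeIntersecting applies the version half of it to any cached region whose key range overlaps the incoming one.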
From 2f9c60f4c8961e5c840c314203bdee19e32df696 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 20 Dec 2023 14:17:37 +0800 Subject: [PATCH 17/18] merge master Signed-off-by: Cabinfever_B --- internal/locate/sorted_btree.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/internal/locate/sorted_btree.go b/internal/locate/sorted_btree.go index 5d876165ea..f718aba3d8 100644 --- a/internal/locate/sorted_btree.go +++ b/internal/locate/sorted_btree.go @@ -99,13 +99,6 @@ func (s *SortedRegions) removeIntersecting(r *Region, verID RegionVerID) ([]*btr var deleted []*btreeItem var stale bool s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool { - if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver { - logutil.BgLogger().Debug("get stale region", - zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()), - zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion())) - stale = true - return false - } if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver { logutil.BgLogger().Debug("get stale region", zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()), From 1ce68314cd1790156d9fc18e5f621bc5dbfc47a0 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 20 Dec 2023 14:18:12 +0800 Subject: [PATCH 18/18] merge master Signed-off-by: Cabinfever_B --- internal/mockstore/mocktikv/cluster.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index 57ab37f52f..7b5f1cc3f9 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -37,7 +37,6 @@ package mocktikv import ( "bytes" "context" - "fmt" "math" "sort" "sync" @@ -541,9 +540,6 @@ func (c *Cluster) getEntriesGroupByRegions(mvccStore MVCCStore, start, end MvccK regionEntriesSlice := make([][]Pair, 0, count) quotient := len(pairs) / count remainder := len(pairs) % count - for i := range pairs { - fmt.Println(pairs[i]) - } i := 0 for i < len(pairs) { regionEntryCount := quotient