Skip to content

Commit

Permalink
Support to integrate with the PD HTTP client (#1049)
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
Co-authored-by: disksing <[email protected]>
  • Loading branch information
JmPotato and disksing authored Nov 9, 2023
1 parent ea379b0 commit 845e3b0
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 409 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/client-go/v2

go 1.21

replace github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864

require (
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -111,8 +114,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc h1:IUg0j2nWoGYj3FQ3vA3vg97fPSpJEZQrDpgF8RkMLEU=
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc/go.mod h1:wfHRc4iYaqJiOQZCHcrF+r4hYnkGDaYWDfcicee//pc=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module integration_tests

go 1.21

replace github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864

require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -507,8 +509,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb h1:hAcH9tFjQzQ3+ofrAHm4ajOTLliYCOfXpj3+boKOtac=
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb/go.mod h1:E+6qtPu8fJm5kNjvKWPVFqSgNAFPk07y2EjD03GWzuI=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
Expand Down
94 changes: 29 additions & 65 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ type apiTestSuite struct {
}

func (s *apiTestSuite) SetupTest() {
require := s.Require()
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().NoError(err)
require.NoError(err)
rpcClient := tikv.NewRPCClient()
s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs, nil))
require.NoError(err)
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
}
Expand Down Expand Up @@ -122,18 +123,17 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(dcLabel, 100)
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) {
s.Eventually(func() bool {
return s.store.GetMinSafeTS(txnScope) == ts
}, time.Second, 200*time.Millisecond)
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
Expand All @@ -142,22 +142,15 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Set DC label for store 1.
// Mock PD server not support get min resolved ts by stores.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
Expand All @@ -173,18 +166,10 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}

s.waitForMinSafeTS(dcLabel, 150)
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
Expand All @@ -196,47 +181,26 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
// Make sure the store's min resolved ts is not initialized.
s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64)
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
// Make sure the store's min resolved ts is not regarded as MaxUint64.
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the minSafeTS can advance.
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TearDownTest() {
Expand Down
4 changes: 4 additions & 0 deletions tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pdhttp "github.com/tikv/pd/client/http"
)

// Storage represent the kv.Storage runs on TiKV.
Expand Down Expand Up @@ -69,6 +70,9 @@ type Storage interface {
// GetTiKVClient gets the TiKV client.
GetTiKVClient() Client

// GetPDHTTPClient gets the PD HTTP client.
GetPDHTTPClient() pdhttp.Client

// Closed returns the closed channel.
Closed() <-chan struct{}

Expand Down
61 changes: 50 additions & 11 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -115,7 +116,7 @@ type KVStore struct {
client Client
}
pdClient pd.Client
pdHttpClient *util.PDHTTPClient
pdHttpClient pdhttp.Client
regionCache *locate.RegionCache
lockResolver *txnlock.LockResolver
txnLatches *latch.LatchesScheduler
Expand Down Expand Up @@ -147,6 +148,8 @@ type KVStore struct {
gP Pool
}

var _ Storage = (*KVStore)(nil)

// Go run the function in a separate goroutine.
func (s *KVStore) Go(f func()) error {
return s.gP.Run(f)
Expand Down Expand Up @@ -195,9 +198,9 @@ func WithPool(gp Pool) Option {
}

// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
func WithPDHTTPClient(pdAddrs []string, tlsConf *tls.Config) Option {
return func(o *KVStore) {
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
o.pdHttpClient = pdhttp.NewClient(pdAddrs, pdhttp.WithTLSConfig(tlsConf))
}
}

Expand Down Expand Up @@ -444,6 +447,11 @@ func (s *KVStore) GetPDClient() pd.Client {
return s.pdClient
}

// GetPDHTTPClient returns the PD HTTP client.
func (s *KVStore) GetPDHTTPClient() pdhttp.Client {
return s.pdHttpClient
}

// SupportDeleteRange gets the storage support delete range or not.
func (s *KVStore) SupportDeleteRange() (supported bool) {
return !s.mock
Expand Down Expand Up @@ -598,28 +606,31 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
err error
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
storeIDs := make([]uint64, len(stores))
if s.pdHttpClient != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
storeIDs[i] = store.StoreID()
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
_, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
}
}

for i, store := range stores {
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) {
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()

var safeTS uint64
var (
safeTS uint64
storeIDStr = strconv.FormatUint(storeID, 10)
)
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
resp, err := tikvClient.SendRequest(
Expand Down Expand Up @@ -655,7 +666,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
safeTSTime := oracle.GetTimeFromTS(safeTS)
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
}(ctx, wg, storeID, storeAddr, storeIDs[i])
}(ctx, wg, storeID, storeAddr)
}

txnScopeMap := make(map[string][]uint64)
Expand All @@ -672,6 +683,34 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
var (
minResolvedTS uint64
storeMinResolvedTSs map[uint64]uint64
err error
)
minResolvedTS, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
return 0, nil, err
}
if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil {
injectedTS, ok := val.(int)
if !ok {
return minResolvedTS, storeMinResolvedTSs, err
}
minResolvedTS = uint64(injectedTS)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(injectedTS)))
// Currently we only have a store 1 in the test, so it's OK to inject the same min resolved TS for all stores here.
for storeID, v := range storeMinResolvedTSs {
if v != 0 && v != math.MaxUint64 {
storeMinResolvedTSs[storeID] = uint64(injectedTS)
logutil.BgLogger().Info("inject store min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(injectedTS)))
}
}
}
return minResolvedTS, storeMinResolvedTSs, err
}

var (
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
Expand All @@ -684,7 +723,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
Expand Down
Loading

0 comments on commit 845e3b0

Please sign in to comment.