From 845e3b01a2a42b1e36564b14f2fd4b72eaaa285d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 9 Nov 2023 10:31:01 +0800 Subject: [PATCH] Support to integrate with the PD HTTP client (#1049) Signed-off-by: JmPotato Co-authored-by: disksing --- go.mod | 2 + go.sum | 5 +- integration_tests/go.mod | 2 + integration_tests/go.sum | 4 +- integration_tests/pd_api_test.go | 94 ++++--------- tikv/interface.go | 4 + tikv/kv.go | 61 ++++++-- util/pd.go | 232 ------------------------------- util/pd_test.go | 97 ------------- 9 files changed, 92 insertions(+), 409 deletions(-) delete mode 100644 util/pd.go delete mode 100644 util/pd_test.go diff --git a/go.mod b/go.mod index dd96d1837..b30464f38 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/tikv/client-go/v2 go 1.21 +replace github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 + require ( github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 diff --git a/go.sum b/go.sum index 924b5284b..b6580095a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0= +github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -111,8 +114,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc h1:IUg0j2nWoGYj3FQ3vA3vg97fPSpJEZQrDpgF8RkMLEU= -github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc/go.mod h1:wfHRc4iYaqJiOQZCHcrF+r4hYnkGDaYWDfcicee//pc= github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA= github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 4eef4ac41..3cbe43b23 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -2,6 +2,8 @@ module integration_tests go 1.21 +replace github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 + require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 20b25e62b..ff9c88b52 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -31,6 +31,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= +github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0= +github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -507,8 +509,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb h1:hAcH9tFjQzQ3+ofrAHm4ajOTLliYCOfXpj3+boKOtac= -github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb/go.mod h1:E+6qtPu8fJm5kNjvKWPVFqSgNAFPk07y2EjD03GWzuI= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index a990c9cad..9d33bd538 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -48,14 +48,15 @@ type apiTestSuite struct { } func (s *apiTestSuite) SetupTest() { + require := s.Require() addrs := strings.Split(*pdAddrs, ",") pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) - s.Require().NoError(err) + require.NoError(err) rpcClient := tikv.NewRPCClient() - s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) // Set PD HTTP client. - store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) - s.store = store + s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs, nil)) + require.NoError(err) storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil) } @@ -122,18 +123,17 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() { storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) // Try to get the minimum resolved timestamp of the stores from PD. - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - var retryCount int - for s.store.GetMinSafeTS(dcLabel) != 100 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) + s.waitForMinSafeTS(dcLabel, 100) require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) +} + +func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) { + s.Eventually(func() bool { + return s.store.GetMinSafeTS(txnScope) == ts + }, time.Second, 200*time.Millisecond) } func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { @@ -142,22 +142,15 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) // Try to get the minimum resolved timestamp of the cluster from PD. - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Set DC label for store 1. // Mock PD server not support get min resolved ts by stores. - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) dcLabel := "testDC" restore := config.UpdateGlobal(func(conf *config.Config) { conf.TxnScope = dcLabel @@ -173,18 +166,10 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) // Try to get the minimum resolved timestamp of the store from TiKV. - retryCount = 0 - for s.store.GetMinSafeTS(dcLabel) != 150 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } - + s.waitForMinSafeTS(dcLabel, 150) require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { @@ -196,47 +181,26 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { // Make sure the store's min resolved ts is not initialized. mockClient.SetKVSafeTS(0) // Try to get the minimum resolved timestamp of the cluster from TiKV. - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) // Make sure the store's min resolved ts is not initialized. + s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64) require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Try to get the minimum resolved timestamp of the cluster from PD. - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) // Make sure the store's min resolved ts is not regarded as MaxUint64. + s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Fallback to KV Request when PD server not support get min resolved ts. - require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) mockClient.SetKVSafeTS(150) - retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } // Make sure the minSafeTS can advance. + s.waitForMinSafeTS(oracle.GlobalTxnScope, 150) require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } func (s *apiTestSuite) TearDownTest() { diff --git a/tikv/interface.go b/tikv/interface.go index 48babfb50..f1c674334 100644 --- a/tikv/interface.go +++ b/tikv/interface.go @@ -41,6 +41,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" + pdhttp "github.com/tikv/pd/client/http" ) // Storage represent the kv.Storage runs on TiKV. @@ -69,6 +70,9 @@ type Storage interface { // GetTiKVClient gets the TiKV client. GetTiKVClient() Client + // GetPDHTTPClient gets the PD HTTP client. + GetPDHTTPClient() pdhttp.Client + // Closed returns the closed channel. Closed() <-chan struct{} diff --git a/tikv/kv.go b/tikv/kv.go index 0d909c29f..73d0ceda2 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -66,6 +66,7 @@ import ( "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" resourceControlClient "github.com/tikv/pd/client/resource_group/controller" clientv3 "go.etcd.io/etcd/client/v3" atomicutil "go.uber.org/atomic" @@ -115,7 +116,7 @@ type KVStore struct { client Client } pdClient pd.Client - pdHttpClient *util.PDHTTPClient + pdHttpClient pdhttp.Client regionCache *locate.RegionCache lockResolver *txnlock.LockResolver txnLatches *latch.LatchesScheduler @@ -147,6 +148,8 @@ type KVStore struct { gP Pool } +var _ Storage = (*KVStore)(nil) + // Go run the function in a separate goroutine. func (s *KVStore) Go(f func()) error { return s.gP.Run(f) @@ -195,9 +198,9 @@ func WithPool(gp Pool) Option { } // WithPDHTTPClient set the PD HTTP client with the given address and TLS config. -func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option { +func WithPDHTTPClient(pdAddrs []string, tlsConf *tls.Config) Option { return func(o *KVStore) { - o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs) + o.pdHttpClient = pdhttp.NewClient(pdAddrs, pdhttp.WithTLSConfig(tlsConf)) } } @@ -444,6 +447,11 @@ func (s *KVStore) GetPDClient() pd.Client { return s.pdClient } +// GetPDHTTPClient returns the PD HTTP client. +func (s *KVStore) GetPDHTTPClient() pdhttp.Client { + return s.pdHttpClient +} + // SupportDeleteRange gets the storage support delete range or not. func (s *KVStore) SupportDeleteRange() (supported bool) { return !s.mock @@ -598,28 +606,31 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { err error storeMinResolvedTSs map[uint64]uint64 ) - storeIDs := make([]string, len(stores)) + storeIDs := make([]uint64, len(stores)) if s.pdHttpClient != nil { for i, store := range stores { - storeIDs[i] = strconv.FormatUint(store.StoreID(), 10) + storeIDs[i] = store.StoreID() } - _, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs) + _, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs) if err != nil { // If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV. logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs)) } } - for i, store := range stores { + for _, store := range stores { storeID := store.StoreID() storeAddr := store.GetAddr() if store.IsTiFlash() { storeAddr = store.GetPeerAddr() } - go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) { + go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) { defer wg.Done() - var safeTS uint64 + var ( + safeTS uint64 + storeIDStr = strconv.FormatUint(storeID, 10) + ) // If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV. if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil { resp, err := tikvClient.SendRequest( @@ -655,7 +666,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc() safeTSTime := oracle.GetTimeFromTS(safeTS) metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds()) - }(ctx, wg, storeID, storeAddr, storeIDs[i]) + }(ctx, wg, storeID, storeAddr) } txnScopeMap := make(map[string][]uint64) @@ -672,6 +683,34 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } +func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { + var ( + minResolvedTS uint64 + storeMinResolvedTSs map[uint64]uint64 + err error + ) + minResolvedTS, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs) + if err != nil { + return 0, nil, err + } + if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil { + injectedTS, ok := val.(int) + if !ok { + return minResolvedTS, storeMinResolvedTSs, err + } + minResolvedTS = uint64(injectedTS) + logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(injectedTS))) + // Currently we only have a store 1 in the test, so it's OK to inject the same min resolved TS for all stores here. + for storeID, v := range storeMinResolvedTSs { + if v != 0 && v != math.MaxUint64 { + storeMinResolvedTSs[storeID] = uint64(injectedTS) + logutil.BgLogger().Info("inject store min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(injectedTS))) + } + } + } + return minResolvedTS, storeMinResolvedTSs, err +} + var ( skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster") successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster") @@ -684,7 +723,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope // Try to get the minimum resolved timestamp of the cluster from PD. if s.pdHttpClient != nil && isGlobal { - clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil) + clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil) if err != nil { logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err)) } else if clusterMinSafeTS != 0 { diff --git a/util/pd.go b/util/pd.go deleted file mode 100644 index 6f27c514f..000000000 --- a/util/pd.go +++ /dev/null @@ -1,232 +0,0 @@ -// Copyright 2023 TiKV Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// NOTE: The code in this file is based on code from the -// TiDB project, licensed under the Apache License v 2.0 -// -// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/execdetails.go -// - -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "context" - "crypto/tls" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "strings" - "syscall" - "time" - - "github.com/pingcap/errors" - "github.com/tikv/client-go/v2/internal/logutil" - "go.uber.org/zap" -) - -const ( - // pd request retry time when connection fail. - pdRequestRetryTime = 10 - - minResolvedTSPrefix = "pd/api/v1/min-resolved-ts" -) - -// PDHTTPClient is an HTTP client of pd. -type PDHTTPClient struct { - addrs []string - cli *http.Client -} - -func NewPDHTTPClient( - tlsConf *tls.Config, - pdAddrs []string, -) *PDHTTPClient { - for i, addr := range pdAddrs { - if !strings.HasPrefix(addr, "http") { - if tlsConf != nil { - addr = "https://" + addr - } else { - addr = "http://" + addr - } - pdAddrs[i] = addr - } - } - - return &PDHTTPClient{ - addrs: pdAddrs, - cli: httpClient(tlsConf), - } -} - -// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids. -func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) { - var err error - for _, addr := range p.addrs { - // scope is an optional parameter, it can be `cluster` or specified store IDs. - // - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. - // - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. - // - When scope given a list of stores, min_resolved_ts will be provided for each store - // and the scope-specific min_resolved_ts will be returned. - query := minResolvedTSPrefix - if len(storeIDs) != 0 { - query = fmt.Sprintf("%s?scope=%s", query, strings.Join(storeIDs, ",")) - } - v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil) - if e != nil { - logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e)) - err = e - continue - } - logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v))) - d := struct { - MinResolvedTS uint64 `json:"min_resolved_ts"` - IsRealTime bool `json:"is_real_time,omitempty"` - StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` - }{} - err = json.Unmarshal(v, &d) - if err != nil { - return 0, nil, errors.Trace(err) - } - if !d.IsRealTime { - message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr) - logutil.BgLogger().Debug(message.Error()) - return 0, nil, errors.Trace(message) - } - if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil { - // Need to make sure successfully get from real pd. - if d.StoresMinResolvedTS != nil { - for storeID, v := range d.StoresMinResolvedTS { - if v != 0 { - // Should be val.(uint64) but failpoint doesn't support that. - if tmp, ok := val.(int); ok { - d.StoresMinResolvedTS[storeID] = uint64(tmp) - logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp))) - } - } - } - } else if tmp, ok := val.(int); ok { - // Should be val.(uint64) but failpoint doesn't support that. - // ci's store id is 1, we can change it if we have more stores. - // but for pool ci it's no need to do that :( - d.MinResolvedTS = uint64(tmp) - logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp))) - } - - } - - return d.MinResolvedTS, d.StoresMinResolvedTS, nil - } - - return 0, nil, errors.Trace(err) -} - -// pdRequest is a func to send an HTTP to pd and return the result bytes. -func pdRequest( - ctx context.Context, - addr string, prefix string, - cli *http.Client, method string, body io.Reader) ([]byte, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, errors.Trace(err) - } - reqURL := fmt.Sprintf("%s/%s", u, prefix) - req, err := http.NewRequestWithContext(ctx, method, reqURL, body) - if err != nil { - return nil, errors.Trace(err) - } - var resp *http.Response - count := 0 - for { - resp, err = cli.Do(req) - count++ - - if _, e := EvalFailpoint("InjectClosed"); e == nil { - resp = nil - err = &url.Error{ - Op: "read", - Err: os.NewSyscallError("connect", syscall.ECONNREFUSED), - } - return nil, errors.Trace(err) - } - - if count > pdRequestRetryTime || (resp != nil && resp.StatusCode < 500) || - err != nil { - break - } - if resp != nil { - _ = resp.Body.Close() - } - time.Sleep(pdRequestRetryInterval()) - } - if err != nil { - return nil, errors.Trace(err) - } - defer func() { - _ = resp.Body.Close() - }() - - if resp.StatusCode != http.StatusOK { - res, _ := io.ReadAll(resp.Body) - return nil, errors.Errorf("[%d] %s %s", resp.StatusCode, res, reqURL) - } - - r, err := io.ReadAll(resp.Body) - if err != nil { - return nil, errors.Trace(err) - } - - return r, err -} - -func pdRequestRetryInterval() time.Duration { - if _, e := EvalFailpoint("FastRetry"); e == nil { - return 0 - } - return time.Second -} - -// httpClient returns an HTTP(s) client. -func httpClient(tlsConf *tls.Config) *http.Client { - // defaultTimeout for non-context requests. - const defaultTimeout = 30 * time.Second - cli := &http.Client{Timeout: defaultTimeout} - if tlsConf != nil { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.TLSClientConfig = tlsConf - cli.Transport = transport - } - return cli -} - -func (p *PDHTTPClient) Close() { - p.cli.CloseIdleConnections() - logutil.BgLogger().Info("closed pd http client") -} diff --git a/util/pd_test.go b/util/pd_test.go deleted file mode 100644 index 1c7b56c24..000000000 --- a/util/pd_test.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2023 TiKV Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// NOTE: The code in this file is based on code from the -// TiDB project, licensed under the Apache License v 2.0 -// -// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/rate_limit_test.go -// - -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/pingcap/failpoint" - "github.com/stretchr/testify/require" -) - -func TestPDRequestRetry(t *testing.T) { - EnableFailpoints() - ctx := context.Background() - require := require.New(t) - require.Nil(failpoint.Enable("tikvclient/FastRetry", `return()`)) - defer func() { - require.Nil(failpoint.Disable("tikvclient/FastRetry")) - }() - - count := 0 - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - count++ - if count <= pdRequestRetryTime-1 { - w.WriteHeader(http.StatusGatewayTimeout) - return - } - w.WriteHeader(http.StatusOK) - })) - cli := http.DefaultClient - taddr := ts.URL - _, reqErr := pdRequest(ctx, taddr, "", cli, http.MethodGet, nil) - require.Nil(reqErr) - ts.Close() - - count = 0 - ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - count++ - if count <= pdRequestRetryTime+1 { - w.WriteHeader(http.StatusGatewayTimeout) - return - } - w.WriteHeader(http.StatusOK) - })) - taddr = ts.URL - _, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil) - require.Error(reqErr) - ts.Close() - - require.Nil(failpoint.Enable("tikvclient/InjectClosed", fmt.Sprintf("return(%d)", 0))) - defer func() { - require.Nil(failpoint.Disable("tikvclient/InjectClosed")) - }() - ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) - taddr = ts.URL - _, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil) - require.Error(reqErr) - ts.Close() -}