From bea52c905195989ea0bf74fbfc25fd6a1de2a1d7 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 9 Aug 2023 14:10:46 +0800 Subject: [PATCH 1/2] sync config add context Signed-off-by: bufferflies <1045931706@qq.com> --- server/cluster/cluster.go | 28 +++++++++++++++++---- server/cluster/cluster_test.go | 45 ++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 903d932590c..c9fa186f703 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -15,6 +15,7 @@ package cluster import ( + "bytes" "context" "encoding/json" "fmt" @@ -450,6 +451,12 @@ func (c *RaftCluster) runStoreConfigSync() { // - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2. func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) { for index := 0; index < len(stores); index++ { + select { + case <-c.ctx.Done(): + log.Info("sync store config job is stopped") + return + default: + } // filter out the stores that are tiflash store := stores[index] if store.IsTiFlash() { @@ -462,8 +469,11 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw } // it will try next store if the current store is failed. address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress()) - switchRaftV2, err := c.observeStoreConfig(address) + switchRaftV2, err := c.observeStoreConfig(c.ctx, address) if err != nil { + // delete the store if it is failed and retry next store. + stores = append(stores[:index], stores[index+1:]...) + index-- storeSyncConfigEvent.WithLabelValues(address, "fail").Inc() log.Debug("sync store config failed, it will try next store", zap.Error(err)) continue @@ -479,8 +489,8 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw // observeStoreConfig is used to observe the store config changes and // return whether if the new config changes the engine to raft-kv2. -func (c *RaftCluster) observeStoreConfig(address string) (bool, error) { - cfg, err := c.fetchStoreConfigFromTiKV(address) +func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) { + cfg, err := c.fetchStoreConfigFromTiKV(ctx, address) if err != nil { return false, err } @@ -503,7 +513,7 @@ func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool, } // fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL. -func (c *RaftCluster) fetchStoreConfigFromTiKV(statusAddress string) (*config.StoreConfig, error) { +func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) { cfg := &config.StoreConfig{} failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) { if regionMaxSize, ok := val.(string); ok { @@ -521,12 +531,20 @@ func (c *RaftCluster) fetchStoreConfigFromTiKV(statusAddress string) (*config.St } else { url = fmt.Sprintf("%s://%s/config", "http", statusAddress) } - resp, err := c.httpClient.Get(url) + ctx, cancel := context.WithTimeout(ctx, clientTimeout) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil)) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create http request: %w", err) + } + resp, err := c.httpClient.Do(req) if err != nil { + cancel() return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) + cancel() if err != nil { return nil, err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index a8d591cf61a..a69f981cc78 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -20,6 +20,8 @@ import ( "fmt" "math" "math/rand" + "net/http" + "net/http/httptest" "sync" "testing" "time" @@ -1386,6 +1388,49 @@ func TestStoreConfigUpdate(t *testing.T) { } } +func TestRemoveUnhealthyStore(t *testing.T) { + arr := []int{1, 2, 3} + re := require.New(t) + index := 0 + arr = append(arr[:index], arr[index+1:]...) + re.Len(arr, 2) +} + +func TestSyncConfigContext(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, err := newTestScheduleConfig() + re.NoError(err) + tc := newTestCluster(ctx, opt) + tc.httpClient = &http.Client{} + + server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + time.Sleep(time.Second * 100) + cfg := &config.StoreConfig{} + b, err := json.Marshal(cfg) + if err != nil { + res.WriteHeader(http.StatusInternalServerError) + res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err))) + return + } + + res.WriteHeader(http.StatusOK) + res.Write(b) + })) + stores := newTestStores(1, "2.0.0") + for _, s := range stores { + re.NoError(tc.putStoreLocked(s)) + } + // trip schema header + now := time.Now() + stores[0].GetMeta().StatusAddress = server.URL[7:] + synced, _ := tc.syncStoreConfig(tc.GetStores()) + re.False(synced) + re.Less(time.Since(now), clientTimeout*2) +} + func TestStoreConfigSync(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) From b14de0fe5387fe347df32d8fce5e7f6e5a347e10 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 9 Aug 2023 15:25:26 +0800 Subject: [PATCH 2/2] use different log Signed-off-by: bufferflies <1045931706@qq.com> --- server/cluster/cluster.go | 4 ++-- server/cluster/cluster_test.go | 8 -------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c9fa186f703..fd4ef1ad295 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -453,7 +453,7 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw for index := 0; index < len(stores); index++ { select { case <-c.ctx.Done(): - log.Info("sync store config job is stopped") + log.Info("stop sync store config job due to server shutdown") return default: } @@ -535,7 +535,7 @@ func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddres req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil)) if err != nil { cancel() - return nil, fmt.Errorf("failed to create http request: %w", err) + return nil, fmt.Errorf("failed to create store config http request: %w", err) } resp, err := c.httpClient.Do(req) if err != nil { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index a69f981cc78..10ea4dd9dbd 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1388,14 +1388,6 @@ func TestStoreConfigUpdate(t *testing.T) { } } -func TestRemoveUnhealthyStore(t *testing.T) { - arr := []int{1, 2, 3} - re := require.New(t) - index := 0 - arr = append(arr[:index], arr[index+1:]...) - re.Len(arr, 2) -} - func TestSyncConfigContext(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())