diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 9fa4591430e..f80138c92ff 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -307,22 +307,28 @@ func (c *RaftCluster) runSyncConfig() { defer ticker.Stop() stores := c.GetStores() - syncConfig(c.storeConfigManager, stores) + syncConfig(c.ctx, c.storeConfigManager, stores) for { select { case <-c.ctx.Done(): log.Info("sync store config job is stopped") return case <-ticker.C: - if !syncConfig(c.storeConfigManager, stores) { + if !syncConfig(c.ctx, c.storeConfigManager, stores) { stores = c.GetStores() } } } } -func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool { +func syncConfig(ctx context.Context, manager *config.StoreConfigManager, stores []*core.StoreInfo) bool { for index := 0; index < len(stores); index++ { + select { + case <-ctx.Done(): + log.Info("stop sync store config job due to raft cluster exit") + return false + default: + } // filter out the stores that are tiflash store := stores[index] if core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) { @@ -335,7 +341,9 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo } // it will try next store if the current store is failed. address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress()) - if err := manager.ObserveConfig(address); err != nil { + if err := manager.ObserveConfig(ctx, address); err != nil { + stores = append(stores[:index], stores[index+1:]...) + index-- storeSyncConfigEvent.WithLabelValues(address, "fail").Inc() log.Debug("sync store config failed, it will try next store", zap.Error(err)) continue diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b19fb71bce1..9adce941a06 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -16,9 +16,12 @@ package cluster import ( "context" + "encoding/json" "fmt" "math" "math/rand" + "net/http" + "net/http/httptest" "sync" "testing" "time" @@ -1319,11 +1322,47 @@ func TestSyncConfig(t *testing.T) { for _, v := range testdata { tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList) re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize()) - re.Equal(v.updated, syncConfig(tc.storeConfigManager, tc.GetStores())) + re.Equal(v.updated, syncConfig(tc.ctx, tc.storeConfigManager, tc.GetStores())) re.Equal(v.maxRegionSize, tc.GetStoreConfig().GetRegionMaxSize()) } } +func TestSyncConfigContext(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, err := newTestScheduleConfig() + re.NoError(err) + tc := newTestCluster(ctx, opt) + tc.storeConfigManager = config.NewStoreConfigManager(http.DefaultClient) + tc.httpClient = &http.Client{} + + server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + time.Sleep(time.Second * 100) + cfg := &config.StoreConfig{} + b, err := json.Marshal(cfg) + if err != nil { + res.WriteHeader(http.StatusInternalServerError) + res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err))) + return + } + + res.WriteHeader(http.StatusOK) + res.Write(b) + })) + stores := newTestStores(1, "2.0.0") + for _, s := range stores { + re.NoError(tc.putStoreLocked(s)) + } + // trip schema header + now := time.Now() + stores[0].GetMeta().StatusAddress = server.URL[7:] + synced := syncConfig(tc.ctx, tc.storeConfigManager, stores) + re.False(synced) + re.Less(time.Since(now), clientTimeout*2) +} + func TestUpdateStorePendingPeerCount(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/server/config/store_config.go b/server/config/store_config.go index 960ea6688e7..e322f3e122e 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -15,12 +15,14 @@ package config import ( + "context" "encoding/json" "fmt" "io" "net/http" "reflect" "sync/atomic" + "time" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -41,6 +43,7 @@ var ( defaultRegionMaxKey = uint64(1440000) // default region split key is 960000 defaultRegionSplitKey = uint64(960000) + clientTimeout = 3 * time.Second ) // StoreConfig is the config of store like TiKV. @@ -191,8 +194,8 @@ func NewTestStoreConfigManager(whiteList []string) *StoreConfigManager { } // ObserveConfig is used to observe the config change. -func (m *StoreConfigManager) ObserveConfig(address string) error { - cfg, err := m.source.GetConfig(address) +func (m *StoreConfigManager) ObserveConfig(ctx context.Context, address string) error { + cfg, err := m.source.GetConfig(ctx, address) if err != nil { return err } @@ -222,7 +225,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig { // Source is used to get the store config. type Source interface { - GetConfig(statusAddress string) (*StoreConfig, error) + GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) } // TiKVConfigSource is used to get the store config from TiKV. @@ -239,9 +242,15 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource { } // GetConfig returns the store config from TiKV. -func (s TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error) { +func (s TiKVConfigSource) GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) { url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress) - resp, err := s.client.Get(url) + ctx, cancel := context.WithTimeout(ctx, clientTimeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create store config http request: %w", err) + } + resp, err := s.client.Do(req) if err != nil { return nil, err } @@ -269,7 +278,7 @@ func newFakeSource(whiteList []string) *FakeSource { } // GetConfig returns the config. -func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) { +func (f *FakeSource) GetConfig(_ context.Context, url string) (*StoreConfig, error) { if !slice.Contains(f.whiteList, url) { return nil, fmt.Errorf("[url:%s] is not in white list", url) } diff --git a/server/config/store_config_test.go b/server/config/store_config_test.go index 6916fedc929..3ab3e4900e3 100644 --- a/server/config/store_config_test.go +++ b/server/config/store_config_test.go @@ -15,6 +15,7 @@ package config import ( + "context" "crypto/tls" "encoding/json" "net/http" @@ -63,13 +64,13 @@ func TestTiKVConfig(t *testing.T) { func TestUpdateConfig(t *testing.T) { re := require.New(t) manager := NewTestStoreConfigManager([]string{"tidb.com"}) - manager.ObserveConfig("tikv.com") + manager.ObserveConfig(context.Background(), "tikv.com") re.Equal(uint64(144), manager.GetStoreConfig().GetRegionMaxSize()) - manager.ObserveConfig("tidb.com") + manager.ObserveConfig(context.Background(), "tidb.com") re.Equal(uint64(10), manager.GetStoreConfig().GetRegionMaxSize()) // case2: the config should not update if config is same expect some ignore field. - c, err := manager.source.GetConfig("tidb.com") + c, err := manager.source.GetConfig(context.Background(), "tidb.com") re.NoError(err) re.True(manager.GetStoreConfig().Equal(c))