Skip to content

Commit

Permalink
config: sync store config in time (#6919) (#6931)
Browse files Browse the repository at this point in the history
close #6918

add timeout context for observer tikv config to avoid wait too long

Co-authored-by: Shirly <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 14, 2024
1 parent abc78cd commit ec9f2c3
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 15 deletions.
18 changes: 14 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,22 +296,28 @@ func (c *RaftCluster) runSyncConfig() {
defer ticker.Stop()
stores := c.GetStores()

syncConfig(c.storeConfigManager, stores)
syncConfig(c.ctx, c.storeConfigManager, stores)
for {
select {
case <-c.ctx.Done():
log.Info("sync store config job is stopped")
return
case <-ticker.C:
if !syncConfig(c.storeConfigManager, stores) {
if !syncConfig(c.ctx, c.storeConfigManager, stores) {
stores = c.GetStores()
}
}
}
}

func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool {
func syncConfig(ctx context.Context, manager *config.StoreConfigManager, stores []*core.StoreInfo) bool {
for index := 0; index < len(stores); index++ {
select {
case <-ctx.Done():
log.Info("stop sync store config job due to server shutdown")
return false
default:
}
// filter out the stores that are tiflash
store := stores[index]
if core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) {
Expand All @@ -324,7 +330,11 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
if err := manager.ObserveConfig(address); err != nil {
err := manager.ObserveConfig(ctx, address)
if err != nil {
// delete the store if it is failed and retry next store.
stores = append(stores[:index], stores[index+1:]...)
index--
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
Expand Down
39 changes: 38 additions & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package cluster

import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -1173,6 +1176,40 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) {
}
}

func (s *testClusterInfoSuite) TestSyncConfigContext(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, _ := newTestScheduleConfig()
tc := newTestCluster(ctx, opt)
tc.storeConfigManager = config.NewStoreConfigManager(http.DefaultClient)
tc.httpClient = &http.Client{}

server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second * 100)
cfg := &config.StoreConfig{}
b, err := json.Marshal(cfg)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err)))
return
}

res.WriteHeader(http.StatusOK)
res.Write(b)
}))
stores := newTestStores(1, "2.0.0")
for _, s := range stores {
tc.putStoreLocked(s)
}
// trip schema header
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced := syncConfig(tc.ctx, tc.storeConfigManager, stores)
c.Assert(synced, IsFalse)
c.Assert(time.Since(now), Less, clientTimeout*2)
}

func (s *testClusterInfoSuite) TestSyncConfig(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1200,7 +1237,7 @@ func (s *testClusterInfoSuite) TestSyncConfig(c *C) {
for _, v := range testdata {
tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList)
c.Assert(tc.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(144))
c.Assert(syncConfig(tc.storeConfigManager, tc.GetStores()), Equals, v.updated)
c.Assert(syncConfig(context.Background(), tc.storeConfigManager, tc.GetStores()), Equals, v.updated)
c.Assert(tc.GetStoreConfig().GetRegionMaxSize(), Equals, v.maxRegionSize)
}
}
Expand Down
29 changes: 21 additions & 8 deletions server/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package config

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"reflect"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand All @@ -41,6 +44,8 @@ var (
defaultRegionMaxKey = uint64(1440000)
// default region split key is 960000
defaultRegionSplitKey = uint64(960000)

clientTimeout = 3 * time.Second
)

// StoreConfig is the config of store like TiKV.
Expand Down Expand Up @@ -187,8 +192,8 @@ func NewTestStoreConfigManager(whiteList []string) *StoreConfigManager {
}

// ObserveConfig is used to observe the config change.
func (m *StoreConfigManager) ObserveConfig(address string) error {
cfg, err := m.source.GetConfig(address)
func (m *StoreConfigManager) ObserveConfig(ctx context.Context, address string) error {
cfg, err := m.source.GetConfig(ctx, address)
if err != nil {
return err
}
Expand All @@ -211,7 +216,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig {

// Source is used to get the store config.
type Source interface {
GetConfig(statusAddress string) (*StoreConfig, error)
GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error)
}

// TiKVConfigSource is used to get the store config from TiKV.
Expand All @@ -228,14 +233,22 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource {
}

// GetConfig returns the store config from TiKV.
func (s TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error) {
func (s TiKVConfigSource) GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) {
url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress)
resp, err := s.client.Get(url)
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil))
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create store config http request: %w", err)
}
resp, err := s.client.Do(req)
if err != nil {
cancel()
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
cancel()
if err != nil {
return nil, err
}
Expand All @@ -258,7 +271,7 @@ func newFakeSource(whiteList []string) *FakeSource {
}

// GetConfig returns the config.
func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) {
func (f *FakeSource) GetConfig(_ context.Context, url string) (*StoreConfig, error) {
if !slice.Contains(f.whiteList, url) {
return nil, fmt.Errorf("[url:%s] is not in white list", url)
}
Expand Down
5 changes: 3 additions & 2 deletions server/config/store_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
Expand Down Expand Up @@ -62,9 +63,9 @@ func (t *testTiKVConfigSuite) TestTiKVConfig(c *C) {

func (t *testTiKVConfigSuite) TestUpdateConfig(c *C) {
manager := NewTestStoreConfigManager([]string{"tidb.com"})
manager.ObserveConfig("tikv.com")
manager.ObserveConfig(context.Background(), "tikv.com")
c.Assert(manager.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(144))
manager.ObserveConfig("tidb.com")
manager.ObserveConfig(context.Background(), "tidb.com")
c.Assert(manager.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(10))

client := &http.Client{
Expand Down

0 comments on commit ec9f2c3

Please sign in to comment.