Skip to content

Commit

Permalink
config: sync store config in time (#6919) (#6932)
Browse files Browse the repository at this point in the history
close #6918

Add a timeout context when observing the TiKV config to avoid waiting too long.

Signed-off-by: bufferflies <[email protected]>

Co-authored-by: bufferflies <[email protected]>
Co-authored-by: buffer <[email protected]>
  • Loading branch information
ti-chi-bot and bufferflies authored Aug 14, 2023
1 parent 3a19dec commit 9a9fc0f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 14 deletions.
16 changes: 12 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,22 +307,28 @@ func (c *RaftCluster) runSyncConfig() {
defer ticker.Stop()
stores := c.GetStores()

syncConfig(c.storeConfigManager, stores)
syncConfig(c.ctx, c.storeConfigManager, stores)
for {
select {
case <-c.ctx.Done():
log.Info("sync store config job is stopped")
return
case <-ticker.C:
if !syncConfig(c.storeConfigManager, stores) {
if !syncConfig(c.ctx, c.storeConfigManager, stores) {
stores = c.GetStores()
}
}
}
}

func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool {
func syncConfig(ctx context.Context, manager *config.StoreConfigManager, stores []*core.StoreInfo) bool {
for index := 0; index < len(stores); index++ {
select {
case <-ctx.Done():
log.Info("stop sync store config job due to raft cluster exit")
return false
default:
}
// filter out the stores that are tiflash
store := stores[index]
if core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) {
Expand All @@ -335,7 +341,9 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
if err := manager.ObserveConfig(address); err != nil {
if err := manager.ObserveConfig(ctx, address); err != nil {
stores = append(stores[:index], stores[index+1:]...)
index--
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
Expand Down
41 changes: 40 additions & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package cluster

import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1319,11 +1322,47 @@ func TestSyncConfig(t *testing.T) {
for _, v := range testdata {
tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList)
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
re.Equal(v.updated, syncConfig(tc.storeConfigManager, tc.GetStores()))
re.Equal(v.updated, syncConfig(tc.ctx, tc.storeConfigManager, tc.GetStores()))
re.Equal(v.maxRegionSize, tc.GetStoreConfig().GetRegionMaxSize())
}
}

// TestSyncConfigContext verifies that syncConfig does not wait for a store
// whose status endpoint never answers in time.
//
// A local HTTP server stalls every request; the sync is expected to report
// failure and to return in less than twice clientTimeout.
func TestSyncConfigContext(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	_, opt, err := newTestScheduleConfig()
	re.NoError(err)
	tc := newTestCluster(ctx, opt)
	tc.storeConfigManager = config.NewStoreConfigManager(http.DefaultClient)
	tc.httpClient = &http.Client{}

	server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
		// Stall far longer than the caller is expected to wait, but give up as
		// soon as the request is cancelled (net/http cancels the request
		// context when the client's connection closes) so the handler does not
		// outlive the test.
		select {
		case <-req.Context().Done():
			return
		case <-time.After(time.Second * 100):
		}
		cfg := &config.StoreConfig{}
		b, err := json.Marshal(cfg)
		if err != nil {
			res.WriteHeader(http.StatusInternalServerError)
			res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err)))
			return
		}

		res.WriteHeader(http.StatusOK)
		res.Write(b)
	}))
	// Release the listener when the test ends. Close blocks until outstanding
	// requests have completed, which is why the handler above must return
	// promptly once its request is cancelled.
	defer server.Close()

	stores := newTestStores(1, "2.0.0")
	for _, s := range stores {
		re.NoError(tc.putStoreLocked(s))
	}
	now := time.Now()
	// Strip the leading "http://" (7 bytes) from the server URL so that only
	// host:port is stored as the status address.
	stores[0].GetMeta().StatusAddress = server.URL[7:]
	synced := syncConfig(tc.ctx, tc.storeConfigManager, stores)
	re.False(synced)
	re.Less(time.Since(now), clientTimeout*2)
}

func TestUpdateStorePendingPeerCount(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
21 changes: 15 additions & 6 deletions server/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package config

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand All @@ -41,6 +43,7 @@ var (
defaultRegionMaxKey = uint64(1440000)
// default region split key is 960000
defaultRegionSplitKey = uint64(960000)
clientTimeout = 3 * time.Second
)

// StoreConfig is the config of store like TiKV.
Expand Down Expand Up @@ -191,8 +194,8 @@ func NewTestStoreConfigManager(whiteList []string) *StoreConfigManager {
}

// ObserveConfig is used to observe the config change.
func (m *StoreConfigManager) ObserveConfig(address string) error {
cfg, err := m.source.GetConfig(address)
func (m *StoreConfigManager) ObserveConfig(ctx context.Context, address string) error {
cfg, err := m.source.GetConfig(ctx, address)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,7 +225,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig {

// Source is used to get the store config.
type Source interface {
GetConfig(statusAddress string) (*StoreConfig, error)
GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error)
}

// TiKVConfigSource is used to get the store config from TiKV.
Expand All @@ -239,9 +242,15 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource {
}

// GetConfig returns the store config from TiKV.
func (s TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error) {
func (s TiKVConfigSource) GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) {
url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress)
resp, err := s.client.Get(url)
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create store config http request: %w", err)
}
resp, err := s.client.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -269,7 +278,7 @@ func newFakeSource(whiteList []string) *FakeSource {
}

// GetConfig returns the config.
func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) {
func (f *FakeSource) GetConfig(_ context.Context, url string) (*StoreConfig, error) {
if !slice.Contains(f.whiteList, url) {
return nil, fmt.Errorf("[url:%s] is not in white list", url)
}
Expand Down
7 changes: 4 additions & 3 deletions server/config/store_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
Expand Down Expand Up @@ -63,13 +64,13 @@ func TestTiKVConfig(t *testing.T) {
func TestUpdateConfig(t *testing.T) {
re := require.New(t)
manager := NewTestStoreConfigManager([]string{"tidb.com"})
manager.ObserveConfig("tikv.com")
manager.ObserveConfig(context.Background(), "tikv.com")
re.Equal(uint64(144), manager.GetStoreConfig().GetRegionMaxSize())
manager.ObserveConfig("tidb.com")
manager.ObserveConfig(context.Background(), "tidb.com")
re.Equal(uint64(10), manager.GetStoreConfig().GetRegionMaxSize())

// case2: the config should not update if config is same expect some ignore field.
c, err := manager.source.GetConfig("tidb.com")
c, err := manager.source.GetConfig(context.Background(), "tidb.com")
re.NoError(err)
re.True(manager.GetStoreConfig().Equal(c))

Expand Down

0 comments on commit 9a9fc0f

Please sign in to comment.