
Commit

This is an automated cherry-pick of tikv#6919
close tikv#6918

Signed-off-by: ti-chi-bot <[email protected]>
bufferflies authored and ti-chi-bot committed Aug 10, 2023
1 parent 3dc0cc3 commit 62a8529
Showing 2 changed files with 134 additions and 0 deletions.
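At its core, the cherry-picked change below leans on two Go idioms: a non-blocking select on ctx.Done(), so the per-store loop notices server shutdown between iterations, and in-place slice deletion (append(stores[:index], stores[index+1:]...) plus index--), so an unreachable store is dropped and the loop moves on. A minimal, self-contained sketch of that loop shape follows; the store list, addresses, and fetch function are placeholders for illustration, not PD's API:

package main

import (
	"context"
	"errors"
	"fmt"
)

// syncLoop mirrors the shape of syncStoreConfig: check for shutdown without
// blocking, drop a store that fails, and stop after the first success.
func syncLoop(ctx context.Context, stores []string, fetch func(string) error) (synced bool) {
	for index := 0; index < len(stores); index++ {
		select {
		case <-ctx.Done():
			fmt.Println("stop sync store config job due to server shutdown")
			return false
		default: // non-blocking: fall through while the context is live
		}
		if err := fetch(stores[index]); err != nil {
			// drop the failed store in place and retry with the next one
			stores = append(stores[:index], stores[index+1:]...)
			index--
			continue
		}
		return true // one reachable store is enough
	}
	return false
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	failS1 := func(addr string) error {
		if addr == "s1:20180" {
			return errors.New("unreachable")
		}
		return nil
	}
	fmt.Println(syncLoop(ctx, []string{"s1:20180", "s2:20180"}, failS1)) // true: s1 dropped, s2 synced

	cancel() // simulate server shutdown
	fmt.Println(syncLoop(ctx, []string{"s2:20180"}, failS1)) // false: loop exits immediately
}

The default arm is what keeps the shutdown check non-blocking; without it, the select would stall the loop until the context was cancelled.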
86 changes: 86 additions & 0 deletions server/cluster/cluster.go
@@ -15,6 +15,7 @@
package cluster

import (
"bytes"
"context"
"fmt"
"math"
@@ -310,6 +311,12 @@ func (c *RaftCluster) runSyncConfig() {

func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
log.Info("stop sync store config job due to server shutdown")
return
default:
}
// filter out the stores that are tiflash
store := stores[index]
if core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) {
@@ -322,7 +329,15 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool
}
// it will try the next store if the current one fails.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
if err != nil {
// delete the store if it failed and retry the next one.
stores = append(stores[:index], stores[index+1:]...)
index--
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
@@ -331,7 +346,78 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool
// it will only try one store.
return true, switchRaftV2
}
return false, false
}

// observeStoreConfig is used to observe the store config changes and
// return whether the new config switches the engine to raft-kv2.
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
if err != nil {
return false, err
}
oldCfg := c.opt.GetStoreConfig()
if cfg == nil || oldCfg.Equal(cfg) {
return false, nil
}
log.Info("sync the store config successful",
zap.String("store-address", address),
zap.String("store-config", cfg.String()),
zap.String("old-config", oldCfg.String()))
return c.updateStoreConfig(oldCfg, cfg)
}

// updateStoreConfig updates the store config. This is extracted for testing.
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool, error) {
cfg.Adjust()
c.opt.SetStoreConfig(cfg)
return oldCfg.Storage.Engine != config.RaftstoreV2 && cfg.Storage.Engine == config.RaftstoreV2, nil
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) {
cfg := &config.StoreConfig{}
failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) {
if regionMaxSize, ok := val.(string); ok {
cfg.RegionMaxSize = regionMaxSize
cfg.Storage.Engine = config.RaftstoreV2
}
failpoint.Return(cfg, nil)
})
if c.httpClient == nil {
return nil, fmt.Errorf("failed to get store config due to nil client")
}
var url string
if netutil.IsEnableHTTPS(c.httpClient) {
url = fmt.Sprintf("%s://%s/config", "https", statusAddress)
} else {
url = fmt.Sprintf("%s://%s/config", "http", statusAddress)
}
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil))
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create store config http request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
cancel()
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
cancel()
if err != nil {
return nil, err
}
if err := json.Unmarshal(body, cfg); err != nil {
return nil, err
}
return cfg, nil
}

// LoadClusterInfo loads cluster related info.
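Taken together, fetchStoreConfigFromTiKV and updateStoreConfig implement fetch, compare, and detect the engine switch. Below is a condensed, self-contained sketch of the same flow against a throwaway HTTP endpoint; storeConfig and its fields are simplified stand-ins for PD's config.StoreConfig, not the real type:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

const raftstoreV2 = "raft-kv2" // stand-in for config.RaftstoreV2

// storeConfig is a simplified stand-in for PD's config.StoreConfig.
type storeConfig struct {
	RegionMaxSize string `json:"region-max-size"`
	Engine        string `json:"engine"`
}

// fetchStoreConfig mirrors fetchStoreConfigFromTiKV: GET <addr>/config with a
// bounded context, read the body, and unmarshal it into the config struct.
func fetchStoreConfig(ctx context.Context, client *http.Client, url string) (*storeConfig, error) {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url+"/config", nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	cfg := &storeConfig{}
	if err := json.Unmarshal(body, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode(storeConfig{RegionMaxSize: "144MiB", Engine: raftstoreV2})
	}))
	defer server.Close()

	cfg, err := fetchStoreConfig(context.Background(), server.Client(), server.URL)
	if err != nil {
		panic(err)
	}
	old := storeConfig{Engine: "raft-kv"}
	// Same transition check as updateStoreConfig: report a switch only when
	// the engine moves from something else to raft-kv2.
	switched := old.Engine != raftstoreV2 && cfg.Engine == raftstoreV2
	fmt.Println(cfg.RegionMaxSize, switched) // 144MiB true
}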
48 changes: 48 additions & 0 deletions server/cluster/cluster_test.go
@@ -19,7 +19,12 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
@@ -1205,7 +1210,50 @@ func (s *testClusterInfoSuite) TestSyncConfig(c *C) {
}
}

func TestSyncConfigContext(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestCluster(ctx, opt)
tc.httpClient = &http.Client{}

server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second * 100)
cfg := &config.StoreConfig{}
b, err := json.Marshal(cfg)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err)))
return
}

res.WriteHeader(http.StatusOK)
res.Write(b)
}))
stores := newTestStores(1, "2.0.0")
for _, s := range stores {
re.NoError(tc.putStoreLocked(s))
}
// strip the "http://" scheme prefix from the test server URL
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced, _ := tc.syncStoreConfig(tc.GetStores())
re.False(synced)
re.Less(time.Since(now), clientTimeout*2)
}

func TestStoreConfigSync(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// ... (the remainder of TestStoreConfigSync is elided in this view)
}

func (s *testClusterInfoSuite) TestUpdateStorePendingPeerCount(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestCluster(s.ctx, opt)
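The trick in TestSyncConfigContext is a handler that sleeps far longer than the client timeout: if the sync path honors its request context, the call fails fast and time.Since(now) stays under clientTimeout*2. A stripped-down, self-contained version of that pattern follows; the 200ms constant is an arbitrary stand-in for the cluster package's clientTimeout:

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	// A handler that sleeps far longer than the client timeout, like the
	// time.Sleep(time.Second * 100) server in TestSyncConfigContext.
	// Note: slow.Close() is deliberately not deferred, since it would block
	// until the sleeping handler returns; process exit cleans up instead.
	slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(10 * time.Second)
	}))

	const clientTimeout = 200 * time.Millisecond // stand-in value
	ctx, cancel := context.WithTimeout(context.Background(), clientTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, slow.URL+"/config", nil)
	if err != nil {
		panic(err)
	}
	start := time.Now()
	_, err = http.DefaultClient.Do(req)
	elapsed := time.Since(start)
	// err wraps context.DeadlineExceeded and elapsed is roughly clientTimeout,
	// nowhere near the handler's 10s sleep: the property the real test asserts
	// with re.Less(time.Since(now), clientTimeout*2).
	fmt.Printf("err=%v elapsed=%v under-2x=%v\n", err, elapsed.Round(time.Millisecond), elapsed < clientTimeout*2)
}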
