Skip to content

Commit

Permalink
Merge branch 'master' into fix_etcd_client_panic
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Jul 31, 2023
2 parents 747308f + ef81248 commit 449d2b2
Show file tree
Hide file tree
Showing 13 changed files with 575 additions and 158 deletions.
3 changes: 1 addition & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func NewKeyspaceGroupManager(
// The PD(TSO) Client relies on this info to discover tso servers.
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.wg.Add(1)
go m.tsoNodesWatcher.StartWatchLoop()
m.tsoNodesWatcher.StartWatchLoop()
}
return m
}
Expand Down
121 changes: 121 additions & 0 deletions pkg/mcs/scheduling/server/meta/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
"context"
"encoding/json"
"sync"

"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// persistedConfig is the configuration struct that is persisted in etcd.
// We only use it internally to do the unmarshal work.
type persistedConfig struct {
sync.RWMutex
Schedule sc.ScheduleConfig `toml:"schedule" json:"schedule"`
Replication sc.ReplicationConfig `toml:"replication" json:"replication"`
}

// ConfigWatcher is used to watch the PD API server for any configuration changes.
type ConfigWatcher struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

etcdClient *clientv3.Client
watcher *etcdutil.LoopWatcher

config *persistedConfig
// TODO: watch the scheduler config change.
}

// NewConfigWatcher creates a new watcher to watch the config meta change from PD API server.
func NewConfigWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
// configPath is the path of the configuration in etcd:
// - Key: /pd/{cluster_id}/config
// - Value: configuration JSON.
configPath string,
) (*ConfigWatcher, error) {
ctx, cancel := context.WithCancel(ctx)
cw := &ConfigWatcher{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
config: &persistedConfig{},
}
putFn := func(kv *mvccpb.KeyValue) error {
cfg := &persistedConfig{}
if err := json.Unmarshal(kv.Value, cfg); err != nil {
log.Warn("failed to unmarshal scheduling config entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
cw.config.Lock()
cw.config.Schedule = cfg.Schedule
cw.config.Replication = cfg.Replication
cw.config.Unlock()
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return nil
}
postEventFn := func() error {
return nil
}
cw.watcher = etcdutil.NewLoopWatcher(
ctx,
&cw.wg,
etcdClient,
"scheduling-config-watcher",
configPath,
putFn,
deleteFn,
postEventFn,
)
cw.watcher.StartWatchLoop()
if err := cw.watcher.WaitLoad(); err != nil {
return nil, err
}
return cw, nil
}

// Close closes the watcher.
func (cw *ConfigWatcher) Close() {
cw.cancel()
cw.wg.Wait()
}

// GetScheduleConfig returns the schedule configuration.
func (cw *ConfigWatcher) GetScheduleConfig() sc.ScheduleConfig {
cw.config.RLock()
defer cw.config.RUnlock()
return cw.config.Schedule
}

// GetReplicationConfig returns the replication configuration.
func (cw *ConfigWatcher) GetReplicationConfig() sc.ReplicationConfig {
cw.config.RLock()
defer cw.config.RUnlock()
return cw.config.Replication
}
11 changes: 6 additions & 5 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
)

const (
defaultMaxReplicas = 3
// DefaultMaxReplicas is the default number of replicas for each region.
DefaultMaxReplicas = 3
defaultMaxSnapshotCount = 64
defaultMaxPendingPeerCount = 64
defaultMaxMergeRegionSize = 20
Expand Down Expand Up @@ -60,8 +61,8 @@ const (
defaultRegionScoreFormulaVersion = "v2"
defaultLeaderSchedulePolicy = "count"
defaultStoreLimitVersion = "v1"

defaultSplitMergeInterval = time.Hour
// DefaultSplitMergeInterval is the default value of config split merge interval.
DefaultSplitMergeInterval = time.Hour
defaultSwitchWitnessInterval = time.Hour
defaultPatrolRegionInterval = 10 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
Expand Down Expand Up @@ -298,7 +299,7 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
if !meta.IsDefined("max-merge-region-size") {
configutil.AdjustUint64(&c.MaxMergeRegionSize, defaultMaxMergeRegionSize)
}
configutil.AdjustDuration(&c.SplitMergeInterval, defaultSplitMergeInterval)
configutil.AdjustDuration(&c.SplitMergeInterval, DefaultSplitMergeInterval)
configutil.AdjustDuration(&c.SwitchWitnessInterval, defaultSwitchWitnessInterval)
configutil.AdjustDuration(&c.PatrolRegionInterval, defaultPatrolRegionInterval)
configutil.AdjustDuration(&c.MaxStoreDownTime, defaultMaxStoreDownTime)
Expand Down Expand Up @@ -599,7 +600,7 @@ func (c *ReplicationConfig) Validate() error {

// Adjust adjusts the config.
func (c *ReplicationConfig) Adjust(meta *configutil.ConfigMetaData) error {
configutil.AdjustUint64(&c.MaxReplicas, defaultMaxReplicas)
configutil.AdjustUint64(&c.MaxReplicas, DefaultMaxReplicas)
if !meta.IsDefined("enable-placement-rules") {
c.EnablePlacementRules = defaultEnablePlacementRules
}
Expand Down
Loading

0 comments on commit 449d2b2

Please sign in to comment.