Skip to content

Commit

Permalink
tso: add some metrics for Keyspace Group operations (tikv#7022)
Browse files Browse the repository at this point in the history
ref tikv#7011

Add some metrics for Keyspace Group operations.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Sep 1, 2023
1 parent 62f05da commit 87f2da8
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
41 changes: 34 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,16 @@ type state struct {
// keyspaceLookupTable is a map from keyspace to the keyspace group to which it belongs.
keyspaceLookupTable map[uint32]uint32
// splittingGroups is the cache of splitting keyspace group related information.
splittingGroups map[uint32]struct{}
// The key is the keyspace group ID, and the value is the time when the keyspace group
// is created as the split target.
splittingGroups map[uint32]time.Time
// deletedGroups is the cache of deleted keyspace group related information.
deletedGroups map[uint32]struct{}
}

// initialize allocates the lookup maps held by state (keyspaceLookupTable,
// splittingGroups and deletedGroups) so that later writes do not hit a nil map.
func (s *state) initialize() {
s.keyspaceLookupTable = make(map[uint32]uint32)
// NOTE(review): splittingGroups is assigned twice in a row. This looks like the
// removed and added sides of the diff rendered together — confirm that only the
// time.Time variant below exists in the real file.
s.splittingGroups = make(map[uint32]struct{})
// Values are time.Time: per the field comment, the time when the keyspace group
// is created as the split target.
s.splittingGroups = make(map[uint32]time.Time)
s.deletedGroups = make(map[uint32]struct{})
}

Expand Down Expand Up @@ -341,6 +343,9 @@ type KeyspaceGroupManager struct {
serviceRegistryMap map[string]string
// tsoNodesWatcher is the watcher for the registered tso servers.
tsoNodesWatcher *etcdutil.LoopWatcher

// pre-initialized metrics
metrics *keyspaceGroupMetrics
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -376,6 +381,7 @@ func NewKeyspaceGroupManager(
cfg: cfg,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
serviceRegistryMap: make(map[string]string),
metrics: newKeyspaceGroupMetrics(),
}
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
Expand Down Expand Up @@ -659,12 +665,15 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
kgm.mergeCheckerCancelMap.Store(group.ID, cancel)
kgm.wg.Add(1)
go kgm.mergingChecker(ctx, group.ID, group.MergeState.MergeList)
kgm.metrics.mergeTargetGauge.Inc()
kgm.metrics.mergeSourceGauge.Add(float64(len(group.MergeState.MergeList)))
}
// If the merge state has been finished, cancel its merging checker.
if oldGroup.IsMergeTarget() && !group.IsMergeTarget() {
if cancel, loaded := kgm.mergeCheckerCancelMap.LoadAndDelete(group.ID); loaded && cancel != nil {
cancel.(context.CancelFunc)()
}
kgm.metrics.mergeTargetGauge.Dec()
}

// If this host is already assigned a replica of this keyspace group, i.e., the election member
Expand Down Expand Up @@ -732,7 +741,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
kgm.ams[group.ID] = am
// If the group is the split target, add it to the splitting group map.
if group.IsSplitTarget() {
kgm.splittingGroups[group.ID] = struct{}{}
kgm.splittingGroups[group.ID] = time.Now()
kgm.metrics.splitTargetGauge.Inc()
}
kgm.Unlock()
}
Expand Down Expand Up @@ -858,10 +868,24 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
}
}
}
// Check if the split is completed.
if oldGroup != nil && oldGroup.IsSplitTarget() && !newGroup.IsSplitting() {
kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil)
delete(kgm.splittingGroups, groupID)
// Check the split state.
if oldGroup != nil {
// SplitTarget -> !Splitting
if oldGroup.IsSplitTarget() && !newGroup.IsSplitting() {
kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil)
splitTime := kgm.splittingGroups[groupID]
delete(kgm.splittingGroups, groupID)
kgm.metrics.splitTargetGauge.Dec()
kgm.metrics.splitDuration.Observe(time.Since(splitTime).Seconds())
}
// SplitSource -> !Splitting
if oldGroup.IsSplitSource() && !newGroup.IsSplitting() {
kgm.metrics.splitSourceGauge.Dec()
}
// !Splitting -> SplitSource
if !oldGroup.IsSplitting() && newGroup.IsSplitSource() {
kgm.metrics.splitSourceGauge.Inc()
}
}
kgm.kgs[groupID] = newGroup
}
Expand Down Expand Up @@ -1196,6 +1220,7 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
// mergingChecker is used to check if the keyspace group is in merge state, and if so, it will
// make sure the newly merged TSO keep consistent with the original ones.
func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTargetID uint32, mergeList []uint32) {
startTime := time.Now()
log.Info("start to merge the keyspace group",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
Expand Down Expand Up @@ -1263,6 +1288,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
if len(mergeMap) > 0 {
continue
}
kgm.metrics.mergeSourceGauge.Add(-float64(len(mergeList)))
log.Info("all the keyspace group primaries in the merge list are gone, "+
"start to calculate the newly merged TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1330,6 +1356,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
zap.Error(err))
continue
}
kgm.metrics.mergeDuration.Observe(time.Since(startTime).Seconds())
log.Info("finished merging keyspace group",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("merge-target-id", mergeTargetID),
Expand Down
41 changes: 41 additions & 0 deletions pkg/tso/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
)

var (
// TSO metrics
tsoCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down Expand Up @@ -54,13 +55,33 @@ var (
Name: "role",
Help: "Indicate the PD server role info, whether it's a TSO allocator.",
}, []string{groupLabel, dcLabel})

// Keyspace Group metrics
keyspaceGroupStateGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "keyspace_group",
Name: "state",
Help: "Gauge of the Keyspace Group states.",
}, []string{typeLabel})

keyspaceGroupOpDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "keyspace_group",
Name: "operation_duration_seconds",
Help: "Bucketed histogram of processing time(s) of the Keyspace Group operations.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{typeLabel})
)

// init registers the TSO and Keyspace Group collectors with the default
// Prometheus registry. MustRegister panics if a registration fails, so a
// duplicate or invalid collector is surfaced at process start.
func init() {
	prometheus.MustRegister(
		// TSO metrics
		tsoCounter,
		tsoGauge,
		tsoGap,
		tsoAllocatorRole,
		// Keyspace Group metrics
		keyspaceGroupStateGauge,
		keyspaceGroupOpDuration,
	)
}

type tsoMetrics struct {
Expand Down Expand Up @@ -125,3 +146,23 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics {
globalTSOSyncRTTGauge: tsoGauge.WithLabelValues("global_tso_sync_rtt", groupID, dcLocation),
}
}

// keyspaceGroupMetrics holds metric handles for Keyspace Group split and merge
// operations. The handles are resolved once (see newKeyspaceGroupMetrics) so
// callers do not need to look them up by label on every update.
type keyspaceGroupMetrics struct {
	// Gauges adjusted as keyspace groups enter or leave the split source/target
	// and merge source/target roles.
	splitSourceGauge, splitTargetGauge prometheus.Gauge
	mergeSourceGauge, mergeTargetGauge prometheus.Gauge
	// Observers fed with the duration, in seconds, of a split or merge.
	splitDuration, mergeDuration prometheus.Observer
}

// newKeyspaceGroupMetrics resolves every labeled child of the Keyspace Group
// state gauge and operation-duration histogram up front and returns them
// bundled in a keyspaceGroupMetrics.
func newKeyspaceGroupMetrics() *keyspaceGroupMetrics {
	m := new(keyspaceGroupMetrics)

	// State gauges, one per split/merge role.
	m.splitSourceGauge = keyspaceGroupStateGauge.WithLabelValues("split-source")
	m.splitTargetGauge = keyspaceGroupStateGauge.WithLabelValues("split-target")
	m.mergeSourceGauge = keyspaceGroupStateGauge.WithLabelValues("merge-source")
	m.mergeTargetGauge = keyspaceGroupStateGauge.WithLabelValues("merge-target")

	// Duration observers, one per operation type.
	m.splitDuration = keyspaceGroupOpDuration.WithLabelValues("split")
	m.mergeDuration = keyspaceGroupOpDuration.WithLabelValues("merge")

	return m
}

0 comments on commit 87f2da8

Please sign in to comment.