Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: forward region requests to scheduling server #7023

Merged
merged 8 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96 h1:Upb52Po0Ev1lPKQdUT4suRwQ5Z49A7gEmJ0trADKftM=
github.com/pingcap/kvproto v0.0.0-20230911090708-d603cce32b96/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b h1:XwwIxepR+uuSYWhdQtstEdr67XUE7X6lpSIHVh5iWjs=
github.com/pingcap/kvproto v0.0.0-20230920042517-db656f45023b/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
65 changes: 65 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
)

// Cluster provides an overview of a cluster's basic information.
type Cluster interface {
GetHotStat() *statistics.HotStat
GetRegionStats() *statistics.RegionStatistics
GetLabelStats() *statistics.LabelStatistics
GetCoordinator() *schedule.Coordinator
GetRuleManager() *placement.RuleManager
}

// HandleStatsAsync handles the flow asynchronously.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

// HandleOverlaps handles the overlap regions.
func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
for _, item := range overlaps {
if c.GetRegionStats() != nil {
c.GetRegionStats().ClearDefunctRegion(item.GetID())
}
c.GetLabelStats().ClearDefunctRegion(item.GetID())
c.GetRuleManager().InvalidCache(item.GetID())
}
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
if !isPrepared && isNew {
c.GetCoordinator().GetPrepareChecker().Collect(region)
}
}
62 changes: 44 additions & 18 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,31 @@ const (
InitClusterRegionThreshold = 100
)

// RegionHeartbeatResponse is the interface for region heartbeat response.
type RegionHeartbeatResponse interface {
GetTargetPeer() *metapb.Peer
GetRegionId() uint64
}

// RegionHeartbeatRequest is the interface for region heartbeat request.
type RegionHeartbeatRequest interface {
GetTerm() uint64
GetRegion() *metapb.Region
GetLeader() *metapb.Peer
GetDownPeers() []*pdpb.PeerStats
GetPendingPeers() []*metapb.Peer
GetBytesWritten() uint64
GetKeysWritten() uint64
GetBytesRead() uint64
GetKeysRead() uint64
GetInterval() *pdpb.TimeInterval
GetQueryStats() *pdpb.QueryStats
GetApproximateSize() uint64
GetApproximateKeys() uint64
}

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
regionSize := heartbeat.GetApproximateSize() / units.MiB
Expand All @@ -153,25 +176,28 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

region := &RegionInfo{
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
cpuUsage: heartbeat.GetCpuUsage(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKvSize: int64(regionKvSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
queryStats: heartbeat.GetQueryStats(),
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
}

// scheduling service doesn't need the following fields.
if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok {
region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB)
region.replicationStatus = h.GetReplicationStatus()
region.cpuUsage = h.GetCpuUsage()
}

for _, opt := range opts {
Expand Down
81 changes: 78 additions & 3 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cluster"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/slice"
Expand All @@ -38,7 +40,7 @@
ruleManager *placement.RuleManager
labelerManager *labeler.RegionLabeler
regionStats *statistics.RegionStatistics
labelLevelStats *statistics.LabelStatistics
labelStats *statistics.LabelStatistics
hotStat *statistics.HotStat
storage storage.Storage
coordinator *schedule.Coordinator
Expand Down Expand Up @@ -66,8 +68,8 @@
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
labelStats: statistics.NewLabelStatistics(),
regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
labelLevelStats: statistics.NewLabelStatistics(),
storage: storage,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,
Expand All @@ -86,6 +88,21 @@
return c.coordinator
}

// GetHotStat gets hot stat.
func (c *Cluster) GetHotStat() *statistics.HotStat {
return c.hotStat
}

// GetRegionStats gets region statistics.
func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
return c.regionStats
}

// GetLabelStats gets label statistics.
func (c *Cluster) GetLabelStats() *statistics.LabelStatistics {
return c.labelStats

Check warning on line 103 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L103

Added line #L103 was not covered by tests
}

// GetBasicCluster returns the basic cluster.
func (c *Cluster) GetBasicCluster() *core.BasicCluster {
return c.BasicCluster
Expand Down Expand Up @@ -287,7 +304,7 @@
// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
for _, region := range regions {
c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels())
}
}

Expand Down Expand Up @@ -389,3 +406,61 @@
c.cancel()
c.wg.Wait()
}

// HandleRegionHeartbeat processes RegionInfo reports from client.
func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if err := c.processRegionHeartbeat(region); err != nil {
return err

Check warning on line 413 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L413

Added line #L413 was not covered by tests
}

c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL)
return nil
}

// processRegionHeartbeat updates the region information.
func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can reduce some of the duplicate functions, and of course, we can do it in the next pr.

origin, _, err := c.PreCheckPutRegion(region)
if err != nil {
return err

Check warning on line 424 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L424

Added line #L424 was not covered by tests
}
if c.GetStoreConfig().IsEnableRegionBucket() {
region.InheritBuckets(origin)

Check warning on line 427 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L427

Added line #L427 was not covered by tests
}

cluster.HandleStatsAsync(c, region)

hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
c.regionStats.Observe(region, c.GetRegionStores(region))

Check warning on line 441 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L440-L441

Added lines #L440 - L441 were not covered by tests
}
return nil

Check warning on line 443 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L443

Added line #L443 was not covered by tests
}

var overlaps []*core.RegionInfo
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil {
return err

Check warning on line 453 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L453

Added line #L453 was not covered by tests
}

cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
return nil
}

// IsPrepared return true if the prepare checker is ready.
func (c *Cluster) IsPrepared() bool {
return c.coordinator.GetPrepareChecker().IsPrepared()
}
1 change: 0 additions & 1 deletion pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
configutil.AdjustCommandLineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
configutil.AdjustCommandLineString(flagSet, &c.ListenAddr, "listen-addr")
configutil.AdjustCommandLineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")

return c.adjust(meta)
}

Expand Down
Loading
Loading