Skip to content

Commit

Permalink
mcs, tso: support weighted-election for TSO keyspace group primary el…
Browse files Browse the repository at this point in the history
…ection (tikv#6617)

close tikv#6616

Add the tso server registry watch loop in tso's keyspace group manager.
Re-distribute TSO keyspace group primaries according to their replica priorities.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 26, 2023
1 parent 34386ce commit b4b1fcc
Show file tree
Hide file tree
Showing 9 changed files with 498 additions and 102 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := discoveryPath(clusterID, serviceName) + "/"
key := ServicePath(clusterID, serviceName) + "/"
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ const (
registryKey = "registry"
)

func registryPath(clusterID, serviceName, serviceAddr string) string {
// RegistryPath builds the full registry key of a single microservice instance by joining
// registryPrefix, clusterID, serviceName, registryKey and serviceAddr with "/".
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
	segments := []string{
		registryPrefix,
		clusterID,
		serviceName,
		registryKey,
		serviceAddr,
	}
	return strings.Join(segments, "/")
}

func discoveryPath(clusterID, serviceName string) string {
// ServicePath builds the registry path shared by all instances of a microservice by joining
// registryPrefix, clusterID, serviceName and registryKey with "/". The result carries no
// trailing slash.
func ServicePath(clusterID, serviceName string) string {
	segments := []string{
		registryPrefix,
		clusterID,
		serviceName,
		registryKey,
	}
	return strings.Join(segments, "/")
}

// TSOPath returns the path prefix, ending in "/", under which TSO server addresses are stored.
// The numeric cluster ID is formatted in base 10 and "tso" is used as the service name.
func TSOPath(clusterID uint64) string {
return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/"
return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/"
}
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ServiceRegister struct {
// NewServiceRegister creates a new ServiceRegister.
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister {
cctx, cancel := context.WithCancel(ctx)
serviceKey := registryPath(clusterID, serviceName, serviceAddr)
serviceKey := RegistryPath(clusterID, serviceName, serviceAddr)
return &ServiceRegister{
ctx: cctx,
cancel: cancel,
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ const (
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
// It's used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
DefaultKeyspaceGroupReplicaPriority = 0
)
250 changes: 224 additions & 26 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"path"
"regexp"
Expand All @@ -27,6 +28,7 @@ import (
"time"

perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
Expand Down Expand Up @@ -55,6 +57,10 @@ const (
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
// defaultPrimaryPriorityCheckInterval is the default interval for checking if the priorities
// of the primaries on this TSO server/pod have changed. A goroutine will periodically
// do this check and re-distribute the primaries if necessary.
defaultPrimaryPriorityCheckInterval = 10 * time.Second
)

type state struct {
Expand Down Expand Up @@ -153,6 +159,41 @@ func (s *state) getKeyspaceGroupMetaWithCheck(
mcsutils.DefaultKeyspaceGroupID, nil
}

// getNextPrimaryToReset scans the keyspace groups in round-robin order, beginning at groupID
// (taken modulo the group count), looking for a group whose election member on this server is
// the leader even though some member of that group has a strictly higher priority than the
// member whose address equals localAddress.
//
// On a hit it returns the group's election member, the group metadata, the local member's
// priority, and the group ID at which the next scan should start. When no group qualifies it
// returns (nil, nil, 0, start), where start is groupID modulo the group count.
//
// The whole scan runs under the state's read lock; the caller is expected to act on the result
// after this function returns, outside of the critical section.
func (s *state) getNextPrimaryToReset(
	groupID int, localAddress string,
) (member ElectionMember, kg *endpoint.KeyspaceGroup, localPriority, nextGroupID int) {
	s.RLock()
	defer s.RUnlock()

	// s.ams and s.kgs are both fixed-size arrays of MaxKeyspaceGroupCountInUse entries.
	total := int(mcsutils.MaxKeyspaceGroupCountInUse)
	start := groupID % total
	for offset := 0; offset < total; offset++ {
		id := (start + offset) % total
		allocator, group := s.ams[id], s.kgs[id]
		if allocator == nil || group == nil || !allocator.GetMember().IsLeader() {
			continue
		}

		// If the local address is not among the members, the local priority stays at
		// MaxInt32 and the group is never selected.
		highest, local := math.MinInt32, math.MaxInt32
		for _, replica := range group.Members {
			if replica.Priority > highest {
				highest = replica.Priority
			}
			if replica.Address == localAddress {
				local = replica.Priority
			}
		}

		if local >= highest {
			continue
		}
		// Hand the candidate back so the primary is reset outside of the critical section,
		// as resetting the primary may take some time.
		return allocator.GetMember(), group, local, (id + 1) % total
	}

	return nil, nil, 0, start
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
Expand Down Expand Up @@ -198,6 +239,10 @@ type KeyspaceGroupManager struct {
// which participate in the election of its keyspace group's primary, in the format of
// "electionNamePrefix:keyspace-group-id"
electionNamePrefix string
// tsoServiceKey is the path for storing the registered tso servers.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discovery.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/API service. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
Expand Down Expand Up @@ -238,17 +283,26 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id
// in the keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
groupWatcher *etcdutil.LoopWatcher

primaryPathBuilder *kgPrimaryPathBuilder

// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc

primaryPathBuilder *kgPrimaryPathBuilder
primaryPriorityCheckInterval time.Duration

// tsoNodes is the registered tso servers.
tsoNodes sync.Map // store as map[string]struct{}
// serviceRegistryMap stores the mapping from the service registry key to the service address.
// Note: it is only used in tsoNodesWatcher.
serviceRegistryMap map[string]string
// tsoNodesWatcher is the watcher for the registered tso servers.
tsoNodesWatcher *etcdutil.LoopWatcher
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand All @@ -258,6 +312,7 @@ func NewKeyspaceGroupManager(
etcdClient *clientv3.Client,
httpClient *http.Client,
electionNamePrefix string,
tsoServiceKey string,
legacySvcRootPath string,
tsoSvcRootPath string,
cfg ServiceConfig,
Expand All @@ -270,16 +325,19 @@ func NewKeyspaceGroupManager(

ctx, cancel := context.WithCancel(ctx)
kgm := &KeyspaceGroupManager{
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
tsoServiceKey: tsoServiceKey,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
primaryPriorityCheckInterval: defaultPrimaryPriorityCheckInterval,
cfg: cfg,
groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup),
serviceRegistryMap: make(map[string]string),
}
kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
Expand All @@ -296,6 +354,100 @@ func NewKeyspaceGroupManager(

// Initialize brings up this KeyspaceGroupManager. It runs InitializeTSOServerWatchLoop and then
// InitializeGroupWatchLoop; if either fails, the error is logged, the manager is closed to
// release whatever was already set up, and the error is returned wrapped in
// ErrLoadKeyspaceGroupsTerminated. On success it starts the primary priority check loop in a
// goroutine tracked by kgm.wg.
func (kgm *KeyspaceGroupManager) Initialize() error {
	initSteps := []struct {
		failureMsg string
		run        func() error
	}{
		{"failed to initialize tso server watch loop", kgm.InitializeTSOServerWatchLoop},
		{"failed to initialize group watch loop", kgm.InitializeGroupWatchLoop},
	}
	for _, step := range initSteps {
		err := step.run()
		if err == nil {
			continue
		}
		log.Error(step.failureMsg, zap.Error(err))
		// Close the manager to clean up anything allocated or loaded so far.
		kgm.Close()
		return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
	}

	kgm.wg.Add(1)
	go kgm.primaryPriorityCheckLoop()

	return nil
}

// Close shuts down this KeyspaceGroupManager: it cancels the manager's context, waits for the
// goroutines tracked by kgm.wg to finish, and then deinitializes the keyspace group state.
// It logs one message on entry and one on completion.
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")

// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group, while the former requires
// snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
// added/initialized after that.
// Step 1: signal every loop derived from the manager's context to stop.
kgm.cancel()
// Step 2: block until those loops have actually exited.
kgm.wg.Wait()
// Step 3: only now tear down the keyspace group state.
kgm.state.deinitialize()

log.Info("keyspace group manager closed")
}

// GetServiceConfig returns the ServiceConfig held by this manager in its cfg field.
func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig {
return kgm.cfg
}

// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the
// registered tso servers, starts it in a goroutine tracked by kgm.wg, and waits for the
// watcher's WaitLoad to return before reporting success.
// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress}
// Value: discovery.ServiceRegistryEntry
//
// It returns the error from WaitLoad, if any.
func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error {
	// GetPrefixRangeEnd already yields the exclusive upper bound of the range covering every
	// key that has tsoServiceKey as its prefix. Appending anything to it would widen the range
	// beyond the registry prefix, so it is used as is.
	tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey)

	// putFn records the address carried in the registry entry and remembers which registry key
	// it came from, so that a later delete event (which is looked up by key) can be resolved.
	putFn := func(kv *mvccpb.KeyValue) error {
		s := &discovery.ServiceRegistryEntry{}
		if err := json.Unmarshal(kv.Value, s); err != nil {
			log.Warn("failed to unmarshal service registry entry",
				zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
			return err
		}
		kgm.tsoNodes.Store(s.ServiceAddr, struct{}{})
		kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
		return nil
	}
	// deleteFn maps the deleted registry key back to its service address and drops both
	// records. A key that was never recorded is reported as an error.
	deleteFn := func(kv *mvccpb.KeyValue) error {
		key := string(kv.Key)
		if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok {
			delete(kgm.serviceRegistryMap, key)
			kgm.tsoNodes.Delete(serviceAddr)
			return nil
		}
		return perrors.Errorf("failed to find the service address for key %s", key)
	}

	kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher(
		kgm.ctx,
		&kgm.wg,
		kgm.etcdClient,
		"tso-nodes-watcher",
		kgm.tsoServiceKey,
		putFn,
		deleteFn,
		func() error { return nil },
		clientv3.WithRange(tsoServiceEndKey),
	)

	kgm.wg.Add(1)
	go kgm.tsoNodesWatcher.StartWatchLoop()

	if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil {
		log.Error("failed to load the registered tso servers", errs.ZapError(err))
		return err
	}

	return nil
}

// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group
// membership/distribution metadata.
// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group}
// Value: endpoint.KeyspaceGroup
func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
rootPath := kgm.legacySvcRootPath
startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/")
endKey := strings.Join(
Expand Down Expand Up @@ -375,20 +527,66 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return nil
}

// Close this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Close() {
log.Info("closing keyspace group manager")
// primaryPriorityCheckLoop runs until the manager's context is cancelled. Every
// primaryPriorityCheckInterval it asks getNextPrimaryToReset for at most one keyspace group whose
// primary is held locally with a lower priority than another member, and calls ResetLeader on
// that group's election member if at least one higher-priority member is among the registered
// (alive) tso nodes. It must be started with kgm.wg already incremented; it calls kgm.wg.Done
// on exit.
func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() {
defer logutil.LogPanic()
defer kgm.wg.Done()

// NOTE(review): the comment and the three calls below duplicate the body of Close and look like
// removed-side diff residue rather than part of this loop — confirm before relying on them.
// Note: don't change the order. We need to cancel all service loops in the keyspace group manager
// before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups
// during critical periods such as service shutdown and online keyspace group, while the former requires
// snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.state.deinitialize()
// Failpoint hook: when "fastPrimaryPriorityCheck" is enabled, the check interval is shortened
// to 200ms.
failpoint.Inject("fastPrimaryPriorityCheck", func() {
kgm.primaryPriorityCheckInterval = 200 * time.Millisecond
})

// NOTE(review): this log line also matches the end of Close; presumably diff residue — confirm.
log.Info("keyspace group manager closed")
ctx, cancel := context.WithCancel(kgm.ctx)
defer cancel()
// groupID is the round-robin cursor: the group at which the next scan starts.
groupID := 0
for {
select {
case <-ctx.Done():
log.Info("exit primary priority check loop")
return
case <-time.After(kgm.primaryPriorityCheckInterval):
// Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group
member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr)
if member != nil {
// Take a snapshot of the currently registered tso node addresses.
aliveTSONodes := make(map[string]struct{})
kgm.tsoNodes.Range(func(key, _ interface{}) bool {
aliveTSONodes[key.(string)] = struct{}{}
return true
})
if len(aliveTSONodes) == 0 {
log.Warn("no alive tso node", zap.String("local-address", kgm.tsoServiceID.ServiceAddr))
// This continue skips the cursor update at the bottom of the loop, so the next
// tick starts scanning from the same groupID again.
continue
}
// If there is an alive member with higher priority, reset the leader.
resetLeader := false
// The loop variable below shadows the outer election member only within this loop.
for _, member := range kg.Members {
if member.Priority <= localPriority {
continue
}
if _, ok := aliveTSONodes[member.Address]; ok {
resetLeader = true
break
}
}
if resetLeader {
// Re-check cancellation right before acting so no reset is issued once the
// context is done.
select {
case <-ctx.Done():
default:
member.ResetLeader()
log.Info("reset primary",
zap.String("local-address", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("keyspace-group-id", kg.ID),
zap.Int("local-priority", localPriority))
}
} else {
log.Warn("no need to reset primary as the replicas with higher priority are offline",
zap.String("local-address", kgm.tsoServiceID.ServiceAddr),
zap.Uint32("keyspace-group-id", kg.ID),
zap.Int("local-priority", localPriority))
}
}
// Advance the cursor so the next tick continues after the group just examined.
groupID = nextGroupID
}
}
}

func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
Expand Down
Loading

0 comments on commit b4b1fcc

Please sign in to comment.