Skip to content

Commit

Permalink
Merge branch 'master' into wg-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Sep 26, 2023
2 parents 8ba64b8 + eac55a7 commit 55adf55
Show file tree
Hide file tree
Showing 93 changed files with 1,495 additions and 582 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ ifeq ("$(WITH_RACE)", "1")
BUILD_CGO_ENABLED := 1
endif

ifeq ($(PLUGIN), 1)
BUILD_TAGS += with_plugin
endif

LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDBuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDGitHash=$(shell git rev-parse HEAD)"
Expand Down Expand Up @@ -286,4 +290,4 @@ clean-build:
rm -rf $(BUILD_BIN_PATH)
rm -rf $(GO_TOOLS_BIN_PATH)

.PHONY: clean clean-test clean-build
.PHONY: clean clean-test clean-build
25 changes: 18 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
Expand All @@ -260,7 +261,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
Expand Down Expand Up @@ -319,18 +321,27 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
for _, item := range resp {
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
switch item.Type {
case meta_storagepb.Event_PUT:
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
}
}
}
Expand Down
47 changes: 0 additions & 47 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type ResourceManagerClient interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error)
}
Expand Down Expand Up @@ -188,52 +187,6 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup,
return groups, resp.Header.Revision, nil
}

// WatchResourceGroup [just for TEST] watches resource groups changes.
// It returns a stream of slices of resource groups.
// The first message in stream contains all current resource groups,
// all subsequent messages contains new events[PUT/DELETE] for all resource groups.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
configChan, err := c.Watch(ctx, GroupSettingsPathPrefixBytes, WithRev(revision), WithPrefix())
if err != nil {
return nil, err
}
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
go func() {
defer func() {
close(resourceGroupWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
case res, ok := <-configChan:
if !ok {
return
}
groups := make([]*rmpb.ResourceGroup, 0, len(res))
for _, item := range res {
switch item.Type {
case meta_storagepb.Event_PUT:
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
return
}
groups = append(groups, group)
case meta_storagepb.Event_DELETE:
continue
}
}
resourceGroupWatcherChan <- groups
}
}
}()
return resourceGroupWatcherChan, err
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
Expand Down
30 changes: 15 additions & 15 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -531,21 +531,6 @@ error = '''
plugin is not found: %s
'''

["PD:operator:ErrRegionAbnormalPeer"]
error = '''
region %v has abnormal peer
'''

["PD:operator:ErrRegionNotAdjacent"]
error = '''
two regions are not adjacent
'''

["PD:operator:ErrRegionNotFound"]
error = '''
region %v not found
'''

["PD:os:ErrOSOpen"]
error = '''
open error
Expand Down Expand Up @@ -616,6 +601,21 @@ error = '''
failed to unmarshal proto
'''

["PD:region:ErrRegionAbnormalPeer"]
error = '''
region %v has abnormal peer
'''

["PD:region:ErrRegionNotAdjacent"]
error = '''
two regions are not adjacent
'''

["PD:region:ErrRegionNotFound"]
error = '''
region %v not found
'''

["PD:region:ErrRegionRuleContent"]
error = '''
invalid region rule content, %s
Expand Down
Loading

0 comments on commit 55adf55

Please sign in to comment.