-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rg/controller: use the default resource group if the requested one doesn't exist #8387
Merged
Merged
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
) | ||
|
||
const ( | ||
defaultResourceGroupName = "default" | ||
controllerConfigPath = "resource_group/controller" | ||
maxNotificationChanLen = 200 | ||
needTokensAmplification = 1.1 | ||
|
@@ -356,22 +357,27 @@ | |
if err = proto.Unmarshal(item.Kv.Value, group); err != nil { | ||
continue | ||
} | ||
if item, ok := c.groupsController.Load(group.Name); ok { | ||
gc := item.(*groupCostController) | ||
if gc, ok := c.loadGroupController(group.Name); ok { | ||
gc.modifyMeta(group) | ||
} | ||
case meta_storagepb.Event_DELETE: | ||
if item.PrevKv != nil { | ||
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { | ||
continue | ||
} | ||
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { | ||
// Do not delete the resource group immediately, just mark it as tombstone. | ||
// For the requests that are still in progress, fallback to the default resource group. | ||
if gc, ok := c.loadGroupController(group.Name); ok { | ||
gc.tombstone.Store(true) | ||
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name) | ||
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1) | ||
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name)) | ||
} | ||
} else { | ||
// Prev-kv is compacted means there must have been a delete event before this event, | ||
// which means that this is just a duplicated event, so we can just ignore it. | ||
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) | ||
log.Info("[resource group controller] previous key-value pair has been compacted", | ||
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) | ||
} | ||
} | ||
} | ||
|
@@ -420,12 +426,32 @@ | |
return nil | ||
} | ||
|
||
// loadGroupController just wraps the `Load` method of `sync.Map`. | ||
func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) { | ||
tmp, ok := c.groupsController.Load(name) | ||
if !ok { | ||
return nil, false | ||
} | ||
return tmp.(*groupCostController), true | ||
} | ||
|
||
// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`. | ||
func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) { | ||
tmp, loaded := c.groupsController.LoadOrStore(name, gc) | ||
return tmp.(*groupCostController), loaded | ||
} | ||
|
||
// tryGetResourceGroup will try to get the resource group controller from local cache first, | ||
// if the local cache misses, it will then call gRPC to fetch the resource group info from server. | ||
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) { | ||
// Get from the local cache first. | ||
if tmp, ok := c.groupsController.Load(name); ok { | ||
return tmp.(*groupCostController), nil | ||
gc, ok := c.loadGroupController(name) | ||
if ok { | ||
// If the resource group is marked as tombstone, fallback to the default resource group. | ||
if gc.tombstone.Load() { | ||
return c.tryGetResourceGroup(ctx, defaultResourceGroupName) | ||
} | ||
return gc, nil | ||
} | ||
// Call gRPC to fetch the resource group info. | ||
group, err := c.provider.GetResourceGroup(ctx, name) | ||
|
@@ -436,24 +462,21 @@ | |
return nil, errors.Errorf("%s does not exists", name) | ||
} | ||
// Check again to prevent initializing the same resource group concurrently. | ||
if tmp, ok := c.groupsController.Load(name); ok { | ||
gc := tmp.(*groupCostController) | ||
if gc, ok = c.loadGroupController(name); ok { | ||
return gc, nil | ||
} | ||
// Initialize the resource group controller. | ||
gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) | ||
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// TODO: re-init the state if user change mode from RU to RAW mode. | ||
gc.initRunState() | ||
// Check again to prevent initializing the same resource group concurrently. | ||
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc) | ||
gc, loaded := c.loadOrStoreGroupController(group.Name, gc) | ||
if !loaded { | ||
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1) | ||
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName())) | ||
} | ||
return tmp.(*groupCostController), nil | ||
return gc, nil | ||
} | ||
|
||
func (c *ResourceGroupsController) cleanUpResourceGroup() { | ||
|
@@ -465,14 +488,15 @@ | |
latestConsumption := *gc.mu.consumption | ||
gc.mu.Unlock() | ||
if equalRU(latestConsumption, *gc.run.consumption) { | ||
if gc.tombstone { | ||
if gc.inactive || gc.tombstone.Load() { | ||
c.groupsController.Delete(resourceGroupName) | ||
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) | ||
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName) | ||
return true | ||
} | ||
gc.tombstone = true | ||
gc.inactive = true | ||
} else { | ||
gc.tombstone = false | ||
gc.inactive = false | ||
} | ||
return true | ||
}) | ||
|
@@ -498,12 +522,11 @@ | |
c.run.inDegradedMode = false | ||
for _, res := range resp { | ||
name := res.GetResourceGroupName() | ||
v, ok := c.groupsController.Load(name) | ||
gc, ok := c.loadGroupController(name) | ||
if !ok { | ||
log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name)) | ||
continue | ||
} | ||
gc := v.(*groupCostController) | ||
gc.handleTokenBucketResponse(res) | ||
} | ||
} | ||
|
@@ -572,12 +595,16 @@ | |
func (c *ResourceGroupsController) OnResponse( | ||
resourceGroupName string, req RequestInfo, resp ResponseInfo, | ||
) (*rmpb.Consumption, error) { | ||
tmp, ok := c.groupsController.Load(resourceGroupName) | ||
gc, ok := c.loadGroupController(resourceGroupName) | ||
if !ok { | ||
// If the resource group does not exist, use the default resource group. | ||
if resourceGroupName != defaultResourceGroupName { | ||
return c.OnResponse(defaultResourceGroupName, req, resp) | ||
} | ||
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) | ||
return &rmpb.Consumption{}, nil | ||
Comment on lines
605
to
606
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to advance the log and remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better not to do so since that may cause a lot of logs during the workload. |
||
} | ||
return tmp.(*groupCostController).onResponse(req, resp) | ||
return gc.onResponse(req, resp) | ||
} | ||
|
||
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. | ||
|
@@ -594,8 +621,7 @@ | |
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool { | ||
// fallback to default resource group. | ||
if bg == nil { | ||
resourceGroupName := "default" | ||
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) | ||
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName) | ||
if err != nil { | ||
return false | ||
} | ||
|
@@ -681,7 +707,10 @@ | |
requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter | ||
} | ||
|
||
tombstone bool | ||
// tombstone is set to true when the resource group is deleted. | ||
tombstone atomic.Bool | ||
// inactive is set to true when the resource group has not been updated for a long time. | ||
inactive bool | ||
} | ||
|
||
type groupMetricsCollection struct { | ||
|
@@ -774,6 +803,8 @@ | |
gc.mu.consumption = &rmpb.Consumption{} | ||
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption) | ||
gc.mu.globalCounter = &rmpb.Consumption{} | ||
// TODO: re-init the state if user change mode from RU to RAW mode. | ||
gc.initRunState() | ||
return gc, nil | ||
} | ||
|
||
|
@@ -1359,14 +1390,14 @@ | |
return delta, nil | ||
} | ||
|
||
// GetActiveResourceGroup is used to get action resource group. | ||
// GetActiveResourceGroup is used to get active resource group. | ||
// This is used for test only. | ||
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup { | ||
tmp, ok := c.groupsController.Load(resourceGroupName) | ||
if !ok { | ||
gc, ok := c.loadGroupController(resourceGroupName) | ||
if !ok || gc.tombstone.Load() { | ||
return nil | ||
} | ||
return tmp.(*groupCostController).getMeta() | ||
return gc.getMeta() | ||
} | ||
|
||
// This is used for test only. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we reset the tombstone flag in here if the user deletes it and creates it again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. Fixed.