Skip to content

Commit

Permalink
pool: fix multiple bugs on setPgNumMinMaxProperties (#12)
Browse files Browse the repository at this point in the history
unit test coverage extended

Co-authored-by: Peter Goron <[email protected]>
  • Loading branch information
pgoron and Peter Goron authored Aug 18, 2023
1 parent 891a1a1 commit fe36239
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 61 deletions.
102 changes: 59 additions & 43 deletions pkg/daemon/ceph/client/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ const (
reallyConfirmFlag = "--yes-i-really-really-mean-it"
targetSizeRatioProperty = "target_size_ratio"
CompressionModeProperty = "compression_mode"
PgNumProperty = "pg_num"
PgpNumProperty = "pgp_num"
PgNumMinProperty = "pg_num_min"
PgNumMaxProperty = "pg_num_max"
PgAutoscaleModeProperty = "pg_autoscale_mode"
PgAutoscaleModeOn = "on"
PgAutoscaleModeOff = "off"
)

type CephStoragePoolSummary struct {
Expand Down Expand Up @@ -191,6 +196,23 @@ func CreatePoolWithPGs(context *clusterd.Context, clusterInfo *ClusterInfo, clus
if pool.Name == "" {
return errors.New("pool name must be specified")
}

// honor pg_num_min if provided by user
if pgNumMinStr, found := pool.Parameters[PgNumMinProperty]; found {
pgNumMin, err := strconv.Atoi(pgNumMinStr)
if err != nil {
return errors.Errorf("failed to parse pg_num_min parameter of pool %q. %v", pool.Name, err)
}
pgNum, err := strconv.Atoi(pgCount)
if err != nil {
return errors.Errorf("failed to parse pgCount of pool %q. %v", pool.Name, err)
}

if pgNum < pgNumMin {
pgCount = strconv.Itoa(pgNumMin)
}
}

if pool.IsReplicated() {
return createReplicatedPoolForApp(context, clusterInfo, clusterSpec, pool, pgCount, appName)
}
Expand Down Expand Up @@ -285,14 +307,18 @@ func givePoolAppTag(context *clusterd.Context, clusterInfo *ClusterInfo, poolNam
return nil
}

func setPgNumMinMaxProperties(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec, poolDetails *CephStoragePoolDetails) error {
needToRestoreAutoScaler := ("on" == poolDetails.PgAutoScaleMode)
func setPgNumMinMaxProperties(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error {
poolDetails, err := GetPoolDetails(context, clusterInfo, pool.Name)
if err != nil {
return err
}

needToRestoreAutoScaler := (PgAutoscaleModeOn == poolDetails.PgAutoScaleMode)
curPgNum := poolDetails.PgNum
curPgNumMin := -1
curPgNumMax := -1
wantedPgNumMin := -1
wantedPgNumMax := -1
var err error
var wantedPgNumMin int
var wantedPgNumMax int

if poolDetails.PgNumMin != nil {
curPgNumMin = (*poolDetails.PgNumMin)
Expand All @@ -302,27 +328,29 @@ func setPgNumMinMaxProperties(context *clusterd.Context, clusterInfo *ClusterInf
}

// parse pg_num_min and ensure consistency with pg_num_max if already set
if wantedPgNumMinStr, found := pool.Parameters["pg_num_min"]; found {
if wantedPgNumMinStr, found := pool.Parameters[PgNumMinProperty]; found {
wantedPgNumMin, err = strconv.Atoi(wantedPgNumMinStr)
if err != nil {
logger.Errorf("failed to parse pg_num_min parameter of pool %q. %v", pool.Name, err)
wantedPgNumMin = curPgNumMin
}
if curPgNumMax != -1 && wantedPgNumMin > curPgNumMax {
return errors.Errorf("wanted pg_num_min (%d) can't be greater than current pg_num_max (%d) for pool %q", wantedPgNumMin, curPgNumMax, pool.Name)
}
} else {
wantedPgNumMin = curPgNumMin
}

// parse pg_num_max and ensure consistency with pg_num_min if already set
if wantedPgNumMaxStr, found := pool.Parameters["pg_num_max"]; found {
if wantedPgNumMaxStr, found := pool.Parameters[PgNumMaxProperty]; found {
wantedPgNumMax, err = strconv.Atoi(wantedPgNumMaxStr)
if err != nil {
logger.Errorf("failed to parse pg_num_max parameter of pool %q. %v", pool.Name, err)
wantedPgNumMax = curPgNumMax
}
if curPgNumMin != -1 && wantedPgNumMax < curPgNumMin {
return errors.Errorf("wanted pg_num_max (%d) can't be less than current pg_num_min (%d) for pool %q", wantedPgNumMax, curPgNumMin, pool.Name)
}
} else {
wantedPgNumMax = curPgNumMax
}

if wantedPgNumMin != -1 && wantedPgNumMax != -1 && wantedPgNumMin > wantedPgNumMax {
return errors.Errorf("pg_num_min (%d) can't be greater than pg_num_max (%d) for pool %q", wantedPgNumMin, wantedPgNumMax, pool.Name)
}

if wantedPgNumMin == curPgNumMin && wantedPgNumMax == curPgNumMax {
Expand All @@ -337,87 +365,80 @@ func setPgNumMinMaxProperties(context *clusterd.Context, clusterInfo *ClusterInf

// restore pg autoscaler if disabled during adjustment of pg_num_min & pg_num_max
defer func() {
if poolDetails.PgAutoScaleMode == "off" && needToRestoreAutoScaler {
SetPoolProperty(context, clusterInfo, pool.Name, "pg_autoscale_mode", "on")
if poolDetails.PgAutoScaleMode == PgAutoscaleModeOff && needToRestoreAutoScaler {
SetPoolProperty(context, clusterInfo, pool.Name, PgAutoscaleModeProperty, PgAutoscaleModeOn)
}
}()

if wantedPgNumMin != curPgNumMin {
if wantedPgNumMin > int(curPgNum) {
// number of PG is below expected pg_num_min, we must increase pg(p)_num before adjusting pg_num_min
// pg_autoscale_mode need to be disabled during the operation to make process reliable
if poolDetails.PgAutoScaleMode == "on" {
err = SetPoolProperty(context, clusterInfo, pool.Name, "pg_autoscale_mode", "off")
if poolDetails.PgAutoScaleMode == PgAutoscaleModeOn {
err = SetPoolProperty(context, clusterInfo, pool.Name, PgAutoscaleModeProperty, PgAutoscaleModeOff)
if err != nil {
return errors.Wrapf(err, "failed to disable pg autoscaler on pool %q before applying pg_num_min", pool.Name)
}
poolDetails.PgAutoScaleMode = "off"
poolDetails.PgAutoScaleMode = PgAutoscaleModeOff
}

err = SetPoolProperty(context, clusterInfo, pool.Name, "pg_num", strconv.Itoa(wantedPgNumMin))
err = SetPoolProperty(context, clusterInfo, pool.Name, PgNumProperty, strconv.Itoa(wantedPgNumMin))
if err != nil {
return errors.Wrapf(err, "failed to align pg_num to pg_num_min for pool %q", pool.Name)
}
curPgNum = uint(wantedPgNumMin)
poolDetails.PgNum = curPgNum

err = SetPoolProperty(context, clusterInfo, pool.Name, "pgp_num", strconv.Itoa(wantedPgNumMin))
err = SetPoolProperty(context, clusterInfo, pool.Name, PgpNumProperty, strconv.Itoa(wantedPgNumMin))
if err != nil {
return errors.Wrapf(err, "failed to align pgp_num to pg_num_min for pool %q", pool.Name)
}
poolDetails.PgpNum = uint(wantedPgNumMin)
}

err := SetPoolProperty(context, clusterInfo, pool.Name, "pg_num_min", strconv.Itoa(wantedPgNumMin))
err := SetPoolProperty(context, clusterInfo, pool.Name, PgNumMinProperty, strconv.Itoa(wantedPgNumMin))
if err != nil {
return errors.Wrapf(err, "failed to set pg_num_min quota for pool %q", pool.Name)
}
curPgNumMin = wantedPgNumMin
poolDetails.PgNumMin = &curPgNumMin
}

if wantedPgNumMax != curPgNumMax {
if wantedPgNumMax < int(curPgNum) {
// number of PG is above expected pg_num_max, we must decrease pg(p)_num before adjusting pg_num_max
// pg_autoscale_mode need to be disabled during the operation to make process reliable
if poolDetails.PgAutoScaleMode == "on" {
err = SetPoolProperty(context, clusterInfo, pool.Name, "pg_autoscale_mode", "off")
if poolDetails.PgAutoScaleMode == PgAutoscaleModeOn {
err = SetPoolProperty(context, clusterInfo, pool.Name, PgAutoscaleModeProperty, PgAutoscaleModeOff)
if err != nil {
return errors.Wrapf(err, "failed to disable pg autoscaler on pool %q before applying pg_num_max", pool.Name)
}
poolDetails.PgAutoScaleMode = "off"
poolDetails.PgAutoScaleMode = PgAutoscaleModeOff
}

err = SetPoolProperty(context, clusterInfo, pool.Name, "pg_num", strconv.Itoa(wantedPgNumMax))
err = SetPoolProperty(context, clusterInfo, pool.Name, PgNumProperty, strconv.Itoa(wantedPgNumMax))
if err != nil {
return errors.Wrapf(err, "failed to align pg_num to pg_num_max for pool %q", pool.Name)
}
curPgNum = uint(wantedPgNumMax)
poolDetails.PgNum = curPgNum

err = SetPoolProperty(context, clusterInfo, pool.Name, "pgp_num", strconv.Itoa(wantedPgNumMax))
err = SetPoolProperty(context, clusterInfo, pool.Name, PgpNumProperty, strconv.Itoa(wantedPgNumMax))
if err != nil {
return errors.Wrapf(err, "failed to align pgp_num to pg_num_min for pool %q", pool.Name)
}
poolDetails.PgpNum = uint(wantedPgNumMax)
}

err := SetPoolProperty(context, clusterInfo, pool.Name, "pg_num_max", strconv.Itoa(wantedPgNumMax))
err := SetPoolProperty(context, clusterInfo, pool.Name, PgNumMaxProperty, strconv.Itoa(wantedPgNumMax))
if err != nil {
return errors.Wrapf(err, "failed to set pg_num_max quota for pool %q", pool.Name)
}
curPgNumMin = wantedPgNumMax
poolDetails.PgNumMin = &curPgNumMin
}

return nil
}

func isASpecificallyManagedPoolProperty(propName string) bool {
switch propName {
case "pg_num_min":
case PgNumMinProperty:
return true
case "pg_num_max":
case PgNumMaxProperty:
return true
}
return false
Expand Down Expand Up @@ -548,13 +569,8 @@ func createECPoolForApp(context *clusterd.Context, clusterInfo *ClusterInfo, ecP
}
}

poolDetails, err := GetPoolDetails(context, clusterInfo, pool.Name)
if err != nil {
return errors.Wrapf(err, "failed to get pool details for pool %s", pool.Name)
}

// update pg_num_min/pg_num_max properties
if err := setPgNumMinMaxProperties(context, clusterInfo, pool, &poolDetails); err != nil {
if err := setPgNumMinMaxProperties(context, clusterInfo, pool); err != nil {
return err
}

Expand Down Expand Up @@ -622,7 +638,7 @@ func createReplicatedPoolForApp(context *clusterd.Context, clusterInfo *ClusterI
}

// update pg_num_min/pg_num_max properties
if err := setPgNumMinMaxProperties(context, clusterInfo, pool, &poolDetails); err != nil {
if err := setPgNumMinMaxProperties(context, clusterInfo, pool); err != nil {
return err
}

Expand Down
Loading

0 comments on commit fe36239

Please sign in to comment.