Skip to content

Commit

Permalink
fix cgroup monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Mar 14, 2024
1 parent c2c5d84 commit 4dec7d0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 53 deletions.
108 changes: 61 additions & 47 deletions server/cgmon.go → pkg/cgroup/cgmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package server
package cgroup

import (
"context"
Expand All @@ -24,15 +24,15 @@ import (
"github.com/pingcap/log"
"github.com/shirou/gopsutil/v3/mem"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cgroup"
"go.uber.org/zap"
)

const (
refreshInterval = 10 * time.Second
)

type cgroupMonitor struct {
// Monitor is used to monitor the cgroup.
type Monitor struct {
started bool
ctx context.Context
cancel context.CancelFunc
Expand All @@ -42,70 +42,85 @@ type cgroupMonitor struct {
lastMemoryLimit uint64
}

// StartCgroupMonitor uses to start the cgroup monitoring.
// StartMonitor uses to start the cgroup monitoring.
// WARN: this function is not thread-safe.
func (cgmon *cgroupMonitor) startCgroupMonitor(ctx context.Context) {
if cgmon.started {
func (m *Monitor) StartMonitor(ctx context.Context) {
if m.started {
return
}
cgmon.started = true
// Get configured maxprocs.
cgmon.cfgMaxProcs = runtime.GOMAXPROCS(0)
cgmon.ctx, cgmon.cancel = context.WithCancel(ctx)
cgmon.wg.Add(1)
go cgmon.refreshCgroupLoop()
m.started = true
if runtime.GOOS != "linux" {
return

Check warning on line 53 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L53

Added line #L53 was not covered by tests
}
m.ctx, m.cancel = context.WithCancel(ctx)
m.wg.Add(1)
go m.refreshCgroupLoop()
log.Info("cgroup monitor started")
}

// StopCgroupMonitor uses to stop the cgroup monitoring.
// StopMonitor uses to stop the cgroup monitoring.
// WARN: this function is not thread-safe.
func (cgmon *cgroupMonitor) stopCgroupMonitor() {
if !cgmon.started {
func (m *Monitor) StopMonitor() {
if !m.started {
return

Check warning on line 65 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L65

Added line #L65 was not covered by tests
}
if runtime.GOOS != "linux" {
return
}
cgmon.started = false
if cgmon.cancel != nil {
cgmon.cancel()
m.started = false
if m.cancel != nil {
m.cancel()
}
cgmon.wg.Wait()
m.wg.Wait()
log.Info("cgroup monitor stopped")
}

func (cgmon *cgroupMonitor) refreshCgroupLoop() {
func (m *Monitor) refreshCgroupLoop() {
ticker := time.NewTicker(refreshInterval)
defer func() {
if r := recover(); r != nil {
log.Error("[pd] panic in the recoverable goroutine",
zap.String("funcInfo", "refreshCgroupLoop"),
zap.String("func-info", "refreshCgroupLoop"),

Check warning on line 83 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L83

Added line #L83 was not covered by tests
zap.Reflect("r", r),
zap.Stack("stack"))
}
cgmon.wg.Done()
m.wg.Done()
ticker.Stop()
}()

cgmon.refreshCgroupCPU()
cgmon.refreshCgroupMemory()
err := m.refreshCgroupCPU()
if err != nil {
log.Warn("failed to get cgroup memory limit", zap.Error(err))

Check warning on line 93 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L93

Added line #L93 was not covered by tests
}
err = m.refreshCgroupMemory()
if err != nil {
log.Warn("failed to get cgroup memory limit", zap.Error(err))

Check warning on line 97 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L97

Added line #L97 was not covered by tests
}
for {
select {
case <-cgmon.ctx.Done():
case <-m.ctx.Done():
return
case <-ticker.C:
cgmon.refreshCgroupCPU()
cgmon.refreshCgroupMemory()
err = m.refreshCgroupCPU()
if err != nil {
log.Debug("failed to get cgroup cpu quota", zap.Error(err))

Check warning on line 106 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L106

Added line #L106 was not covered by tests
}
err = m.refreshCgroupMemory()
if err != nil {
log.Debug("failed to get cgroup memory limit", zap.Error(err))

Check warning on line 110 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L110

Added line #L110 was not covered by tests
}
}
}
}

func (cgmon *cgroupMonitor) refreshCgroupCPU() {
func (m *Monitor) refreshCgroupCPU() error {
// Get the number of CPUs.
quota := runtime.NumCPU()

// Get CPU quota from cgroup.
cpuPeriod, cpuQuota, err := cgroup.GetCPUPeriodAndQuota()
cpuPeriod, cpuQuota, err := GetCPUPeriodAndQuota()
if err != nil {
log.Warn("failed to get cgroup cpu quota", zap.Error(err))
return
return err

Check warning on line 123 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L123

Added line #L123 was not covered by tests
}
if cpuPeriod > 0 && cpuQuota > 0 {
ratio := float64(cpuQuota) / float64(cpuPeriod)
Expand All @@ -114,35 +129,34 @@ func (cgmon *cgroupMonitor) refreshCgroupCPU() {
}
}

if quota != cgmon.lastMaxProcs && quota < cgmon.cfgMaxProcs {
runtime.GOMAXPROCS(quota)
if quota != m.lastMaxProcs {
log.Info("set the maxprocs", zap.Int("quota", quota))
bs.ServerMaxProcsGauge.Set(float64(quota))
cgmon.lastMaxProcs = quota
} else if cgmon.lastMaxProcs == 0 {
log.Info("set the maxprocs", zap.Int("cfgMaxProcs", cgmon.cfgMaxProcs))
bs.ServerMaxProcsGauge.Set(float64(cgmon.cfgMaxProcs))
cgmon.lastMaxProcs = cgmon.cfgMaxProcs
m.lastMaxProcs = quota
} else if m.lastMaxProcs == 0 {
log.Info("set the maxprocs", zap.Int("maxprocs", m.cfgMaxProcs))
bs.ServerMaxProcsGauge.Set(float64(m.cfgMaxProcs))
m.lastMaxProcs = m.cfgMaxProcs

Check warning on line 139 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L137-L139

Added lines #L137 - L139 were not covered by tests
}
return nil
}

func (cgmon *cgroupMonitor) refreshCgroupMemory() {
memLimit, err := cgroup.GetMemoryLimit()
func (m *Monitor) refreshCgroupMemory() error {
memLimit, err := GetMemoryLimit()
if err != nil {
log.Warn("failed to get cgroup memory limit", zap.Error(err))
return
return err

Check warning on line 147 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L147

Added line #L147 was not covered by tests
}
vmem, err := mem.VirtualMemory()
if err != nil {
log.Warn("failed to get system memory size", zap.Error(err))
return
return err

Check warning on line 151 in pkg/cgroup/cgmon.go

View check run for this annotation

Codecov / codecov/patch

pkg/cgroup/cgmon.go#L151

Added line #L151 was not covered by tests
}
if memLimit > vmem.Total {
memLimit = vmem.Total
}
if memLimit != cgmon.lastMemoryLimit {
log.Info("set the memory limit", zap.Uint64("memLimit", memLimit))
if memLimit != m.lastMemoryLimit {
log.Info("set the memory limit", zap.Uint64("mem-limit", memLimit))
bs.ServerMemoryLimit.Set(float64(memLimit))
cgmon.lastMemoryLimit = memLimit
m.lastMemoryLimit = memLimit
}
return nil
}
6 changes: 3 additions & 3 deletions pkg/cgroup/cgroup_cpu_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

// GetCgroupCPU returns the CPU usage and quota for the current cgroup.
func GetCgroupCPU() (CPUUsage, error) {
cpuusage, err := getCgroupCPUHelper("/")
cpuusage.NumCPU = runtime.NumCPU()
return cpuusage, err
cpuUsage, err := getCgroupCPUHelper("/")
cpuUsage.NumCPU = runtime.NumCPU()
return cpuUsage, err
}

// CPUQuotaToGOMAXPROCS converts the CPU quota applied to the calling process
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/sysutil"
"github.com/tikv/pd/pkg/audit"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cgroup"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -237,7 +238,7 @@ type Server struct {
schedulingPrimaryWatcher *etcdutil.LoopWatcher

// Cgroup Monitor
cgmon cgroupMonitor
cgMonitor cgroup.Monitor
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -545,7 +546,7 @@ func (s *Server) Close() {

log.Info("closing server")

s.cgmon.stopCgroupMonitor()
s.cgMonitor.StopMonitor()

s.stopServerLoop()
if s.IsAPIServiceMode() {
Expand Down Expand Up @@ -622,7 +623,7 @@ func (s *Server) Run() error {
return err
}

s.cgmon.startCgroupMonitor(s.ctx)
s.cgMonitor.StartMonitor(s.ctx)

failpoint.Inject("delayStartServerLoop", func() {
time.Sleep(2 * time.Second)
Expand Down

0 comments on commit 4dec7d0

Please sign in to comment.