Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg: fix cgroup monitor #7918

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 61 additions & 47 deletions server/cgmon.go → pkg/cgroup/cgmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package server
package cgroup

import (
"context"
Expand All @@ -24,15 +24,15 @@ import (
"github.com/pingcap/log"
"github.com/shirou/gopsutil/v3/mem"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cgroup"
"go.uber.org/zap"
)

const (
refreshInterval = 10 * time.Second
)

type cgroupMonitor struct {
// Monitor is used to monitor the cgroup.
type Monitor struct {
started bool
ctx context.Context
cancel context.CancelFunc
Expand All @@ -42,70 +42,85 @@ type cgroupMonitor struct {
lastMemoryLimit uint64
}

// StartCgroupMonitor uses to start the cgroup monitoring.
// StartMonitor is used to start the cgroup monitoring.
// WARN: this function is not thread-safe.
func (cgmon *cgroupMonitor) startCgroupMonitor(ctx context.Context) {
if cgmon.started {
func (m *Monitor) StartMonitor(ctx context.Context) {
if m.started {
return
}
cgmon.started = true
// Get configured maxprocs.
cgmon.cfgMaxProcs = runtime.GOMAXPROCS(0)
cgmon.ctx, cgmon.cancel = context.WithCancel(ctx)
cgmon.wg.Add(1)
go cgmon.refreshCgroupLoop()
m.started = true
if runtime.GOOS != "linux" {
return
}
m.ctx, m.cancel = context.WithCancel(ctx)
m.wg.Add(1)
go m.refreshCgroupLoop()
log.Info("cgroup monitor started")
}

// StopCgroupMonitor uses to stop the cgroup monitoring.
// StopMonitor is used to stop the cgroup monitoring.
// WARN: this function is not thread-safe.
func (cgmon *cgroupMonitor) stopCgroupMonitor() {
if !cgmon.started {
func (m *Monitor) StopMonitor() {
if !m.started {
return
}
if runtime.GOOS != "linux" {
return
}
cgmon.started = false
if cgmon.cancel != nil {
cgmon.cancel()
m.started = false
if m.cancel != nil {
m.cancel()
}
cgmon.wg.Wait()
m.wg.Wait()
log.Info("cgroup monitor stopped")
}

func (cgmon *cgroupMonitor) refreshCgroupLoop() {
func (m *Monitor) refreshCgroupLoop() {
ticker := time.NewTicker(refreshInterval)
defer func() {
if r := recover(); r != nil {
log.Error("[pd] panic in the recoverable goroutine",
zap.String("funcInfo", "refreshCgroupLoop"),
zap.String("func-info", "refreshCgroupLoop"),
zap.Reflect("r", r),
zap.Stack("stack"))
}
cgmon.wg.Done()
m.wg.Done()
ticker.Stop()
}()

cgmon.refreshCgroupCPU()
cgmon.refreshCgroupMemory()
err := m.refreshCgroupCPU()
if err != nil {
log.Warn("failed to get cgroup cpu quota", zap.Error(err))
}
err = m.refreshCgroupMemory()
if err != nil {
log.Warn("failed to get cgroup memory limit", zap.Error(err))
}
for {
select {
case <-cgmon.ctx.Done():
case <-m.ctx.Done():
return
case <-ticker.C:
cgmon.refreshCgroupCPU()
cgmon.refreshCgroupMemory()
err = m.refreshCgroupCPU()
if err != nil {
log.Debug("failed to get cgroup cpu quota", zap.Error(err))
}
err = m.refreshCgroupMemory()
if err != nil {
log.Debug("failed to get cgroup memory limit", zap.Error(err))
}
}
}
}

func (cgmon *cgroupMonitor) refreshCgroupCPU() {
func (m *Monitor) refreshCgroupCPU() error {
// Get the number of CPUs.
quota := runtime.NumCPU()

// Get CPU quota from cgroup.
cpuPeriod, cpuQuota, err := cgroup.GetCPUPeriodAndQuota()
cpuPeriod, cpuQuota, err := GetCPUPeriodAndQuota()
if err != nil {
log.Warn("failed to get cgroup cpu quota", zap.Error(err))
return
return err
}
if cpuPeriod > 0 && cpuQuota > 0 {
ratio := float64(cpuQuota) / float64(cpuPeriod)
Expand All @@ -114,35 +129,34 @@ func (cgmon *cgroupMonitor) refreshCgroupCPU() {
}
}

if quota != cgmon.lastMaxProcs && quota < cgmon.cfgMaxProcs {
runtime.GOMAXPROCS(quota)
if quota != m.lastMaxProcs {
log.Info("set the maxprocs", zap.Int("quota", quota))
bs.ServerMaxProcsGauge.Set(float64(quota))
cgmon.lastMaxProcs = quota
} else if cgmon.lastMaxProcs == 0 {
log.Info("set the maxprocs", zap.Int("cfgMaxProcs", cgmon.cfgMaxProcs))
bs.ServerMaxProcsGauge.Set(float64(cgmon.cfgMaxProcs))
cgmon.lastMaxProcs = cgmon.cfgMaxProcs
m.lastMaxProcs = quota
} else if m.lastMaxProcs == 0 {
log.Info("set the maxprocs", zap.Int("maxprocs", m.cfgMaxProcs))
bs.ServerMaxProcsGauge.Set(float64(m.cfgMaxProcs))
m.lastMaxProcs = m.cfgMaxProcs
}
return nil
}

func (cgmon *cgroupMonitor) refreshCgroupMemory() {
memLimit, err := cgroup.GetMemoryLimit()
func (m *Monitor) refreshCgroupMemory() error {
memLimit, err := GetMemoryLimit()
if err != nil {
log.Warn("failed to get cgroup memory limit", zap.Error(err))
return
return err
}
vmem, err := mem.VirtualMemory()
if err != nil {
log.Warn("failed to get system memory size", zap.Error(err))
return
return err
}
if memLimit > vmem.Total {
memLimit = vmem.Total
}
if memLimit != cgmon.lastMemoryLimit {
log.Info("set the memory limit", zap.Uint64("memLimit", memLimit))
if memLimit != m.lastMemoryLimit {
log.Info("set the memory limit", zap.Uint64("mem-limit", memLimit))
bs.ServerMemoryLimit.Set(float64(memLimit))
cgmon.lastMemoryLimit = memLimit
m.lastMemoryLimit = memLimit
}
return nil
}
6 changes: 3 additions & 3 deletions pkg/cgroup/cgroup_cpu_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

// GetCgroupCPU returns the CPU usage and quota for the current cgroup.
func GetCgroupCPU() (CPUUsage, error) {
cpuusage, err := getCgroupCPUHelper("/")
cpuusage.NumCPU = runtime.NumCPU()
return cpuusage, err
cpuUsage, err := getCgroupCPUHelper("/")
cpuUsage.NumCPU = runtime.NumCPU()
return cpuUsage, err
}

// CPUQuotaToGOMAXPROCS converts the CPU quota applied to the calling process
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/sysutil"
"github.com/tikv/pd/pkg/audit"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cgroup"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -237,7 +238,7 @@ type Server struct {
schedulingPrimaryWatcher *etcdutil.LoopWatcher

// Cgroup Monitor
cgmon cgroupMonitor
cgMonitor cgroup.Monitor
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -545,7 +546,7 @@ func (s *Server) Close() {

log.Info("closing server")

s.cgmon.stopCgroupMonitor()
s.cgMonitor.StopMonitor()

s.stopServerLoop()
if s.IsAPIServiceMode() {
Expand Down Expand Up @@ -622,7 +623,7 @@ func (s *Server) Run() error {
return err
}

s.cgmon.startCgroupMonitor(s.ctx)
s.cgMonitor.StartMonitor(s.ctx)

failpoint.Inject("delayStartServerLoop", func() {
time.Sleep(2 * time.Second)
Expand Down
Loading