Skip to content

Commit

Permalink
core: add node-is-idle interface & impl
Browse files Browse the repository at this point in the history
* (util, load) parameterized

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Oct 18, 2024
1 parent cc010a1 commit 0d55285
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 82 deletions.
10 changes: 10 additions & 0 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/sys"
"github.com/NVIDIA/aistore/xact/xreg"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -90,6 +91,15 @@ type htrun struct {
// interface guard
var _ core.Node = (*htrun)(nil)

func (*htrun) IsIdle(_ int64, load float64) bool {
avg, err := sys.LoadAverage()
if err != nil {
nlog.Errorln(err)
return false
}
return avg.One < load && avg.Five < load
}

func (h *htrun) Snode() *meta.Snode { return h.si }
func (h *htrun) callerName() string { return h.si.String() }
func (h *htrun) SID() string { return h.si.ID() }
Expand Down
5 changes: 5 additions & 0 deletions ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (*target) GetAllRunning(inout *core.AllRunningInOut, periodic bool) {
xreg.GetAllRunning(inout, periodic)
}

// TODO: consider adding 'no-running-jobs'
func (t *target) IsIdle(util int64, load float64) bool {
return t.htrun.IsIdle(util, load) && fs.DisksIdle(util)
}

func (t *target) Health(si *meta.Snode, timeout time.Duration, query url.Values) ([]byte, int, error) {
return t.reqHealth(si, timeout, query, t.owner.smap.get(), false /*retry*/)
}
Expand Down
17 changes: 9 additions & 8 deletions core/mock/target_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ func NewTarget(bo meta.Bowner) *TargetMock {
func (t *TargetMock) Bowner() meta.Bowner { return t.BO }
func (t *TargetMock) Sowner() meta.Sowner { return t.SO }

func (*TargetMock) SID() string { return mockID }
func (*TargetMock) String() string { return "tmock" }
func (*TargetMock) Snode() *meta.Snode { return &meta.Snode{DaeID: mockID} }
func (*TargetMock) ClusterStarted() bool { return true }
func (*TargetMock) NodeStarted() bool { return true }
func (*TargetMock) DataClient() *http.Client { return http.DefaultClient }
func (*TargetMock) PageMM() *memsys.MMSA { return memsys.PageMM() }
func (*TargetMock) ByteMM() *memsys.MMSA { return memsys.ByteMM() }
func (*TargetMock) SID() string { return mockID }
func (*TargetMock) String() string { return "tmock" }
func (*TargetMock) Snode() *meta.Snode { return &meta.Snode{DaeID: mockID} }
func (*TargetMock) ClusterStarted() bool { return true }
func (*TargetMock) NodeStarted() bool { return true }
func (*TargetMock) IsIdle(int64, float64) bool { return true }
func (*TargetMock) DataClient() *http.Client { return http.DefaultClient }
func (*TargetMock) PageMM() *memsys.MMSA { return memsys.PageMM() }
func (*TargetMock) ByteMM() *memsys.MMSA { return memsys.ByteMM() }

func (*TargetMock) GetAllRunning(*core.AllRunningInOut, bool) {}
func (*TargetMock) PutObject(*core.LOM, *core.PutParams) error { return nil }
Expand Down
2 changes: 2 additions & 0 deletions core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type (
// Memory allocators
PageMM() *memsys.MMSA
ByteMM() *memsys.MMSA

IsIdle(util int64, load float64) bool
}
)

Expand Down
160 changes: 94 additions & 66 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,10 @@ func (mi *Mountpath) String() string {
return mi.info[:l-1] + ", waiting-dd]"
}

func (mi *Mountpath) IsIdle(config *cmn.Config) bool {
// (see also: DisksIdle)
func (mi *Mountpath) IsIdle(config *cmn.Config, util int64) bool {
curr := mfs.ios.GetMpathUtil(mi.Path)
return curr >= 0 && curr < config.Disk.DiskUtilLowWM
return curr >= 0 && curr < min(config.Disk.DiskUtilLowWM, util)
}

func (mi *Mountpath) IsAvail() bool {
Expand Down Expand Up @@ -795,7 +796,7 @@ func Remove(mpath string, cb ...func()) (*Mountpath, error) {
} else {
nlog.Infof("removed mountpath %s (remain available: %d)", mi, availCnt)
}
moveMarkers(availableCopy, mi)
_moveMarkers(availableCopy, mi)
putAvailMPI(availableCopy)
if availCnt > 0 && len(cb) > 0 {
cb[0]()
Expand Down Expand Up @@ -875,7 +876,7 @@ func Disable(mpath string, cb ...func()) (disabledMpath *Mountpath, err error) {
mfs.ios.RemoveMpath(cleanMpath, config.TestingEnv())
delete(availableCopy, cleanMpath)
delete(mfs.fsIDs, mi.FsID)
moveMarkers(availableCopy, mi)
_moveMarkers(availableCopy, mi)
PutMPI(availableCopy, disabledCopy)
if l := len(availableCopy); l == 0 {
nlog.Errorf("disabled the last available mountpath %s", mi)
Expand All @@ -894,6 +895,50 @@ func Disable(mpath string, cb ...func()) (disabledMpath *Mountpath, err error) {
return nil, cmn.NewErrMpathNotFound(mpath, "" /*fqn*/, false /*disabled*/)
}

func _moveMarkers(avail MPI, from *Mountpath) {
var (
fromPath = filepath.Join(from.Path, fname.MarkersDir)
finfos, err = os.ReadDir(fromPath)
)
if err != nil {
if !os.IsNotExist(err) {
nlog.Errorf("Failed to read markers' dir %q: %v", fromPath, err)
}
return
}
if len(finfos) == 0 {
return // no markers, nothing to do
}

// NOTE: `from` path must no longer be in the available mountpaths
_, ok := avail[from.Path]
debug.Assert(!ok, from.String())
for _, mi := range avail {
ok = true
for _, fi := range finfos {
debug.Assert(!fi.IsDir(), fname.MarkersDir+cos.PathSeparator+fi.Name()) // marker is a file
var (
fromPath = filepath.Join(from.Path, fname.MarkersDir, fi.Name())
toPath = filepath.Join(mi.Path, fname.MarkersDir, fi.Name())
)
_, _, err := cos.CopyFile(fromPath, toPath, nil, cos.ChecksumNone)
if err != nil && os.IsNotExist(err) {
nlog.Errorf("Failed to move marker %q to %q: %v)", fromPath, toPath, err)
mfs.hc.FSHC(err, mi, "")
ok = false
}
}
if ok {
break
}
}
from.ClearMDs(true /*inclBMD*/)
}

//
// avail & disabled
//

func NumAvail() int {
avail := GetAvail()
return len(avail)
Expand Down Expand Up @@ -922,6 +967,10 @@ func getDisabled() MPI {
return *disabled
}

//
// buckets
//

func CreateBucket(bck *cmn.Bck, nilbmd bool) (errs []error) {
var (
avail = GetAvail()
Expand Down Expand Up @@ -1013,49 +1062,10 @@ func RenameBucketDirs(bckFrom, bckTo *cmn.Bck) (err error) {
return
}

func moveMarkers(avail MPI, from *Mountpath) {
var (
fromPath = filepath.Join(from.Path, fname.MarkersDir)
finfos, err = os.ReadDir(fromPath)
)
if err != nil {
if !os.IsNotExist(err) {
nlog.Errorf("Failed to read markers' dir %q: %v", fromPath, err)
}
return
}
if len(finfos) == 0 {
return // no markers, nothing to do
}

// NOTE: `from` path must no longer be in the available mountpaths
_, ok := avail[from.Path]
debug.Assert(!ok, from.String())
for _, mi := range avail {
ok = true
for _, fi := range finfos {
debug.Assert(!fi.IsDir(), fname.MarkersDir+cos.PathSeparator+fi.Name()) // marker is a file
var (
fromPath = filepath.Join(from.Path, fname.MarkersDir, fi.Name())
toPath = filepath.Join(mi.Path, fname.MarkersDir, fi.Name())
)
_, _, err := cos.CopyFile(fromPath, toPath, nil, cos.ChecksumNone)
if err != nil && os.IsNotExist(err) {
nlog.Errorf("Failed to move marker %q to %q: %v)", fromPath, toPath, err)
mfs.hc.FSHC(err, mi, "")
ok = false
}
}
if ok {
break
}
}
from.ClearMDs(true /*inclBMD*/)
}

// load node ID
//
// load node ID - traverses all mountpaths to load and validate
//

// traverses all mountpaths to load and validate node ID
func LoadNodeID(mpaths cos.StrKVs) (mDaeID string, err error) {
for mp := range mpaths {
daeID, err := _loadXattrID(mp)
Expand Down Expand Up @@ -1168,6 +1178,19 @@ func DiskStats(allds ios.AllDiskStats, tcdf *Tcdf, config *cmn.Config, refreshCa
}
}

func DisksIdle(util int64) bool {
var (
config = cmn.GCO.Get()
avail = GetAvail()
)
for _, mi := range avail {
if !mi.IsIdle(config, util) {
return false
}
}
return true
}

//
// cap status: get, refresh, periodic
//
Expand Down Expand Up @@ -1322,26 +1345,6 @@ func CapStatusGetWhat() (fsInfo apc.CapacityInfo) {
return
}

/////////
// MPI //
/////////

func (mpi MPI) String() string {
return fmt.Sprintf("%v", mpi.toSlice())
}

func (mpi MPI) toSlice() []string {
var (
paths = make([]string, len(mpi))
idx int
)
for key := range mpi {
paths[idx] = key
idx++
}
return paths
}

///////////////
// CapStatus //
///////////////
Expand Down Expand Up @@ -1395,3 +1398,28 @@ func (cs *CapStatus) _next(config *cmn.Config) time.Duration {
ratio := (util - umin) * 100 / (umax - umin)
return time.Duration(100-ratio)*(tmax-tmin)/100 + tmin
}

/////////
// MPI //
/////////

func (mpi MPI) String() string {
var (
sb strings.Builder
i, l int
)
for key := range mpi {
l += len(key) + 1
}
sb.Grow(l + 2)
sb.WriteByte('[')
for key := range mpi {
sb.WriteString(key)
i++
if i < l-1 {
sb.WriteByte(' ')
}
}
sb.WriteByte(']')
return sb.String()
}
2 changes: 1 addition & 1 deletion memsys/housekeep_mm.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (r *MMSA) freeIdle() (total int64) {
func (r *MMSA) freeMemToOS(mingc int64, force bool) {
avg, err := sys.LoadAverage()
if err != nil {
nlog.Errorf("Failed to load averages: %v", err) // (unlikely)
nlog.Errorln(err) // (unlikely)
avg.One = 999
}
togc := r.toGC.Load()
Expand Down
2 changes: 1 addition & 1 deletion space/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (j *lruJ) postRemove(prev, size int64) (capCheck int64, err error) {
}

func (j *lruJ) _throttle(usedPct int64) (err error) {
if j.mi.IsIdle(j.config) {
if j.mi.IsIdle(j.config, j.config.Disk.DiskUtilLowWM) {
return
}
// throttle self
Expand Down
24 changes: 18 additions & 6 deletions sys/cpu_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package sys

import (
"errors"
"fmt"
"io"
"runtime"
"strconv"
Expand Down Expand Up @@ -73,13 +74,22 @@ func containerNumCPU() (int, error) {
return int(max(approx, 1)), nil
}

// LoadAverage returns the system load average
func LoadAverage() (avg LoadAvg, err error) {
avg = LoadAvg{}
//
// load averages
//

type errLoadAvg struct {
err error
}

func (e *errLoadAvg) Error() string {
return fmt.Sprint("failed to load averages: ", e.err)
}

func LoadAverage() (avg LoadAvg, _ error) {
line, err := cos.ReadOneLine(hostLoadAvgPath)
if err != nil {
return avg, err
return avg, &errLoadAvg{err}
}

fields := strings.Fields(line)
Expand All @@ -90,6 +100,8 @@ func LoadAverage() (avg LoadAvg, err error) {
if err == nil {
avg.Fifteen, err = strconv.ParseFloat(fields[2], 64)
}

return avg, err
if err == nil {
return avg, nil
}
return avg, &errLoadAvg{err}
}

0 comments on commit 0d55285

Please sign in to comment.