diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 4039eb5ad4..82bf062886 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.20 // direct require ( - github.com/NVIDIA/aistore v1.3.20-0.20230902152208-f41d865152f0 + github.com/NVIDIA/aistore v1.3.20-0.20230902193432-2f3aa85dc3c7 github.com/fatih/color v1.14.1 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 766e1b5aac..05ba430f93 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.20-0.20230902152208-f41d865152f0 h1:RAW8a9ReYZZjKc8qIDLWv/AwMxT6Z7uxFL/4jAF09Z4= -github.com/NVIDIA/aistore v1.3.20-0.20230902152208-f41d865152f0/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= +github.com/NVIDIA/aistore v1.3.20-0.20230902193432-2f3aa85dc3c7 h1:SPos7zzeYOJPwFL4Egmw1xrpq+b/KwhL60y9XuZD+Hg= +github.com/NVIDIA/aistore v1.3.20-0.20230902193432-2f3aa85dc3c7/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= diff --git a/stats/common_stats.go b/stats/common_stats.go index 2120749400..f6ca6ef98b 100644 --- a/stats/common_stats.go +++ b/stats/common_stats.go @@ -33,8 +33,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const workChanCapacity = 512 - const ( dfltPeriodicFlushTime = time.Minute // when `config.Log.FlushTime` is 0 (zero) dfltPeriodicTimeStamp = time.Hour // extended date/time complementary to log timestamps (e.g., "11:29:11.644596") @@ -111,7 +109,6 @@ type ( Value int64 `json:"v,string"` numSamples int64 // (log + StatsD) only cumulative int64 - mu sync.RWMutex } copyValue struct { Value int64 `json:"v,string"` @@ -133,22 +130,15 @@ type ( // Prunner and Trunner runner struct { - daemon runnerHost - stopCh chan struct{} - workCh chan cos.NamedVal64 - ticker *time.Ticker - core *coreStats - ctracker copyTracker // to avoid making it at runtime - sorted []string // sorted names - name string // this stats-runner's name - prev string // prev ctracker.write - next int64 // mono.NanoTime() - chanFull atomic.Int64 - // assorted fast-path counters (see reg(... , true)) - fast struct { - n map[string]*int64 - v []int64 - } + daemon runnerHost + stopCh chan struct{} + ticker *time.Ticker + core *coreStats + ctracker copyTracker // to avoid making it at runtime + sorted []string // sorted names + name string // this stats-runner's name + prev string // prev ctracker.write + next int64 // mono.NanoTime() startedUp atomic.Bool } ) @@ -316,37 +306,22 @@ func (s *coreStats) UnmarshalJSON(b []byte) error { return jsoniter.Unmarshal(b, func (s *coreStats) get(name string) (val int64) { v := s.Tracker[name] - switch v.kind { - case KindLatency, KindThroughput: - v.mu.RLock() - val = v.Value - v.mu.RUnlock() - default: - val = ratomic.LoadInt64(&v.Value) - } + val = ratomic.LoadInt64(&v.Value) return } -// NOTE naming convention: ".n" for the count and ".ns" for duration (nanoseconds) func (s *coreStats) update(nv cos.NamedVal64) { v, ok := s.Tracker[nv.Name] debug.Assertf(ok, "invalid metric name %q", nv.Name) switch v.kind { case KindLatency: - v.mu.Lock() - v.numSamples++ - v.cumulative += nv.Value - v.Value += nv.Value - v.mu.Unlock() + ratomic.AddInt64(&v.numSamples, 1) + fallthrough case KindThroughput: - v.mu.Lock() - v.cumulative += nv.Value - v.Value += nv.Value - v.mu.Unlock() + ratomic.AddInt64(&v.cumulative, nv.Value) + ratomic.AddInt64(&v.Value, nv.Value) case KindCounter, KindSize: - // NOTE: not locking (KindCounter isn't compound, making an exception to speed-up) ratomic.AddInt64(&v.Value, nv.Value) - // - non-empty suffix forces an immediate Tx with no aggregation (see below); // - suffix is an arbitrary string that can be defined at runtime; // - e.g. usage: per-mountpath error counters. @@ -362,22 +337,18 @@ func (s *coreStats) update(nv cos.NamedVal64) { // log + StatsD (Prometheus is done separately via `Collect`) func (s *coreStats) copyT(out copyTracker, diskLowUtil ...int64) bool { idle := true + intl := cos.MaxI64(int64(s.statsTime.Seconds()), 1) s.sgl.Reset() for name, v := range s.Tracker { switch v.kind { case KindLatency: var lat int64 - v.mu.Lock() - if v.numSamples > 0 { - lat = v.Value / v.numSamples + if num := ratomic.SwapInt64(&v.numSamples, 0); num > 0 { + lat = ratomic.SwapInt64(&v.Value, 0) / num if !ignore(name) { idle = false } } - v.Value = 0 - v.numSamples = 0 - v.mu.Unlock() - out[name] = copyValue{lat} // NOTE: ns => ms, and not reporting zeros millis := cos.DivRound(lat, int64(time.Millisecond)) @@ -386,17 +357,12 @@ func (s *coreStats) copyT(out copyTracker, diskLowUtil ...int64) bool { } case KindThroughput: var throughput int64 - v.mu.Lock() - if v.Value > 0 { - throughput = v.Value / cos.MaxI64(int64(s.statsTime.Seconds()), 1) + if throughput = ratomic.SwapInt64(&v.Value, 0); throughput > 0 { + throughput /= intl if !ignore(name) { idle = false } - // NOTE: ok to zero-out as we report .cumulative via API - v.Value = 0 } - v.mu.Unlock() - out[name] = copyValue{throughput} if !s.isPrometheus() && throughput > 0 { fv := roundMBs(throughput) @@ -463,16 +429,12 @@ func (s *coreStats) copyCumulative(ctracker copyTracker) { for name, v := range s.Tracker { switch v.kind { case KindLatency: - v.mu.RLock() - ctracker[name] = copyValue{v.cumulative} - v.mu.RUnlock() + ctracker[name] = copyValue{ratomic.LoadInt64(&v.cumulative)} case KindThroughput: - v.mu.RLock() - val := copyValue{v.cumulative} - v.mu.RUnlock() + val := copyValue{ratomic.LoadInt64(&v.cumulative)} ctracker[name] = val - // NOTE: here we effectively add a metric that was never added/updated + // NOTE: effectively, add same-value metric that was never added/updated // via `runner.Add` and friends. Is OK to replace ".bps" suffix // as statsValue.cumulative _is_ the total size (aka, KindSize) n := name[:len(name)-3] + "size" @@ -500,10 +462,12 @@ func (s *coreStats) reset(errorsOnly bool) { for _, v := range s.Tracker { switch v.kind { - case KindLatency, KindThroughput: - v.mu.Lock() - v.Value, v.cumulative = 0, 0 - v.mu.Unlock() + case KindLatency: + ratomic.StoreInt64(&v.numSamples, 0) + fallthrough + case KindThroughput: + ratomic.StoreInt64(&v.Value, 0) + ratomic.StoreInt64(&v.cumulative, 0) case KindCounter, KindSize, KindComputedThroughput, KindGauge: ratomic.StoreInt64(&v.Value, 0) default: // KindSpecial - do nothing @@ -522,15 +486,7 @@ var ( ) func (v *statsValue) MarshalJSON() ([]byte, error) { - var s string - switch v.kind { - case KindLatency, KindThroughput: - v.mu.RLock() - s = strconv.FormatInt(v.Value, 10) - v.mu.RUnlock() - default: - s = strconv.FormatInt(ratomic.LoadInt64(&v.Value), 10) - } + s := strconv.FormatInt(ratomic.LoadInt64(&v.Value), 10) return cos.UnsafeB(s), nil } @@ -615,14 +571,12 @@ var ( ) func (r *runner) GetStats() *Node { - r._fast() ctracker := make(copyTracker, 48) r.core.copyCumulative(ctracker) return &Node{Tracker: ctracker} } func (r *runner) ResetStats(errorsOnly bool) { - r._fast() r.core.reset(errorsOnly) } @@ -637,12 +591,12 @@ func (r *runner) GetMetricNames() cos.StrKVs { // common (target, proxy) metrics func (r *runner) regCommon(node *meta.Snode) { // basic counters - r.reg(node, GetCount, KindCounter, true /* fast */) - r.reg(node, PutCount, KindCounter, true) + r.reg(node, GetCount, KindCounter) + r.reg(node, PutCount, KindCounter) r.reg(node, AppendCount, KindCounter) - r.reg(node, DeleteCount, KindCounter, true) + r.reg(node, DeleteCount, KindCounter) r.reg(node, RenameCount, KindCounter) - r.reg(node, ListCount, KindCounter, true) + r.reg(node, ListCount, KindCounter) // basic error counters, respectively r.reg(node, errPrefix+GetCount, KindCounter) @@ -666,8 +620,9 @@ func (r *runner) regCommon(node *meta.Snode) { r.reg(node, Uptime, KindSpecial) } -// NOTE naming - compare with coreStats.initProm() -func (r *runner) reg(node *meta.Snode, name, kind string, fast ...bool) { +// NOTE naming convention: ".n" for the count and ".ns" for duration (nanoseconds) +// compare with coreStats.initProm() +func (r *runner) reg(node *meta.Snode, name, kind string) { v := &statsValue{kind: kind} // in StatsD metrics ":" delineates the name and the value - replace with underscore switch kind { @@ -704,11 +659,6 @@ func (r *runner) reg(node *meta.Snode, name, kind string, fast ...bool) { } } r.core.Tracker[name] = v - - if len(fast) > 0 && fast[0] { - r.fast.v = append(r.fast.v, int64(0)) - r.fast.n[name] = &r.fast.v[len(r.fast.v)-1] - } } // @@ -716,49 +666,24 @@ func (r *runner) reg(node *meta.Snode, name, kind string, fast ...bool) { // func (r *runner) Add(name string, val int64) { - r.post(cos.NamedVal64{Name: name, Value: val}) + r.core.update(cos.NamedVal64{Name: name, Value: val}) } func (r *runner) Inc(name string) { - r.post(cos.NamedVal64{Name: name, Value: 1}) + r.core.update(cos.NamedVal64{Name: name, Value: 1}) } func (r *runner) IncErr(metric string) { if IsErrMetric(metric) { - r.post(cos.NamedVal64{Name: metric, Value: 1}) + r.core.update(cos.NamedVal64{Name: metric, Value: 1}) } else { // e.g. "err." + GetCount - r.post(cos.NamedVal64{Name: errPrefix + metric, Value: 1}) + r.core.update(cos.NamedVal64{Name: errPrefix + metric, Value: 1}) } } func (r *runner) AddMany(nvs ...cos.NamedVal64) { for _, nv := range nvs { - r.post(nv) - } -} - -func (r *runner) post(nv cos.NamedVal64) { - if pval, ok := r.fast.n[nv.Name]; ok { - ratomic.AddInt64(pval, nv.Value) - return - } - - // otherwise, a regular update path (with an unlikely default clause) - select { - case r.workCh <- nv: - default: - if cnt := r.chanFull.Inc(); cnt < 5 || (cnt%100 == 99 && cmn.FastV(4, cos.SmoduleStats)) { - nlog.ErrorDepth(1, "stats channel full:", nv.Name) - } - } -} - -// apply fast-path counters directly -func (r *runner) _fast() { - for name, pval := range r.fast.n { - if val := ratomic.SwapInt64(pval, 0); val > 0 { - r.core.update(cos.NamedVal64{Name: name, Value: val}) - } + r.core.update(nv) } } @@ -774,7 +699,6 @@ func (r *runner) Collect(ch chan<- prometheus.Metric) { if !r.StartedUp() { return } - r._fast() r.core.promRLock() for name, v := range r.core.Tracker { var ( @@ -841,8 +765,6 @@ func (r *runner) _run(logger statsLogger /* Prunner or Trunner */) error { waitStartup: for { select { - case <-r.workCh: - // Drain workCh until the daemon (proxy or target) starts up. case <-r.stopCh: ticker.Stop() return nil @@ -888,13 +810,7 @@ waitStartup: ) for { select { - case nv, ok := <-r.workCh: - if ok { - r.core.update(nv) - r._fast() - } case <-r.ticker.C: - r._fast() now := mono.NanoTime() config = cmn.GCO.Get() logger.log(now, time.Duration(now-startTime) /*uptime*/, config) diff --git a/stats/proxy_stats.go b/stats/proxy_stats.go index aea6d4e090..f170ceaf19 100644 --- a/stats/proxy_stats.go +++ b/stats/proxy_stats.go @@ -43,8 +43,6 @@ func (r *Prunner) Init(p cluster.Node) *atomic.Bool { r.core = &coreStats{} r.core.init(numProxyStats) - r.runner.fast.n = make(map[string]*int64, 8) - r.runner.fast.v = make([]int64, 0, 8) r.regCommon(p.Snode()) // common metrics @@ -55,7 +53,6 @@ func (r *Prunner) Init(p cluster.Node) *atomic.Bool { r.runner.daemon = p r.runner.stopCh = make(chan struct{}, 4) - r.runner.workCh = make(chan cos.NamedVal64, workChanCapacity) r.core.initMetricClient(p.Snode(), &r.runner) diff --git a/stats/target_stats.go b/stats/target_stats.go index dab3ca93df..e956a92d51 100644 --- a/stats/target_stats.go +++ b/stats/target_stats.go @@ -122,8 +122,6 @@ func (r *Trunner) Init(t cluster.Target) *atomic.Bool { r.core = &coreStats{} r.core.init(numTargetStats) - r.runner.fast.n = make(map[string]*int64, 16) - r.runner.fast.v = make([]int64, 0, 16) r.regCommon(t.Snode()) @@ -138,7 +136,6 @@ func (r *Trunner) Init(t cluster.Target) *atomic.Bool { r.runner.daemon = t r.runner.stopCh = make(chan struct{}, 4) - r.runner.workCh = make(chan cos.NamedVal64, workChanCapacity) r.core.initMetricClient(t.Snode(), &r.runner) @@ -179,8 +176,8 @@ func isDiskUtilMetric(name string) bool { // target-specific metrics, in addition to common and already added via regCommon() func (r *Trunner) RegMetrics(node *meta.Snode) { - r.reg(node, GetColdCount, KindCounter, true /* fast */) - r.reg(node, GetColdSize, KindSize, true) + r.reg(node, GetColdCount, KindCounter) + r.reg(node, GetColdSize, KindSize) r.reg(node, LruEvictCount, KindCounter) r.reg(node, LruEvictSize, KindSize) @@ -197,11 +194,11 @@ func (r *Trunner) RegMetrics(node *meta.Snode) { r.reg(node, PutRedirLatency, KindLatency) // bps - r.reg(node, GetThroughput, KindThroughput, true) - r.reg(node, PutThroughput, KindThroughput, true) + r.reg(node, GetThroughput, KindThroughput) + r.reg(node, PutThroughput, KindThroughput) - r.reg(node, GetSize, KindSize, true) - r.reg(node, PutSize, KindSize, true) + r.reg(node, GetSize, KindSize) + r.reg(node, PutSize, KindSize) // errors r.reg(node, ErrCksumCount, KindCounter) @@ -211,10 +208,10 @@ func (r *Trunner) RegMetrics(node *meta.Snode) { r.reg(node, ErrIOCount, KindCounter) // streams - r.reg(node, StreamsOutObjCount, KindCounter, true) - r.reg(node, StreamsOutObjSize, KindSize, true) - r.reg(node, StreamsInObjCount, KindCounter, true) - r.reg(node, StreamsInObjSize, KindSize, true) + r.reg(node, StreamsOutObjCount, KindCounter) + r.reg(node, StreamsOutObjSize, KindSize) + r.reg(node, StreamsInObjCount, KindCounter) + r.reg(node, StreamsInObjSize, KindSize) // special r.reg(node, RestartCount, KindCounter) @@ -224,12 +221,12 @@ func (r *Trunner) RegMetrics(node *meta.Snode) { r.reg(node, DownloadLatency, KindLatency) // dsort - r.reg(node, DSortCreationReqCount, KindCounter, true) - r.reg(node, DSortCreationRespCount, KindCounter, true) + r.reg(node, DSortCreationReqCount, KindCounter) + r.reg(node, DSortCreationRespCount, KindCounter) r.reg(node, DSortCreationRespLatency, KindLatency) - r.reg(node, DSortExtractShardDskCnt, KindCounter, true) - r.reg(node, DSortExtractShardMemCnt, KindCounter, true) - r.reg(node, DSortExtractShardSize, KindSize, true) + r.reg(node, DSortExtractShardDskCnt, KindCounter) + r.reg(node, DSortExtractShardMemCnt, KindCounter) + r.reg(node, DSortExtractShardSize, KindSize) // Prometheus r.core.initProm(node) @@ -314,6 +311,7 @@ func (r *Trunner) log(now int64, uptime time.Duration, config *cmn.Config) { } // 4. append disk stats to log subject to (idle) filtering + // see related: `ignoreIdle` r.logDiskStats(now) // 5. memory pressure