Skip to content

Commit

Permalink
core stats: remove mutex and work chan (speed up)
Browse files (browse the repository at this point in the history)
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Sep 3, 2023
1 parent 2f3aa85 commit 99ab90c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 149 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

// direct
require (
github.com/NVIDIA/aistore v1.3.20-0.20230902152208-f41d865152f0
github.com/NVIDIA/aistore v1.3.20-0.20230902193432-2f3aa85dc3c7
github.com/fatih/color v1.14.1
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.20-0.20230902152208-f41d865152f0 h1:RAW8a9ReYZZjKc8qIDLWv/AwMxT6Z7uxFL/4jAF09Z4=
github.com/NVIDIA/aistore v1.3.20-0.20230902152208-f41d865152f0/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
github.com/NVIDIA/aistore v1.3.20-0.20230902193432-2f3aa85dc3c7 h1:SPos7zzeYOJPwFL4Egmw1xrpq+b/KwhL60y9XuZD+Hg=
github.com/NVIDIA/aistore v1.3.20-0.20230902193432-2f3aa85dc3c7/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
Expand Down
166 changes: 41 additions & 125 deletions stats/common_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const workChanCapacity = 512

const (
dfltPeriodicFlushTime = time.Minute // when `config.Log.FlushTime` is 0 (zero)
dfltPeriodicTimeStamp = time.Hour // extended date/time complementary to log timestamps (e.g., "11:29:11.644596")
Expand Down Expand Up @@ -111,7 +109,6 @@ type (
Value int64 `json:"v,string"`
numSamples int64 // (log + StatsD) only
cumulative int64
mu sync.RWMutex
}
copyValue struct {
Value int64 `json:"v,string"`
Expand All @@ -133,22 +130,15 @@ type (

// Prunner and Trunner
runner struct {
daemon runnerHost
stopCh chan struct{}
workCh chan cos.NamedVal64
ticker *time.Ticker
core *coreStats
ctracker copyTracker // to avoid making it at runtime
sorted []string // sorted names
name string // this stats-runner's name
prev string // prev ctracker.write
next int64 // mono.NanoTime()
chanFull atomic.Int64
// assorted fast-path counters (see reg(... , true))
fast struct {
n map[string]*int64
v []int64
}
daemon runnerHost
stopCh chan struct{}
ticker *time.Ticker
core *coreStats
ctracker copyTracker // to avoid making it at runtime
sorted []string // sorted names
name string // this stats-runner's name
prev string // prev ctracker.write
next int64 // mono.NanoTime()
startedUp atomic.Bool
}
)
Expand Down Expand Up @@ -316,37 +306,22 @@ func (s *coreStats) UnmarshalJSON(b []byte) error { return jsoniter.Unmarshal(b,

func (s *coreStats) get(name string) (val int64) {
v := s.Tracker[name]
switch v.kind {
case KindLatency, KindThroughput:
v.mu.RLock()
val = v.Value
v.mu.RUnlock()
default:
val = ratomic.LoadInt64(&v.Value)
}
val = ratomic.LoadInt64(&v.Value)
return
}

// NOTE naming convention: ".n" for the count and ".ns" for duration (nanoseconds)
func (s *coreStats) update(nv cos.NamedVal64) {
v, ok := s.Tracker[nv.Name]
debug.Assertf(ok, "invalid metric name %q", nv.Name)
switch v.kind {
case KindLatency:
v.mu.Lock()
v.numSamples++
v.cumulative += nv.Value
v.Value += nv.Value
v.mu.Unlock()
ratomic.AddInt64(&v.numSamples, 1)
fallthrough
case KindThroughput:
v.mu.Lock()
v.cumulative += nv.Value
v.Value += nv.Value
v.mu.Unlock()
ratomic.AddInt64(&v.cumulative, nv.Value)
ratomic.AddInt64(&v.Value, nv.Value)
case KindCounter, KindSize:
// NOTE: not locking (KindCounter isn't compound, making an exception to speed-up)
ratomic.AddInt64(&v.Value, nv.Value)

// - non-empty suffix forces an immediate Tx with no aggregation (see below);
// - suffix is an arbitrary string that can be defined at runtime;
// - e.g. usage: per-mountpath error counters.
Expand All @@ -362,22 +337,18 @@ func (s *coreStats) update(nv cos.NamedVal64) {
// log + StatsD (Prometheus is done separately via `Collect`)
func (s *coreStats) copyT(out copyTracker, diskLowUtil ...int64) bool {
idle := true
intl := cos.MaxI64(int64(s.statsTime.Seconds()), 1)
s.sgl.Reset()
for name, v := range s.Tracker {
switch v.kind {
case KindLatency:
var lat int64
v.mu.Lock()
if v.numSamples > 0 {
lat = v.Value / v.numSamples
if num := ratomic.SwapInt64(&v.numSamples, 0); num > 0 {
lat = ratomic.SwapInt64(&v.Value, 0) / num
if !ignore(name) {
idle = false
}
}
v.Value = 0
v.numSamples = 0
v.mu.Unlock()

out[name] = copyValue{lat}
// NOTE: ns => ms, and not reporting zeros
millis := cos.DivRound(lat, int64(time.Millisecond))
Expand All @@ -386,17 +357,12 @@ func (s *coreStats) copyT(out copyTracker, diskLowUtil ...int64) bool {
}
case KindThroughput:
var throughput int64
v.mu.Lock()
if v.Value > 0 {
throughput = v.Value / cos.MaxI64(int64(s.statsTime.Seconds()), 1)
if throughput = ratomic.SwapInt64(&v.Value, 0); throughput > 0 {
throughput /= intl
if !ignore(name) {
idle = false
}
// NOTE: ok to zero-out as we report .cumulative via API
v.Value = 0
}
v.mu.Unlock()

out[name] = copyValue{throughput}
if !s.isPrometheus() && throughput > 0 {
fv := roundMBs(throughput)
Expand Down Expand Up @@ -463,16 +429,12 @@ func (s *coreStats) copyCumulative(ctracker copyTracker) {
for name, v := range s.Tracker {
switch v.kind {
case KindLatency:
v.mu.RLock()
ctracker[name] = copyValue{v.cumulative}
v.mu.RUnlock()
ctracker[name] = copyValue{ratomic.LoadInt64(&v.cumulative)}
case KindThroughput:
v.mu.RLock()
val := copyValue{v.cumulative}
v.mu.RUnlock()
val := copyValue{ratomic.LoadInt64(&v.cumulative)}
ctracker[name] = val

// NOTE: here we effectively add a metric that was never added/updated
// NOTE: effectively, add same-value metric that was never added/updated
// via `runner.Add` and friends. Is OK to replace ".bps" suffix
// as statsValue.cumulative _is_ the total size (aka, KindSize)
n := name[:len(name)-3] + "size"
Expand Down Expand Up @@ -500,10 +462,12 @@ func (s *coreStats) reset(errorsOnly bool) {

for _, v := range s.Tracker {
switch v.kind {
case KindLatency, KindThroughput:
v.mu.Lock()
v.Value, v.cumulative = 0, 0
v.mu.Unlock()
case KindLatency:
ratomic.StoreInt64(&v.numSamples, 0)
fallthrough
case KindThroughput:
ratomic.StoreInt64(&v.Value, 0)
ratomic.StoreInt64(&v.cumulative, 0)
case KindCounter, KindSize, KindComputedThroughput, KindGauge:
ratomic.StoreInt64(&v.Value, 0)
default: // KindSpecial - do nothing
Expand All @@ -522,15 +486,7 @@ var (
)

func (v *statsValue) MarshalJSON() ([]byte, error) {
var s string
switch v.kind {
case KindLatency, KindThroughput:
v.mu.RLock()
s = strconv.FormatInt(v.Value, 10)
v.mu.RUnlock()
default:
s = strconv.FormatInt(ratomic.LoadInt64(&v.Value), 10)
}
s := strconv.FormatInt(ratomic.LoadInt64(&v.Value), 10)
return cos.UnsafeB(s), nil
}

Expand Down Expand Up @@ -615,14 +571,12 @@ var (
)

func (r *runner) GetStats() *Node {
r._fast()
ctracker := make(copyTracker, 48)
r.core.copyCumulative(ctracker)
return &Node{Tracker: ctracker}
}

func (r *runner) ResetStats(errorsOnly bool) {
r._fast()
r.core.reset(errorsOnly)
}

Expand All @@ -637,12 +591,12 @@ func (r *runner) GetMetricNames() cos.StrKVs {
// common (target, proxy) metrics
func (r *runner) regCommon(node *meta.Snode) {
// basic counters
r.reg(node, GetCount, KindCounter, true /* fast */)
r.reg(node, PutCount, KindCounter, true)
r.reg(node, GetCount, KindCounter)
r.reg(node, PutCount, KindCounter)
r.reg(node, AppendCount, KindCounter)
r.reg(node, DeleteCount, KindCounter, true)
r.reg(node, DeleteCount, KindCounter)
r.reg(node, RenameCount, KindCounter)
r.reg(node, ListCount, KindCounter, true)
r.reg(node, ListCount, KindCounter)

// basic error counters, respectively
r.reg(node, errPrefix+GetCount, KindCounter)
Expand All @@ -666,8 +620,9 @@ func (r *runner) regCommon(node *meta.Snode) {
r.reg(node, Uptime, KindSpecial)
}

// NOTE naming - compare with coreStats.initProm()
func (r *runner) reg(node *meta.Snode, name, kind string, fast ...bool) {
// NOTE naming convention: ".n" for the count and ".ns" for duration (nanoseconds)
// compare with coreStats.initProm()
func (r *runner) reg(node *meta.Snode, name, kind string) {
v := &statsValue{kind: kind}
// in StatsD metrics ":" delineates the name and the value - replace with underscore
switch kind {
Expand Down Expand Up @@ -704,61 +659,31 @@ func (r *runner) reg(node *meta.Snode, name, kind string, fast ...bool) {
}
}
r.core.Tracker[name] = v

if len(fast) > 0 && fast[0] {
r.fast.v = append(r.fast.v, int64(0))
r.fast.n[name] = &r.fast.v[len(r.fast.v)-1]
}
}

//
// as cos.StatsUpdater
//

func (r *runner) Add(name string, val int64) {
r.post(cos.NamedVal64{Name: name, Value: val})
r.core.update(cos.NamedVal64{Name: name, Value: val})
}

func (r *runner) Inc(name string) {
r.post(cos.NamedVal64{Name: name, Value: 1})
r.core.update(cos.NamedVal64{Name: name, Value: 1})
}

func (r *runner) IncErr(metric string) {
if IsErrMetric(metric) {
r.post(cos.NamedVal64{Name: metric, Value: 1})
r.core.update(cos.NamedVal64{Name: metric, Value: 1})
} else { // e.g. "err." + GetCount
r.post(cos.NamedVal64{Name: errPrefix + metric, Value: 1})
r.core.update(cos.NamedVal64{Name: errPrefix + metric, Value: 1})
}
}

func (r *runner) AddMany(nvs ...cos.NamedVal64) {
for _, nv := range nvs {
r.post(nv)
}
}

func (r *runner) post(nv cos.NamedVal64) {
if pval, ok := r.fast.n[nv.Name]; ok {
ratomic.AddInt64(pval, nv.Value)
return
}

// otherwise, a regular update path (with an unlikely default clause)
select {
case r.workCh <- nv:
default:
if cnt := r.chanFull.Inc(); cnt < 5 || (cnt%100 == 99 && cmn.FastV(4, cos.SmoduleStats)) {
nlog.ErrorDepth(1, "stats channel full:", nv.Name)
}
}
}

// apply fast-path counters directly
func (r *runner) _fast() {
for name, pval := range r.fast.n {
if val := ratomic.SwapInt64(pval, 0); val > 0 {
r.core.update(cos.NamedVal64{Name: name, Value: val})
}
r.core.update(nv)
}
}

Expand All @@ -774,7 +699,6 @@ func (r *runner) Collect(ch chan<- prometheus.Metric) {
if !r.StartedUp() {
return
}
r._fast()
r.core.promRLock()
for name, v := range r.core.Tracker {
var (
Expand Down Expand Up @@ -841,8 +765,6 @@ func (r *runner) _run(logger statsLogger /* Prunner or Trunner */) error {
waitStartup:
for {
select {
case <-r.workCh:
// Drain workCh until the daemon (proxy or target) starts up.
case <-r.stopCh:
ticker.Stop()
return nil
Expand Down Expand Up @@ -888,13 +810,7 @@ waitStartup:
)
for {
select {
case nv, ok := <-r.workCh:
if ok {
r.core.update(nv)
r._fast()
}
case <-r.ticker.C:
r._fast()
now := mono.NanoTime()
config = cmn.GCO.Get()
logger.log(now, time.Duration(now-startTime) /*uptime*/, config)
Expand Down
3 changes: 0 additions & 3 deletions stats/proxy_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ func (r *Prunner) Init(p cluster.Node) *atomic.Bool {
r.core = &coreStats{}

r.core.init(numProxyStats)
r.runner.fast.n = make(map[string]*int64, 8)
r.runner.fast.v = make([]int64, 0, 8)

r.regCommon(p.Snode()) // common metrics

Expand All @@ -55,7 +53,6 @@ func (r *Prunner) Init(p cluster.Node) *atomic.Bool {
r.runner.daemon = p

r.runner.stopCh = make(chan struct{}, 4)
r.runner.workCh = make(chan cos.NamedVal64, workChanCapacity)

r.core.initMetricClient(p.Snode(), &r.runner)

Expand Down
Loading

0 comments on commit 99ab90c

Please sign in to comment.