Skip to content

Commit

Permalink
[usm] reliability, http protocol, benchmark DDSketch pool
Browse files Browse the repository at this point in the history
  • Loading branch information
yuri-lipnesh committed Dec 6, 2024
1 parent a4a88c6 commit 763707c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
27 changes: 27 additions & 0 deletions pkg/network/protocols/http/statkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"sync"
"time"

"github.com/DataDog/sketches-go/ddsketch"

"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/util/log"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
)

// StatKeeper is responsible for aggregating HTTP stats.
Expand All @@ -34,6 +37,9 @@ type StatKeeper struct {
buffer []byte

oversizedLogLimit *log.Limit

// pool of ddsketch objects
ddsketchPool *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewStatkeeper returns a new StatKeeper.
Expand All @@ -59,6 +65,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco
buffer: make([]byte, getPathBufferSize(c)),
telemetry: telemetry,
oversizedLogLimit: log.NewLogLimit(10, time.Minute*10),
ddsketchPool: NewSketchPool(),
}
}

Expand Down Expand Up @@ -88,6 +95,7 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) {

// Rotate stats
stats = h.stats
h.releaseSketchPool()
h.stats = make(map[Key]*RequestStats)

// Rotate ConnectionAggregator
Expand All @@ -107,6 +115,7 @@ func (h *StatKeeper) GetAndResetAllStats() (stats map[Key]*RequestStats) {
// Close closes the stat keeper.
func (h *StatKeeper) Close() {
h.oversizedLogLimit.Close()
h.releaseSketchPool()
}

func (h *StatKeeper) add(tx Transaction) {
Expand Down Expand Up @@ -217,3 +226,21 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator,
stats[key] = aggregation
}
}

// NewSketchPool - creates new pool of DDSketch objects
func NewSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] {
sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch {
sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy)
if err != nil {
log.Debugf("http stats, could not create new ddsketch for pool, error: %v", err)
}
return sketch
})
return sketchPool
}

func (h *StatKeeper) releaseSketchPool() {
for _, stats := range h.stats {
stats.ReleaseSketches()
}
}
27 changes: 24 additions & 3 deletions pkg/network/protocols/http/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/process/util"
"github.com/DataDog/datadog-agent/pkg/util/intern"
"github.com/DataDog/datadog-agent/pkg/util/log"
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
)

// Interner is used to intern strings to save memory allocations.
Expand Down Expand Up @@ -138,7 +139,8 @@ func (r *RequestStat) initSketch() (err error) {

// RequestStats stores HTTP request statistics.
type RequestStats struct {
Data map[uint16]*RequestStat
Data map[uint16]*RequestStat
ddsketchPool *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewRequestStats creates a new RequestStats object.
Expand All @@ -148,6 +150,14 @@ func NewRequestStats() *RequestStats {
}
}

// NewRequestStatsWithPool creates a new RequestStats object.
func NewRequestStatsWithPool(sketchPool *ddsync.TypedPool[ddsketch.DDSketch]) *RequestStats {
return &RequestStats{
Data: make(map[uint16]*RequestStat),
ddsketchPool: sketchPool,
}
}

// isValid checks is the status code is in the range of valid HTTP responses.
func (r *RequestStats) isValid(status uint16) bool {
return status >= 100 && status < 600
Expand Down Expand Up @@ -219,9 +229,10 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags
stats.FirstLatencySample = latency
return
}

if stats.Latencies == nil {
if err := stats.initSketch(); err != nil {
if r.ddsketchPool != nil {
stats.Latencies = r.ddsketchPool.Get()
} else if err := stats.initSketch(); err != nil {
return
}

Expand All @@ -245,3 +256,13 @@ func (r *RequestStats) HalfAllCounts() {
}
}
}

// ReleaseSketches adds all obtained sketch objects to the pool.
func (r *RequestStats) ReleaseSketches() {
if r.ddsketchPool != nil {
for _, stats := range r.Data {
r.ddsketchPool.Put(stats.Latencies)
stats.Latencies = nil
}
}
}
39 changes: 38 additions & 1 deletion pkg/network/protocols/http/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ package http
import (
"testing"

"github.com/DataDog/sketches-go/ddsketch"
"github.com/stretchr/testify/assert"

"github.com/DataDog/sketches-go/ddsketch"
)

func TestAddRequest(t *testing.T) {
Expand Down Expand Up @@ -76,3 +77,39 @@ func verifyQuantile(t *testing.T, sketch *ddsketch.DDSketch, q float64, expected
assert.True(t, val >= expectedValue-acceptableError)
assert.True(t, val <= expectedValue+acceptableError)
}

// BenchmarkHttpRequestsNoPool generates stats requests
func BenchmarkHttpRequestsNoPool(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
stats := NewRequestStats()
// run over expected code range from 100 to 600
for n := 100; n < 600; n++ {
var code = uint16(n)
stats.AddRequest(code, 5.0, 1, nil)
// call twice to trigger allocation of DDSketch
stats.AddRequest(code, 10.0, 1, nil)
}
}
b.StopTimer()
}

// BenchmarkHttpRequestsWithPool generates stats requests using pool of DDSketch objects
func BenchmarkHttpRequestsWithPool(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

pool := NewSketchPool()
for i := 0; i < b.N; i++ {
stats := NewRequestStatsWithPool(pool)
for n := 100; n < 600; n++ {
var code = uint16(n)
stats.AddRequest(code, 5.0, 1, nil)
// call twice to trigger allocation of DDSketch
stats.AddRequest(code, 10.0, 1, nil)
}
}
b.StopTimer()
}

0 comments on commit 763707c

Please sign in to comment.