diff --git a/pkg/network/protocols/http/statkeeper.go b/pkg/network/protocols/http/statkeeper.go index 96192cc95b5de..030f49a02627d 100644 --- a/pkg/network/protocols/http/statkeeper.go +++ b/pkg/network/protocols/http/statkeeper.go @@ -38,7 +38,7 @@ type StatKeeper struct { oversizedLogLimit *log.Limit - // pool of ddsketch objects + // pool of 'DDSketch' objects ddsketchPool *ddsync.TypedPool[ddsketch.DDSketch] } @@ -65,7 +65,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco buffer: make([]byte, getPathBufferSize(c)), telemetry: telemetry, oversizedLogLimit: log.NewLogLimit(10, time.Minute*10), - ddsketchPool: NewSketchPool(), + ddsketchPool: newSketchPool(), } } @@ -166,6 +166,7 @@ func (h *StatKeeper) add(tx Transaction) { } h.telemetry.aggregations.Add(1) stats = NewRequestStats() + stats.SketchPool = h.ddsketchPool h.stats[key] = stats } @@ -227,8 +228,8 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator, } } -// NewSketchPool - creates new pool of DDSketch objects -func NewSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { +// newSketchPool creates new pool of DDSketch objects. +func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch { sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy) if err != nil { @@ -239,8 +240,9 @@ func NewSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] { return sketchPool } +// releaseSketchPool put DDSketch objects to pool. func (h *StatKeeper) releaseSketchPool() { for _, stats := range h.stats { - stats.ReleaseSketches() + stats.PutSketches() } } diff --git a/pkg/network/protocols/http/statkeeper_test.go b/pkg/network/protocols/http/statkeeper_test.go index 8ae60da8b5859..773c208d343ed 100644 --- a/pkg/network/protocols/http/statkeeper_test.go +++ b/pkg/network/protocols/http/statkeeper_test.go @@ -322,3 +322,78 @@ func TestHTTPCorrectness(t *testing.T) { require.Len(t, stats, 0) }) } + +func makeStatkeeper() *StatKeeper { + cfg := config.New() + cfg.MaxHTTPStatsBuffered = 100000 + tel := NewTelemetry("http") + return NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel)) +} + +// BenchmarkHTTPStatkeeperWithPool benchmark allocations with pool of 'DDSketch' objects +func BenchmarkHTTPStatkeeperWithPool(b *testing.B) { + sk := makeStatkeeper() + + sourceIP := util.AddressFromString("1.1.1.1") + sourcePort := 1234 + destIP := util.AddressFromString("2.2.2.2") + destPort := 8080 + + const numPaths = 10000 + const uniqPaths = 50 + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + sk.GetAndResetAllStats() + for p := 0; p < numPaths; p++ { + b.StopTimer() + //we use subset of unique endpoints, but those will occur over and over again like in regular target application + path := "/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths) + //we simulate different conn tuples by increasing the port number + newSourcePort := sourcePort + (p % 30) + statusCode := (i%5 + 1) * 100 + latency := time.Duration(i%5+1) * time.Millisecond + tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency) + b.StartTimer() + sk.Process(tx) + } + } + b.StopTimer() +} + +// BenchmarkHTTPStatkeeperNoPool benchmark allocations without pool of 'DDSketch' objects +func BenchmarkHTTPStatkeeperNoPool(b *testing.B) { + sk := makeStatkeeper() + sk.DisableSketchPool() + + sourceIP := util.AddressFromString("1.1.1.1") + sourcePort := 1234 + destIP := util.AddressFromString("2.2.2.2") + destPort := 8080 + + const numPaths = 10000 + const uniqPaths = 50 + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + sk.GetAndResetAllStats() + for p := 0; p < numPaths; p++ { + b.StopTimer() + //we use subset of unique endpoints, but those will occur over and over again like in regular target application + path := "/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths) + //we simulate different conn tuples by increasing the port number + newSourcePort := sourcePort + (p % 30) + statusCode := (i%5 + 1) * 100 + latency := time.Duration(i%5+1) * time.Millisecond + tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency) + b.StartTimer() + sk.Process(tx) + } + } + b.StopTimer() +} + +// DisableSketchPool disable pool of 'DDSketch' objects for testing purpose. +func (h *StatKeeper) DisableSketchPool() { + h.ddsketchPool = nil +} diff --git a/pkg/network/protocols/http/stats.go b/pkg/network/protocols/http/stats.go index 1dd262cbde9ce..a6eef775ca69d 100644 --- a/pkg/network/protocols/http/stats.go +++ b/pkg/network/protocols/http/stats.go @@ -139,8 +139,8 @@ func (r *RequestStat) initSketch() (err error) { // RequestStats stores HTTP request statistics. type RequestStats struct { - Data map[uint16]*RequestStat - ddsketchPool *ddsync.TypedPool[ddsketch.DDSketch] + Data map[uint16]*RequestStat + SketchPool *ddsync.TypedPool[ddsketch.DDSketch] } // NewRequestStats creates a new RequestStats object. @@ -150,14 +150,6 @@ func NewRequestStats() *RequestStats { } } -// NewRequestStatsWithPool creates a new RequestStats object. -func NewRequestStatsWithPool(sketchPool *ddsync.TypedPool[ddsketch.DDSketch]) *RequestStats { - return &RequestStats{ - Data: make(map[uint16]*RequestStat), - ddsketchPool: sketchPool, - } -} - // isValid checks is the status code is in the range of valid HTTP responses. func (r *RequestStats) isValid(status uint16) bool { return status >= 100 && status < 600 @@ -230,8 +222,8 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags return } if stats.Latencies == nil { - if r.ddsketchPool != nil { - stats.Latencies = r.ddsketchPool.Get() + if r.SketchPool != nil { + stats.Latencies = r.SketchPool.Get() } else if err := stats.initSketch(); err != nil { return } @@ -257,11 +249,11 @@ func (r *RequestStats) HalfAllCounts() { } } -// ReleaseSketches adds all obtained sketch objects to the pool. -func (r *RequestStats) ReleaseSketches() { - if r.ddsketchPool != nil { +// PutSketches adds all obtained sketch objects to the pool. +func (r *RequestStats) PutSketches() { + if r.SketchPool != nil { for _, stats := range r.Data { - r.ddsketchPool.Put(stats.Latencies) + r.SketchPool.Put(stats.Latencies) stats.Latencies = nil } } diff --git a/pkg/network/protocols/http/stats_test.go b/pkg/network/protocols/http/stats_test.go index 03ea955b6cb6c..521381ef60c75 100644 --- a/pkg/network/protocols/http/stats_test.go +++ b/pkg/network/protocols/http/stats_test.go @@ -77,39 +77,3 @@ func verifyQuantile(t *testing.T, sketch *ddsketch.DDSketch, q float64, expected assert.True(t, val >= expectedValue-acceptableError) assert.True(t, val <= expectedValue+acceptableError) } - -// BenchmarkHttpRequestsNoPool generates stats requests -func BenchmarkHttpRequestsNoPool(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - stats := NewRequestStats() - // run over expected code range from 100 to 600 - for n := 100; n < 600; n++ { - var code = uint16(n) - stats.AddRequest(code, 5.0, 1, nil) - // call twice to trigger allocation of DDSketch - stats.AddRequest(code, 10.0, 1, nil) - } - } - b.StopTimer() -} - -// BenchmarkHttpRequestsWithPool generates stats requests using pool of DDSketch objects -func BenchmarkHttpRequestsWithPool(b *testing.B) { - b.ResetTimer() - b.ReportAllocs() - - pool := NewSketchPool() - for i := 0; i < b.N; i++ { - stats := NewRequestStatsWithPool(pool) - for n := 100; n < 600; n++ { - var code = uint16(n) - stats.AddRequest(code, 5.0, 1, nil) - // call twice to trigger allocation of DDSketch - stats.AddRequest(code, 10.0, 1, nil) - } - } - b.StopTimer() -}