Skip to content

Commit

Permalink
[usm] reliability, http protocol, benchmark StatKeeper with pool of D…
Browse files Browse the repository at this point in the history
…DSketch.
  • Loading branch information
yuri-lipnesh committed Dec 9, 2024
1 parent 763707c commit d6b8218
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 57 deletions.
12 changes: 7 additions & 5 deletions pkg/network/protocols/http/statkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type StatKeeper struct {

oversizedLogLimit *log.Limit

// pool of ddsketch objects
// pool of 'DDSketch' objects
ddsketchPool *ddsync.TypedPool[ddsketch.DDSketch]
}

Expand All @@ -65,7 +65,7 @@ func NewStatkeeper(c *config.Config, telemetry *Telemetry, incompleteBuffer Inco
buffer: make([]byte, getPathBufferSize(c)),
telemetry: telemetry,
oversizedLogLimit: log.NewLogLimit(10, time.Minute*10),
ddsketchPool: NewSketchPool(),
ddsketchPool: newSketchPool(),
}
}

Expand Down Expand Up @@ -166,6 +166,7 @@ func (h *StatKeeper) add(tx Transaction) {
}
h.telemetry.aggregations.Add(1)
stats = NewRequestStats()
stats.SketchPool = h.ddsketchPool
h.stats[key] = stats
}

Expand Down Expand Up @@ -227,8 +228,8 @@ func (h *StatKeeper) clearEphemeralPorts(aggregator *utils.ConnectionAggregator,
}
}

// NewSketchPool - creates new pool of DDSketch objects
func NewSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] {
// newSketchPool creates new pool of DDSketch objects.
func newSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] {
sketchPool := ddsync.NewTypedPool(func() *ddsketch.DDSketch {
sketch, err := ddsketch.NewDefaultDDSketch(RelativeAccuracy)
if err != nil {
Expand All @@ -239,8 +240,9 @@ func NewSketchPool() *ddsync.TypedPool[ddsketch.DDSketch] {
return sketchPool
}

// releaseSketchPool put DDSketch objects to pool.
func (h *StatKeeper) releaseSketchPool() {
for _, stats := range h.stats {
stats.ReleaseSketches()
stats.PutSketches()
}
}
75 changes: 75 additions & 0 deletions pkg/network/protocols/http/statkeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,78 @@ func TestHTTPCorrectness(t *testing.T) {
require.Len(t, stats, 0)
})
}

func makeStatkeeper() *StatKeeper {
cfg := config.New()
cfg.MaxHTTPStatsBuffered = 100000
tel := NewTelemetry("http")
return NewStatkeeper(cfg, tel, NewIncompleteBuffer(cfg, tel))
}

// BenchmarkHTTPStatkeeperWithPool benchmark allocations with pool of 'DDSketch' objects
func BenchmarkHTTPStatkeeperWithPool(b *testing.B) {
sk := makeStatkeeper()

sourceIP := util.AddressFromString("1.1.1.1")
sourcePort := 1234
destIP := util.AddressFromString("2.2.2.2")
destPort := 8080

const numPaths = 10000
const uniqPaths = 50
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
sk.GetAndResetAllStats()
for p := 0; p < numPaths; p++ {
b.StopTimer()
//we use subset of unique endpoints, but those will occur over and over again like in regular target application
path := "/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths)
//we simulate different conn tuples by increasing the port number
newSourcePort := sourcePort + (p % 30)
statusCode := (i%5 + 1) * 100
latency := time.Duration(i%5+1) * time.Millisecond
tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency)
b.StartTimer()
sk.Process(tx)
}
}
b.StopTimer()
}

// BenchmarkHTTPStatkeeperNoPool benchmark allocations without pool of 'DDSketch' objects
func BenchmarkHTTPStatkeeperNoPool(b *testing.B) {
sk := makeStatkeeper()
sk.DisableSketchPool()

sourceIP := util.AddressFromString("1.1.1.1")
sourcePort := 1234
destIP := util.AddressFromString("2.2.2.2")
destPort := 8080

const numPaths = 10000
const uniqPaths = 50
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
sk.GetAndResetAllStats()
for p := 0; p < numPaths; p++ {
b.StopTimer()
//we use subset of unique endpoints, but those will occur over and over again like in regular target application
path := "/testpath/blablabla/dsadas/isdaasd/asdasadsadasd" + strconv.Itoa(p%uniqPaths)
//we simulate different conn tuples by increasing the port number
newSourcePort := sourcePort + (p % 30)
statusCode := (i%5 + 1) * 100
latency := time.Duration(i%5+1) * time.Millisecond
tx := generateIPv4HTTPTransaction(sourceIP, destIP, newSourcePort, destPort, path, statusCode, latency)
b.StartTimer()
sk.Process(tx)
}
}
b.StopTimer()
}

// DisableSketchPool disable pool of 'DDSketch' objects for testing purpose.
func (h *StatKeeper) DisableSketchPool() {
h.ddsketchPool = nil
}
24 changes: 8 additions & 16 deletions pkg/network/protocols/http/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (r *RequestStat) initSketch() (err error) {

// RequestStats stores HTTP request statistics.
type RequestStats struct {
Data map[uint16]*RequestStat
ddsketchPool *ddsync.TypedPool[ddsketch.DDSketch]
Data map[uint16]*RequestStat
SketchPool *ddsync.TypedPool[ddsketch.DDSketch]
}

// NewRequestStats creates a new RequestStats object.
Expand All @@ -150,14 +150,6 @@ func NewRequestStats() *RequestStats {
}
}

// NewRequestStatsWithPool creates a new RequestStats object.
func NewRequestStatsWithPool(sketchPool *ddsync.TypedPool[ddsketch.DDSketch]) *RequestStats {
return &RequestStats{
Data: make(map[uint16]*RequestStat),
ddsketchPool: sketchPool,
}
}

// isValid checks is the status code is in the range of valid HTTP responses.
func (r *RequestStats) isValid(status uint16) bool {
return status >= 100 && status < 600
Expand Down Expand Up @@ -230,8 +222,8 @@ func (r *RequestStats) AddRequest(statusCode uint16, latency float64, staticTags
return
}
if stats.Latencies == nil {
if r.ddsketchPool != nil {
stats.Latencies = r.ddsketchPool.Get()
if r.SketchPool != nil {
stats.Latencies = r.SketchPool.Get()
} else if err := stats.initSketch(); err != nil {
return
}
Expand All @@ -257,11 +249,11 @@ func (r *RequestStats) HalfAllCounts() {
}
}

// ReleaseSketches adds all obtained sketch objects to the pool.
func (r *RequestStats) ReleaseSketches() {
if r.ddsketchPool != nil {
// PutSketches adds all obtained sketch objects to the pool.
func (r *RequestStats) PutSketches() {
if r.SketchPool != nil {
for _, stats := range r.Data {
r.ddsketchPool.Put(stats.Latencies)
r.SketchPool.Put(stats.Latencies)
stats.Latencies = nil
}
}
Expand Down
36 changes: 0 additions & 36 deletions pkg/network/protocols/http/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,39 +77,3 @@ func verifyQuantile(t *testing.T, sketch *ddsketch.DDSketch, q float64, expected
assert.True(t, val >= expectedValue-acceptableError)
assert.True(t, val <= expectedValue+acceptableError)
}

// BenchmarkHttpRequestsNoPool generates stats requests
func BenchmarkHttpRequestsNoPool(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
stats := NewRequestStats()
// run over expected code range from 100 to 600
for n := 100; n < 600; n++ {
var code = uint16(n)
stats.AddRequest(code, 5.0, 1, nil)
// call twice to trigger allocation of DDSketch
stats.AddRequest(code, 10.0, 1, nil)
}
}
b.StopTimer()
}

// BenchmarkHttpRequestsWithPool generates stats requests using pool of DDSketch objects
func BenchmarkHttpRequestsWithPool(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

pool := NewSketchPool()
for i := 0; i < b.N; i++ {
stats := NewRequestStatsWithPool(pool)
for n := 100; n < 600; n++ {
var code = uint16(n)
stats.AddRequest(code, 5.0, 1, nil)
// call twice to trigger allocation of DDSketch
stats.AddRequest(code, 10.0, 1, nil)
}
}
b.StopTimer()
}

0 comments on commit d6b8218

Please sign in to comment.