Skip to content

Commit

Permalink
profiler: add DD_PROFILING_FLUSH_ON_EXIT to upload current profiles b…
Browse files Browse the repository at this point in the history
…efore exiting (#2926)
  • Loading branch information
jinroh authored Nov 4, 2024
1 parent 4d79271 commit d846308
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 5 deletions.
3 changes: 3 additions & 0 deletions profiler/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type config struct {
traceConfig executionTraceConfig
endpointCountEnabled bool
enabled bool
flushOnExit bool
}

// logStartup records the configuration to the configured logger in JSON format
Expand Down Expand Up @@ -148,6 +149,7 @@ func logStartup(c *config) {
"endpoint_count_enabled": c.endpointCountEnabled,
"custom_profiler_label_keys": c.customProfilerLabels,
"enabled": c.enabled,
"flush_on_exit": c.flushOnExit,
}
b, err := json.Marshal(info)
if err != nil {
Expand Down Expand Up @@ -242,6 +244,7 @@ func defaultConfig() (*config, error) {
if v := os.Getenv("DD_VERSION"); v != "" {
WithVersion(v)(&c)
}
c.flushOnExit = internal.BoolEnv("DD_PROFILING_FLUSH_ON_EXIT", false)

tags := make(map[string]string)
if v := os.Getenv("DD_TAGS"); v != "" {
Expand Down
18 changes: 14 additions & 4 deletions profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ func (p *profiler) collect(ticker <-chan time.Time) {
endpointCounter.GetAndReset()
}()

for {
exit := false
for !exit {
bat := batch{
seq: p.seq,
host: p.cfg.hostname,
Expand Down Expand Up @@ -384,7 +385,11 @@ func (p *profiler) collect(ticker <-chan time.Time) {
// is less than the configured profiling period, the ticker will block
// until the end of the profiling period.
case <-p.exit:
return
if !p.cfg.flushOnExit {
return
}
// If we're flushing, we enqueue the batch before exiting the loop.
exit = true
}

// Include endpoint hits from tracer in profile `event.json`.
Expand Down Expand Up @@ -457,8 +462,13 @@ func (p *profiler) send() {
for {
select {
case <-p.exit:
return
case bat := <-p.out:
if !p.cfg.flushOnExit {
return
}
case bat, ok := <-p.out:
if !ok {
return
}
if err := p.outputDir(bat); err != nil {
log.Error("Failed to output profile to dir: %v", err)
}
Expand Down
54 changes: 54 additions & 0 deletions profiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime/trace"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -233,6 +234,59 @@ func TestStopLatency(t *testing.T) {
}
}

func TestFlushAndStop(t *testing.T) {
t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1")
received := startTestProfiler(t, 1,
WithProfileTypes(CPUProfile, HeapProfile),
WithPeriod(time.Hour),
WithUploadTimeout(time.Hour))

Stop()

select {
case prof := <-received:
if len(prof.attachments["cpu.pprof"]) == 0 {
t.Errorf("expected CPU profile, got none")
}
if len(prof.attachments["delta-heap.pprof"]) == 0 {
t.Errorf("expected heap profile, got none")
}
case <-time.After(5 * time.Second):
t.Fatalf("profiler did not flush")
}
}

func TestFlushAndStopTimeout(t *testing.T) {
uploadTimeout := 1 * time.Second
var requests atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if h := r.Header.Get("DD-Telemetry-Request-Type"); len(h) > 0 {
return
}
requests.Add(1)
time.Sleep(2 * uploadTimeout)
}))
defer server.Close()

t.Setenv("DD_PROFILING_FLUSH_ON_EXIT", "1")
Start(
WithAgentAddr(server.Listener.Addr().String()),
WithPeriod(time.Hour),
WithUploadTimeout(uploadTimeout),
)

start := time.Now()
Stop()

elapsed := time.Since(start)
if elapsed > (maxRetries*uploadTimeout)+1*time.Second {
t.Errorf("profiler took %v to stop", elapsed)
}
if requests.Load() != maxRetries {
t.Errorf("expected %d requests, got %d", maxRetries, requests.Load())
}
}

func TestSetProfileFraction(t *testing.T) {
t.Run("on", func(t *testing.T) {
start := runtime.SetMutexProfileFraction(0)
Expand Down
1 change: 1 addition & 0 deletions profiler/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func startTelemetry(c *config) {
{Name: "endpoint_count_enabled", Value: c.endpointCountEnabled},
{Name: "num_custom_profiler_label_keys", Value: len(c.customProfilerLabels)},
{Name: "enabled", Value: c.enabled},
{Name: "flush_on_exit", Value: c.flushOnExit},
},
)
}
7 changes: 6 additions & 1 deletion profiler/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (p *profiler) upload(bat batch) error {
for i := 0; i < maxRetries; i++ {
select {
case <-p.exit:
return nil
if !p.cfg.flushOnExit {
return nil
}
default:
}

Expand Down Expand Up @@ -98,6 +100,9 @@ func (p *profiler) doRequest(bat batch) error {
go func() {
select {
case <-p.exit:
if p.cfg.flushOnExit {
return
}
case <-funcExit:
}
cancel()
Expand Down

0 comments on commit d846308

Please sign in to comment.