Skip to content

Commit 797fe14

Browse files
metrics: TTFT in streaming mode
Signed-off-by: Jintao Zhang <[email protected]>
1 parent 7a0221b commit 797fe14

File tree

3 files changed

+107
-3
lines changed

3 files changed

+107
-3
lines changed

src/semantic-router/pkg/extproc/metrics_integration_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,42 @@ var _ = Describe("Metrics recording", func() {
123123
Expect(afterPrompt).To(BeNumerically(">", beforePrompt))
124124
Expect(afterCompletion).To(BeNumerically(">", beforeCompletion))
125125
})
126+
127+
It("records TTFT on first streamed body chunk for SSE responses", func() {
	// Request context that started processing 120ms ago and asked for SSE.
	reqCtx := &RequestContext{
		RequestModel:        "model-stream",
		ProcessingStartTime: time.Now().Add(-120 * time.Millisecond),
		Headers:             map[string]string{"accept": "text/event-stream"},
	}

	// Snapshot the TTFT histogram sample count before any handling.
	samplesBefore := getHistogramSampleCount("llm_model_ttft_seconds", reqCtx.RequestModel)

	// Header phase: an SSE content-type marks the response as streaming.
	sseHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
		ResponseHeaders: &ext_proc.HttpHeaders{
			Headers: &core.HeaderMap{Headers: []*core.HeaderValue{{Key: "content-type", Value: "text/event-stream"}}},
		},
	}

	// Handling the response headers must detect streaming and defer TTFT.
	headerResp, err := router.handleResponseHeaders(sseHeaders, reqCtx)
	Expect(err).NotTo(HaveOccurred())
	Expect(headerResp.GetResponseHeaders()).NotTo(BeNil())
	Expect(reqCtx.IsStreamingResponse).To(BeTrue())
	Expect(reqCtx.TTFTRecorded).To(BeFalse())

	// Body phase: deliver the first streamed chunk.
	firstChunk := &ext_proc.ProcessingRequest_ResponseBody{
		ResponseBody: &ext_proc.HttpBody{Body: []byte("data: chunk-1\n")},
	}

	bodyResp, err := router.handleResponseBody(firstChunk, reqCtx)
	Expect(err).NotTo(HaveOccurred())
	Expect(bodyResp.GetResponseBody()).NotTo(BeNil())

	// TTFT must now have been recorded exactly once, with a positive value.
	samplesAfter := getHistogramSampleCount("llm_model_ttft_seconds", reqCtx.RequestModel)
	Expect(samplesAfter).To(BeNumerically(">", samplesBefore))
	Expect(reqCtx.TTFTRecorded).To(BeTrue())
	Expect(reqCtx.TTFTSeconds).To(BeNumerically(">", 0))
})
126164
})

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ type RequestContext struct {
108108
StartTime time.Time
109109
ProcessingStartTime time.Time
110110

111+
// Streaming detection
112+
ExpectStreamingResponse bool // set from request Accept header
113+
IsStreamingResponse bool // set from response Content-Type
114+
111115
// TTFT tracking
112116
TTFTRecorded bool
113117
TTFTSeconds float64
@@ -136,7 +140,14 @@ func (r *OpenAIRouter) handleRequestHeaders(v *ext_proc.ProcessingRequest_Reques
136140
}
137141
}
138142

139-
// Allow the request to continue
143+
// Detect if the client expects a streaming response (SSE)
144+
if accept, ok := ctx.Headers["accept"]; ok {
145+
if strings.Contains(strings.ToLower(accept), "text/event-stream") {
146+
ctx.ExpectStreamingResponse = true
147+
}
148+
}
149+
150+
// Prepare base response
140151
response := &ext_proc.ProcessingResponse{
141152
Response: &ext_proc.ProcessingResponse_RequestHeaders{
142153
RequestHeaders: &ext_proc.HeadersResponse{
@@ -148,6 +159,10 @@ func (r *OpenAIRouter) handleRequestHeaders(v *ext_proc.ProcessingRequest_Reques
148159
},
149160
}
150161

162+
// If streaming is expected, we rely on Envoy config to set response_body_mode: STREAMED for SSE.
163+
// Some Envoy/control-plane versions may not support per-message ModeOverride; avoid compile-time coupling here.
164+
// The Accept header is still recorded on context for downstream logic.
165+
151166
return response, nil
152167
}
153168

src/semantic-router/pkg/extproc/response_handler.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package extproc
33
import (
44
"encoding/json"
55
"strconv"
6+
"strings"
67
"time"
78

89
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -17,6 +18,9 @@ import (
1718
func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
1819
// Detect upstream HTTP status and record non-2xx as errors
1920
if v != nil && v.ResponseHeaders != nil && v.ResponseHeaders.Headers != nil {
21+
// Determine if the response is streaming based on Content-Type
22+
ctx.IsStreamingResponse = isStreamingContentType(v.ResponseHeaders.Headers)
23+
2024
if statusCode := getStatusFromHeaders(v.ResponseHeaders.Headers); statusCode != 0 {
2125
if statusCode >= 500 {
2226
metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_5xx")
@@ -26,8 +30,10 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo
2630
}
2731
}
2832

29-
// Best-effort TTFT measurement: record on first response headers if we have a start time and model
30-
if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
33+
// Best-effort TTFT measurement:
34+
// - For non-streaming responses, record on first response headers (approx TTFB ~= TTFT)
35+
// - For streaming responses (SSE), defer TTFT until the first response body chunk arrives
36+
if ctx != nil && !ctx.IsStreamingResponse && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
3137
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
3238
if ttft > 0 {
3339
metrics.RecordModelTTFT(ctx.RequestModel, ttft)
@@ -79,13 +85,58 @@ func getModelFromCtx(ctx *RequestContext) string {
7985
return ctx.RequestModel
8086
}
8187

88+
// isStreamingContentType checks if the response content-type indicates streaming (SSE)
89+
func isStreamingContentType(headerMap *core.HeaderMap) bool {
90+
if headerMap == nil {
91+
return false
92+
}
93+
for _, hv := range headerMap.Headers {
94+
if strings.ToLower(hv.Key) == "content-type" {
95+
val := hv.Value
96+
if val == "" && len(hv.RawValue) > 0 {
97+
val = string(hv.RawValue)
98+
}
99+
if strings.Contains(strings.ToLower(val), "text/event-stream") {
100+
return true
101+
}
102+
}
103+
}
104+
return false
105+
}
106+
82107
// handleResponseBody processes the response body
83108
func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
84109
completionLatency := time.Since(ctx.StartTime)
85110

86111
// Process the response for caching
87112
responseBody := v.ResponseBody.Body
88113

114+
// If this is a streaming response (e.g., SSE), record TTFT on the first body chunk
115+
// and skip JSON parsing/caching which are not applicable for SSE chunks.
116+
if ctx.IsStreamingResponse {
117+
if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
118+
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
119+
if ttft > 0 {
120+
metrics.RecordModelTTFT(ctx.RequestModel, ttft)
121+
ctx.TTFTSeconds = ttft
122+
ctx.TTFTRecorded = true
123+
observability.Infof("Recorded TTFT on first streamed body chunk: %.3fs", ttft)
124+
}
125+
}
126+
127+
// For streaming chunks, just continue (no token parsing or cache update)
128+
response := &ext_proc.ProcessingResponse{
129+
Response: &ext_proc.ProcessingResponse_ResponseBody{
130+
ResponseBody: &ext_proc.BodyResponse{
131+
Response: &ext_proc.CommonResponse{
132+
Status: ext_proc.CommonResponse_CONTINUE,
133+
},
134+
},
135+
},
136+
}
137+
return response, nil
138+
}
139+
89140
// Parse tokens from the response JSON using OpenAI SDK types
90141
var parsed openai.ChatCompletion
91142
if err := json.Unmarshal(responseBody, &parsed); err != nil {

0 commit comments

Comments
 (0)