diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs index 3958517bea40..faf554ede924 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs @@ -18,6 +18,11 @@ class MockOpenAI { throw error; } + // If stream is requested, return an async generator + if (params.stream) { + return this._createChatCompletionStream(params); + } + return { id: 'chatcmpl-mock123', object: 'chat.completion', @@ -48,14 +53,19 @@ class MockOpenAI { create: async params => { await new Promise(resolve => setTimeout(resolve, 10)); + // If stream is requested, return an async generator + if (params.stream) { + return this._createResponsesApiStream(params); + } + return { id: 'resp_mock456', object: 'response', - created: 1677652290, + created_at: 1677652290, model: params.model, input_text: params.input, output_text: `Response to: ${params.input}`, - finish_reason: 'stop', + status: 'completed', usage: { input_tokens: 5, output_tokens: 8, @@ -65,6 +75,163 @@ class MockOpenAI { }, }; } + + // Create a mock streaming response for chat completions + async *_createChatCompletionStream(params) { + // First chunk with basic info + yield { + id: 'chatcmpl-stream-123', + object: 'chat.completion.chunk', + created: 1677652300, + model: params.model, + system_fingerprint: 'fp_stream_123', + choices: [ + { + index: 0, + delta: { + role: 'assistant', + content: 'Hello', + }, + finish_reason: null, + }, + ], + }; + + // Second chunk with more content + yield { + id: 'chatcmpl-stream-123', + object: 'chat.completion.chunk', + created: 1677652300, + model: params.model, + system_fingerprint: 'fp_stream_123', + choices: [ + { + index: 0, + delta: { + content: ' from OpenAI streaming!', + }, + finish_reason: 'stop', + }, + ], + usage: { + prompt_tokens: 12, + completion_tokens: 18, + total_tokens: 30, + completion_tokens_details: { + accepted_prediction_tokens: 0, + audio_tokens: 0, + reasoning_tokens: 0, + rejected_prediction_tokens: 0, + }, + prompt_tokens_details: { + audio_tokens: 0, + cached_tokens: 0, + }, + }, + }; + } + + // Create a mock streaming response for responses API + async *_createResponsesApiStream(params) { + // Response created event + yield { + type: 'response.created', + response: { + id: 'resp_stream_456', + object: 'response', + created_at: 1677652310, + model: params.model, + status: 'in_progress', + error: null, + incomplete_details: null, + instructions: params.instructions, + max_output_tokens: 1000, + parallel_tool_calls: false, + previous_response_id: null, + reasoning: { + effort: null, + summary: null, + }, + store: false, + temperature: 0.7, + text: { + format: { + type: 'text', + }, + }, + tool_choice: 'auto', + tools: [], + top_p: 1.0, + truncation: 'disabled', + user: null, + metadata: {}, + output: [], + output_text: '', + usage: { + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + }, + }, + sequence_number: 1, + }; + + // Response in progress with output text delta + yield { + type: 'response.output_text.delta', + delta: 'Streaming response to: ', + sequence_number: 2, + }; + + yield { + type: 'response.output_text.delta', + delta: params.input, + sequence_number: 3, + }; + + // Response completed event + yield { + type: 'response.completed', + response: { + id: 'resp_stream_456', + object: 'response', + created_at: 1677652310, + model: params.model, + status: 'completed', + error: 
null, + incomplete_details: null, + instructions: params.instructions, + max_output_tokens: 1000, + parallel_tool_calls: false, + previous_response_id: null, + reasoning: { + effort: null, + summary: null, + }, + store: false, + temperature: 0.7, + text: { + format: { + type: 'text', + }, + }, + tool_choice: 'auto', + tools: [], + top_p: 1.0, + truncation: 'disabled', + user: null, + metadata: {}, + output: [], + output_text: params.input, + usage: { + input_tokens: 6, + output_tokens: 10, + total_tokens: 16, + }, + }, + sequence_number: 4, + }; + } } async function run() { @@ -93,7 +260,7 @@ async function run() { instructions: 'You are a translator', }); - // Third test: error handling + // Third test: error handling in chat completions try { await client.chat.completions.create({ model: 'error-model', @@ -102,6 +269,51 @@ async function run() { } catch { // Error is expected and handled } + + // Fourth test: chat completions streaming + const stream1 = await client.chat.completions.create({ + model: 'gpt-4', + messages: [ + { role: 'system', content: 'You are a helpful assistant.' }, + { role: 'user', content: 'Tell me about streaming' }, + ], + stream: true, + temperature: 0.8, + }); + + // Consume the stream to trigger span instrumentation + for await (const chunk of stream1) { + // Stream chunks are processed automatically by instrumentation + void chunk; // Prevent unused variable warning + } + + // Fifth test: responses API streaming + const stream2 = await client.responses.create({ + model: 'gpt-4', + input: 'Test streaming responses API', + instructions: 'You are a streaming assistant', + stream: true, + }); + + for await (const chunk of stream2) { + void chunk; + } + + // Sixth test: error handling in streaming context + try { + const errorStream = await client.chat.completions.create({ + model: 'error-model', + messages: [{ role: 'user', content: 'This will fail' }], + stream: true, + }); + + // Try to consume the stream (this should not execute) + for await (const chunk of errorStream) { + void chunk; + } + } catch { + // Error is expected and handled + } }); } diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts index ec6f97a6aa00..0a0d0418da14 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts @@ -45,11 +45,13 @@ describe('OpenAI integration', () => { 'gen_ai.request.model': 'gpt-3.5-turbo', 'gen_ai.response.model': 'gpt-3.5-turbo', 'gen_ai.response.id': 'resp_mock456', + 'gen_ai.response.finish_reasons': '["completed"]', 'gen_ai.usage.input_tokens': 5, 'gen_ai.usage.output_tokens': 8, 'gen_ai.usage.total_tokens': 13, 'openai.response.id': 'resp_mock456', 'openai.response.model': 'gpt-3.5-turbo', + 'openai.response.timestamp': '2023-03-01T06:31:30.000Z', 'openai.usage.completion_tokens': 8, 'openai.usage.prompt_tokens': 5, }, @@ -72,6 +74,76 @@ describe('OpenAI integration', () => { origin: 'manual', status: 'unknown_error', }), + // Fourth span - chat completions streaming + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.temperature': 0.8, + 'gen_ai.request.stream': true, + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.response.id': 'chatcmpl-stream-123', + 'gen_ai.response.finish_reasons': '["stop"]', + 
'gen_ai.usage.input_tokens': 12, + 'gen_ai.usage.output_tokens': 18, + 'gen_ai.usage.total_tokens': 30, + 'openai.response.id': 'chatcmpl-stream-123', + 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, + 'openai.response.timestamp': '2023-03-01T06:31:40.000Z', + 'openai.usage.completion_tokens': 18, + 'openai.usage.prompt_tokens': 12, + }, + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Fifth span - responses API streaming + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.stream': true, + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.response.id': 'resp_stream_456', + 'gen_ai.response.finish_reasons': '["in_progress","completed"]', + 'gen_ai.usage.input_tokens': 6, + 'gen_ai.usage.output_tokens': 10, + 'gen_ai.usage.total_tokens': 16, + 'openai.response.id': 'resp_stream_456', + 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, + 'openai.response.timestamp': '2023-03-01T06:31:50.000Z', + 'openai.usage.completion_tokens': 10, + 'openai.usage.prompt_tokens': 6, + }, + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Sixth span - error handling in streaming context + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'error-model', + 'gen_ai.request.stream': true, + 'gen_ai.system': 'openai', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + }, + description: 'chat error-model stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'internal_error', + }), ]), }; @@ -117,6 +189,7 @@ describe('OpenAI integration', () => { 'gen_ai.request.model': 'gpt-3.5-turbo', 'gen_ai.request.messages': '"Translate this to French: Hello"', 'gen_ai.response.text': 'Response to: Translate this to French: Hello', + 'gen_ai.response.finish_reasons': '["completed"]', 'gen_ai.response.model': 'gpt-3.5-turbo', 'gen_ai.response.id': 'resp_mock456', 'gen_ai.usage.input_tokens': 5, @@ -124,6 +197,7 @@ describe('OpenAI integration', () => { 'gen_ai.usage.total_tokens': 13, 'openai.response.id': 'resp_mock456', 'openai.response.model': 'gpt-3.5-turbo', + 'openai.response.timestamp': '2023-03-01T06:31:30.000Z', 'openai.usage.completion_tokens': 8, 'openai.usage.prompt_tokens': 5, }, @@ -147,6 +221,82 @@ describe('OpenAI integration', () => { origin: 'manual', status: 'unknown_error', }), + // Fourth span - chat completions streaming with PII + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.temperature': 0.8, + 'gen_ai.request.stream': true, + 'gen_ai.request.messages': + '[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"Tell me about streaming"}]', + 'gen_ai.response.text': 'Hello from OpenAI streaming!', + 'gen_ai.response.finish_reasons': '["stop"]', + 'gen_ai.response.id': 'chatcmpl-stream-123', + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.usage.input_tokens': 12, + 'gen_ai.usage.output_tokens': 18, + 'gen_ai.usage.total_tokens': 30, + 'openai.response.id': 'chatcmpl-stream-123', + 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, + 'openai.response.timestamp': '2023-03-01T06:31:40.000Z', + 
'openai.usage.completion_tokens': 18, + 'openai.usage.prompt_tokens': 12, + }), + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Fifth span - responses API streaming with PII + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.stream': true, + 'gen_ai.request.messages': '"Test streaming responses API"', + 'gen_ai.response.text': 'Streaming response to: Test streaming responses APITest streaming responses API', + 'gen_ai.response.finish_reasons': '["in_progress","completed"]', + 'gen_ai.response.id': 'resp_stream_456', + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.usage.input_tokens': 6, + 'gen_ai.usage.output_tokens': 10, + 'gen_ai.usage.total_tokens': 16, + 'openai.response.id': 'resp_stream_456', + 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, + 'openai.response.timestamp': '2023-03-01T06:31:50.000Z', + 'openai.usage.completion_tokens': 10, + 'openai.usage.prompt_tokens': 6, + }), + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Sixth span - error handling in streaming context with PII + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'error-model', + 'gen_ai.request.stream': true, + 'gen_ai.request.messages': '[{"role":"user","content":"This will fail"}]', + 'gen_ai.system': 'openai', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + }, + description: 'chat error-model stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'internal_error', + }), ]), }; @@ -160,24 +310,44 @@ describe('OpenAI integration', () => { 'gen_ai.response.text': expect.any(String), // Should include response text when recordOutputs: true }), }), + // Check that custom options are respected for streaming + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.request.messages': expect.any(String), // Should include messages when recordInputs: true + 'gen_ai.response.text': expect.any(String), // Should include response text when recordOutputs: true + 'gen_ai.request.stream': true, // Should be marked as stream + }), + }), ]), }; createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => { test('creates openai related spans with sendDefaultPii: false', async () => { - await createRunner().expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE }).start().completed(); + await createRunner() + .ignore('event') + .expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE }) + .start() + .completed(); }); }); createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { test('creates openai related spans with sendDefaultPii: true', async () => { - await createRunner().expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }).start().completed(); + await createRunner() + .ignore('event') + .expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }) + .start() + .completed(); }); }); createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-options.mjs', (createRunner, test) => { test('creates openai related spans with custom options', async () => { - await createRunner().expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS }).start().completed(); + await createRunner() + .ignore('event') + .expect({ 
transaction: EXPECTED_TRANSACTION_WITH_OPTIONS }) + .start() + .completed(); }); }); }); diff --git a/packages/core/src/utils/gen-ai-attributes.ts b/packages/core/src/utils/gen-ai-attributes.ts index cf8a073a4313..9a0fd5e28e7a 100644 --- a/packages/core/src/utils/gen-ai-attributes.ts +++ b/packages/core/src/utils/gen-ai-attributes.ts @@ -127,15 +127,20 @@ export const OPENAI_RESPONSE_MODEL_ATTRIBUTE = 'openai.response.model'; export const OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE = 'openai.response.timestamp'; /** - * The number of completion tokens used (OpenAI specific) + * The number of completion tokens used */ export const OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE = 'openai.usage.completion_tokens'; /** - * The number of prompt tokens used (OpenAI specific) + * The number of prompt tokens used */ export const OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE = 'openai.usage.prompt_tokens'; +/** + * Whether the response is a stream response + */ +export const OPENAI_RESPONSE_STREAM_ATTRIBUTE = 'openai.response.stream'; + // ============================================================================= // OPENAI OPERATIONS // ============================================================================= diff --git a/packages/core/src/utils/openai/constants.ts b/packages/core/src/utils/openai/constants.ts index e552616cc1db..462f007b0585 100644 --- a/packages/core/src/utils/openai/constants.ts +++ b/packages/core/src/utils/openai/constants.ts @@ -3,3 +3,12 @@ export const OPENAI_INTEGRATION_NAME = 'OpenAI'; // https://platform.openai.com/docs/quickstart?api-mode=responses // https://platform.openai.com/docs/quickstart?api-mode=chat export const INSTRUMENTED_METHODS = ['responses.create', 'chat.completions.create'] as const; +export const RESPONSE_EVENT_TYPES = [ + 'response.created', + 'response.in_progress', + 'response.failed', + 'response.completed', + 'response.incomplete', + 'response.queued', + 'response.output_text.delta', +] as const; diff --git a/packages/core/src/utils/openai/index.ts b/packages/core/src/utils/openai/index.ts index 9bab70c2ae7c..bb8e4f983ee7 100644 --- a/packages/core/src/utils/openai/index.ts +++ b/packages/core/src/utils/openai/index.ts @@ -1,6 +1,7 @@ import { getCurrentScope } from '../../currentScopes'; import { captureException } from '../../exports'; -import { startSpan } from '../../tracing/trace'; +import { SPAN_STATUS_ERROR } from '../../tracing'; +import { startSpan, startSpanManual } from '../../tracing/trace'; import type { Span, SpanAttributeValue } from '../../types-hoist/span'; import { GEN_AI_OPERATION_NAME_ATTRIBUTE, @@ -8,24 +9,17 @@ import { GEN_AI_REQUEST_MESSAGES_ATTRIBUTE, GEN_AI_REQUEST_MODEL_ATTRIBUTE, GEN_AI_REQUEST_PRESENCE_PENALTY_ATTRIBUTE, + GEN_AI_REQUEST_STREAM_ATTRIBUTE, GEN_AI_REQUEST_TEMPERATURE_ATTRIBUTE, GEN_AI_REQUEST_TOP_P_ATTRIBUTE, GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, - GEN_AI_RESPONSE_ID_ATTRIBUTE, - GEN_AI_RESPONSE_MODEL_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE, GEN_AI_SYSTEM_ATTRIBUTE, - GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, - GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, - GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, - OPENAI_RESPONSE_ID_ATTRIBUTE, - OPENAI_RESPONSE_MODEL_ATTRIBUTE, - OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE, - OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE, - OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE, } from '../gen-ai-attributes'; import { OPENAI_INTEGRATION_NAME } from './constants'; +import { instrumentStream } from './streaming'; import type { + ChatCompletionChunk, InstrumentedMethod, OpenAiChatCompletionObject, OpenAiClient, @@ -33,6 +27,8 
@@ import type { OpenAiOptions, OpenAiResponse, OpenAIResponseObject, + OpenAIStream, + ResponseStreamingEvent, } from './types'; import { buildMethodPath, @@ -40,6 +36,8 @@ import { getSpanOperation, isChatCompletionResponse, isResponsesApiResponse, + setCommonResponseAttributes, + setTokenUsageAttributes, shouldInstrument, } from './utils'; @@ -61,6 +59,7 @@ function extractRequestAttributes(args: unknown[], methodPath: string): Record<string, unknown> { const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown'; const operationName = getOperationName(methodPath); - return startSpan( - { - name: `${operationName} ${model}`, - op: getSpanOperation(methodPath), - attributes: requestAttributes as Record<string, SpanAttributeValue>, - }, - async (span: Span) => { - try { - if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { - addRequestAttributes(span, args[0] as Record<string, unknown>); + const params = args[0] as Record<string, unknown> | undefined; + const isStreamRequested = params && typeof params === 'object' && params.stream === true; + + if (isStreamRequested) { + // For streaming responses, use manual span management to properly handle the async generator lifecycle + return startSpanManual( + { + name: `${operationName} ${model} stream-response`, + op: getSpanOperation(methodPath), + attributes: requestAttributes as Record<string, SpanAttributeValue>, + }, + async (span: Span) => { + try { + if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { + addRequestAttributes(span, args[0] as Record<string, unknown>); + } + + const result = await originalMethod.apply(context, args); + + return instrumentStream( + result as OpenAIStream<unknown>, + span, + finalOptions.recordOutputs ?? false, + ) as unknown as R; + } catch (error) { + // For streaming requests that fail before stream creation, we still want to record + // them as streaming requests but end the span gracefully + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(error, { + mechanism: { + handled: false, + }, + }); + span.end(); + throw error; + } + }, + ); + } else { + // Non-streaming responses + return startSpan( + { + name: `${operationName} ${model}`, + op: getSpanOperation(methodPath), + attributes: requestAttributes as Record<string, SpanAttributeValue>, + }, + async (span: Span) => { + try { + if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { + addRequestAttributes(span, args[0] as Record<string, unknown>); + } - const result = await originalMethod.apply(context, args); - // TODO: Add streaming support - addResponseAttributes(span, result, finalOptions.recordOutputs); - return result; - } catch (error) { - captureException(error); - throw error; - } - }, - ); + const result = await originalMethod.apply(context, args); + addResponseAttributes(span, result, finalOptions.recordOutputs); + return result; + } catch (error) { + captureException(error); + throw error; + } + }, + ); + } }; } diff --git a/packages/core/src/utils/openai/streaming.ts b/packages/core/src/utils/openai/streaming.ts new file mode 100644 index 000000000000..88d4c6adf893 --- /dev/null +++ b/packages/core/src/utils/openai/streaming.ts @@ -0,0 +1,202 @@ +import { captureException } from '../../exports'; +import { SPAN_STATUS_ERROR } from '../../tracing'; +import type { Span } from '../../types-hoist/span'; +import { + GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, + GEN_AI_RESPONSE_TEXT_ATTRIBUTE, + OPENAI_RESPONSE_STREAM_ATTRIBUTE, +} from '../gen-ai-attributes'; +import { RESPONSE_EVENT_TYPES } from './constants'; +import type { OpenAIResponseObject } from './types'; +import { type
ChatCompletionChunk, type ResponseStreamingEvent } from './types'; +import { + isChatCompletionChunk, + isResponsesApiStreamEvent, + setCommonResponseAttributes, + setTokenUsageAttributes, +} from './utils'; + +/** + * State object used to accumulate information from a stream of OpenAI events/chunks. + */ +interface StreamingState { + /** Types of events encountered in the stream. */ + eventTypes: string[]; + /** Collected response text fragments (for output recording). */ + responseTexts: string[]; + /** Reasons for finishing the response, as reported by the API. */ + finishReasons: string[]; + /** The response ID. */ + responseId: string; + /** The model name. */ + responseModel: string; + /** The timestamp of the response. */ + responseTimestamp: number; + /** Number of prompt/input tokens used. */ + promptTokens: number | undefined; + /** Number of completion/output tokens used. */ + completionTokens: number | undefined; + /** Total number of tokens used (prompt + completion). */ + totalTokens: number | undefined; +} + +/** + * Processes a single OpenAI ChatCompletionChunk event, updating the streaming state. + * + * @param chunk - The ChatCompletionChunk event to process. + * @param state - The current streaming state to update. + * @param recordOutputs - Whether to record output text fragments. + */ +function processChatCompletionChunk(chunk: ChatCompletionChunk, state: StreamingState, recordOutputs: boolean): void { + state.responseId = chunk.id ?? state.responseId; + state.responseModel = chunk.model ?? state.responseModel; + state.responseTimestamp = chunk.created ?? state.responseTimestamp; + + if (chunk.usage) { + // For stream responses, the input tokens remain constant across all events in the stream. + // Output tokens, however, are only finalized in the last event. + // Since we can't guarantee that the last event will include usage data or even be a typed event, + // we update the output token values on every event that includes them. + // This ensures that output token usage is always set, even if the final event lacks it. + state.promptTokens = chunk.usage.prompt_tokens; + state.completionTokens = chunk.usage.completion_tokens; + state.totalTokens = chunk.usage.total_tokens; + } + + for (const choice of chunk.choices ?? []) { + if (recordOutputs && choice.delta?.content) { + state.responseTexts.push(choice.delta.content); + } + if (choice.finish_reason) { + state.finishReasons.push(choice.finish_reason); + } + } +} + +/** + * Processes a single OpenAI Responses API streaming event, updating the streaming state and span. + * + * @param streamEvent - The event to process (may be an error or unknown object). + * @param state - The current streaming state to update. + * @param recordOutputs - Whether to record output text fragments. + * @param span - The span to update with error status if needed. 
+ */ +function processResponsesApiEvent( + streamEvent: ResponseStreamingEvent | unknown | Error, + state: StreamingState, + recordOutputs: boolean, + span: Span, +): void { + if (!(streamEvent && typeof streamEvent === 'object')) { + state.eventTypes.push('unknown:non-object'); + return; + } + if (streamEvent instanceof Error) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(streamEvent, { + mechanism: { + handled: false, + }, + }); + return; + } + + if (!('type' in streamEvent)) return; + const event = streamEvent as ResponseStreamingEvent; + + if (!RESPONSE_EVENT_TYPES.includes(event.type)) { + state.eventTypes.push(event.type); + return; + } + + if (recordOutputs && event.type === 'response.output_text.delta' && 'delta' in event && event.delta) { + state.responseTexts.push(event.delta); + return; + } + + if ('response' in event) { + const { response } = event as { response: OpenAIResponseObject }; + state.responseId = response.id ?? state.responseId; + state.responseModel = response.model ?? state.responseModel; + state.responseTimestamp = response.created_at ?? state.responseTimestamp; + + if (response.usage) { + // For stream responses, the input tokens remain constant across all events in the stream. + // Output tokens, however, are only finalized in the last event. + // Since we can't guarantee that the last event will include usage data or even be a typed event, + // we update the output token values on every event that includes them. + // This ensures that output token usage is always set, even if the final event lacks it. + state.promptTokens = response.usage.input_tokens; + state.completionTokens = response.usage.output_tokens; + state.totalTokens = response.usage.total_tokens; + } + + if (response.status) { + state.finishReasons.push(response.status); + } + + if (recordOutputs && response.output_text) { + state.responseTexts.push(response.output_text); + } + } +} + +/** + * Instruments a stream of OpenAI events, updating the provided span with relevant attributes and + * optionally recording output text. This function yields each event from the input stream as it is processed. + * + * @template T - The type of events in the stream. + * @param stream - The async iterable stream of events to instrument. + * @param span - The span to add attributes to and to finish at the end of the stream. + * @param recordOutputs - Whether to record output text fragments in the span. + * @returns An async generator yielding each event from the input stream. 
+ */ +export async function* instrumentStream<T>( + stream: AsyncIterable<T>, + span: Span, + recordOutputs: boolean, +): AsyncGenerator<T, void, unknown> { + const state: StreamingState = { + eventTypes: [], + responseTexts: [], + finishReasons: [], + responseId: '', + responseModel: '', + responseTimestamp: 0, + promptTokens: undefined, + completionTokens: undefined, + totalTokens: undefined, + }; + + try { + for await (const event of stream) { + if (isChatCompletionChunk(event)) { + processChatCompletionChunk(event as ChatCompletionChunk, state, recordOutputs); + } else if (isResponsesApiStreamEvent(event)) { + processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs, span); + } + yield event; + } + } finally { + setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp); + setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens); + + span.setAttributes({ + [OPENAI_RESPONSE_STREAM_ATTRIBUTE]: true, + }); + + if (state.finishReasons.length) { + span.setAttributes({ + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), + }); + } + + if (recordOutputs && state.responseTexts.length) { + span.setAttributes({ + [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), + }); + } + + span.end(); + } +} diff --git a/packages/core/src/utils/openai/types.ts b/packages/core/src/utils/openai/types.ts index c9a3870a959e..a5854868fbe2 100644 --- a/packages/core/src/utils/openai/types.ts +++ b/packages/core/src/utils/openai/types.ts @@ -132,6 +132,112 @@ export interface OpenAIResponseObject { export type OpenAiResponse = OpenAiChatCompletionObject | OpenAIResponseObject; +/** + * Streaming event types for the Responses API + * @see https://platform.openai.com/docs/api-reference/responses-streaming + * @see https://platform.openai.com/docs/guides/streaming-responses#read-the-responses for common events + */ +export type ResponseStreamingEvent = + | ResponseCreatedEvent + | ResponseInProgressEvent + | ResponseFailedEvent + | ResponseCompletedEvent + | ResponseIncompleteEvent + | ResponseQueuedEvent + | ResponseOutputTextDeltaEvent; + +interface ResponseCreatedEvent { + type: 'response.created'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseInProgressEvent { + type: 'response.in_progress'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseOutputTextDeltaEvent { + content_index: number; + delta: string; + item_id: string; + logprobs: object; + output_index: number; + sequence_number: number; + type: 'response.output_text.delta'; +} + +interface ResponseFailedEvent { + type: 'response.failed'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseIncompleteEvent { + type: 'response.incomplete'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseCompletedEvent { + type: 'response.completed'; + response: OpenAIResponseObject; + sequence_number: number; +} +interface ResponseQueuedEvent { + type: 'response.queued'; + response: OpenAIResponseObject; + sequence_number: number; +} + +/** + * Chat Completion streaming chunk type + * @see https://platform.openai.com/docs/api-reference/chat-streaming/streaming + */ +export interface ChatCompletionChunk { + id: string; + object: 'chat.completion.chunk'; + created: number; + model: string; + system_fingerprint: string; + service_tier?: string; + choices: Array<{ + index: number; + delta: { + content: string | null; + role: string; + function_call?: object; + refusal?: string | null; + tool_calls?: Array<unknown>; + }; + logprobs?: unknown | null; + finish_reason?: string | null; + }>; + usage?: { + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; + completion_tokens_details: { + accepted_prediction_tokens: number; + audio_tokens: number; + reasoning_tokens: number; + rejected_prediction_tokens: number; + }; + prompt_tokens_details: { + audio_tokens: number; + cached_tokens: number; + }; + }; +} + +/** + * Represents a stream of events from OpenAI APIs + */ +export interface OpenAIStream<T> extends AsyncIterable<T> { + [Symbol.asyncIterator](): AsyncIterator<T>; +} + /** * OpenAI Integration interface for type safety */ diff --git a/packages/core/src/utils/openai/utils.ts b/packages/core/src/utils/openai/utils.ts index b7d5e12ecf62..66e727aff075 100644 --- a/packages/core/src/utils/openai/utils.ts +++ b/packages/core/src/utils/openai/utils.ts @@ -1,6 +1,25 @@ -import { OPENAI_OPERATIONS } from '../gen-ai-attributes'; +import type { Span } from '../../types-hoist/span'; +import { + GEN_AI_RESPONSE_ID_ATTRIBUTE, + GEN_AI_RESPONSE_MODEL_ATTRIBUTE, + GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, + GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, + GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, + OPENAI_OPERATIONS, + OPENAI_RESPONSE_ID_ATTRIBUTE, + OPENAI_RESPONSE_MODEL_ATTRIBUTE, + OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE, + OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE, + OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE, +} from '../gen-ai-attributes'; import { INSTRUMENTED_METHODS } from './constants'; -import type { InstrumentedMethod, OpenAiChatCompletionObject, OpenAIResponseObject } from './types'; +import type { + ChatCompletionChunk, + InstrumentedMethod, + OpenAiChatCompletionObject, + OpenAIResponseObject, + ResponseStreamingEvent, +} from './types'; /** * Maps OpenAI method paths to Sentry operation names */ @@ -61,3 +80,81 @@ export function isResponsesApiResponse(response: unknown): response is OpenAIRes (response as Record<string, unknown>).object === 'response' ); } + +/** + * Check if streaming event is from the Responses API + */ +export function isResponsesApiStreamEvent(event: unknown): event is ResponseStreamingEvent { + return ( + event !== null && + typeof event === 'object' && + 'type' in event && + typeof (event as Record<string, unknown>).type === 'string' && + ((event as Record<string, unknown>).type as string).startsWith('response.') + ); +} + +/** + * Check if streaming event is a chat completion chunk + */ +export function isChatCompletionChunk(event: unknown): event is ChatCompletionChunk { + return ( + event !== null && + typeof event === 'object' && + 'object' in event && + (event as Record<string, unknown>).object === 'chat.completion.chunk' + ); +} + +/** + * Set token usage attributes + * @param span - The span to add attributes to + * @param promptTokens - The number of prompt tokens + * @param completionTokens - The number of completion tokens + * @param totalTokens - The number of total tokens + */ +export function setTokenUsageAttributes( + span: Span, + promptTokens?: number, + completionTokens?: number, + totalTokens?: number, +): void { + if (promptTokens !== undefined) { + span.setAttributes({ + [OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE]: promptTokens, + [GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]: promptTokens, + }); + } + if (completionTokens !== undefined) { + span.setAttributes({ + [OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE]: completionTokens, + [GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]: completionTokens, + }); + } + if (totalTokens !== undefined) { + span.setAttributes({ +
[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: totalTokens, + }); + } +} + +/** + * Set common response attributes + * @param span - The span to add attributes to + * @param id - The response id + * @param model - The response model + * @param timestamp - The response timestamp + */ +export function setCommonResponseAttributes(span: Span, id: string, model: string, timestamp: number): void { + span.setAttributes({ + [OPENAI_RESPONSE_ID_ATTRIBUTE]: id, + [GEN_AI_RESPONSE_ID_ATTRIBUTE]: id, + }); + span.setAttributes({ + [OPENAI_RESPONSE_MODEL_ATTRIBUTE]: model, + [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: model, + }); + span.setAttributes({ + [OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE]: new Date(timestamp * 1000).toISOString(), + }); +} diff --git a/packages/core/test/lib/utils/openai-utils.test.ts b/packages/core/test/lib/utils/openai-utils.test.ts index bcff545627ed..76748c2fe473 100644 --- a/packages/core/test/lib/utils/openai-utils.test.ts +++ b/packages/core/test/lib/utils/openai-utils.test.ts @@ -3,8 +3,10 @@ import { buildMethodPath, getOperationName, getSpanOperation, + isChatCompletionChunk, isChatCompletionResponse, isResponsesApiResponse, + isResponsesApiStreamEvent, shouldInstrument, } from '../../../src/utils/openai/utils'; @@ -101,4 +103,47 @@ describe('openai-utils', () => { expect(isResponsesApiResponse({ object: null })).toBe(false); }); }); + + describe('isResponsesApiStreamEvent', () => { + it('should return true for valid responses API stream events', () => { + expect(isResponsesApiStreamEvent({ type: 'response.created' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.in_progress' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.completed' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.failed' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.output_text.delta' })).toBe(true); + }); + + it('should return false for non-response events', () => { + expect(isResponsesApiStreamEvent(null)).toBe(false); + expect(isResponsesApiStreamEvent(undefined)).toBe(false); + expect(isResponsesApiStreamEvent('string')).toBe(false); + expect(isResponsesApiStreamEvent(123)).toBe(false); + expect(isResponsesApiStreamEvent({})).toBe(false); + expect(isResponsesApiStreamEvent({ type: 'chat.completion' })).toBe(false); + expect(isResponsesApiStreamEvent({ type: null })).toBe(false); + expect(isResponsesApiStreamEvent({ type: 123 })).toBe(false); + }); + }); + + describe('isChatCompletionChunk', () => { + it('should return true for valid chat completion chunks', () => { + const validChunk = { + object: 'chat.completion.chunk', + id: 'chatcmpl-123', + model: 'gpt-4', + choices: [], + }; + expect(isChatCompletionChunk(validChunk)).toBe(true); + }); + + it('should return false for invalid chunks', () => { + expect(isChatCompletionChunk(null)).toBe(false); + expect(isChatCompletionChunk(undefined)).toBe(false); + expect(isChatCompletionChunk('string')).toBe(false); + expect(isChatCompletionChunk(123)).toBe(false); + expect(isChatCompletionChunk({})).toBe(false); + expect(isChatCompletionChunk({ object: 'chat.completion' })).toBe(false); + expect(isChatCompletionChunk({ object: null })).toBe(false); + }); + }); });
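Note on the streaming design above: `startSpanManual` is used for the `stream: true` branch because the wrapped SDK call resolves as soon as the stream object is created, while the chunks (and the usage data that arrives with the final chunk) only show up as the caller iterates. The span therefore has to stay open until iteration finishes, which `instrumentStream` guarantees via its `finally` block. A minimal sketch of the idea, independent of the SDK internals (the helper name `withSpan` is illustrative, not part of this PR):

```ts
import type { Span } from '@sentry/core';

// Keep a manually managed span alive for as long as the consumer iterates.
// The `finally` block runs whether the consumer exhausts the stream, breaks
// out of the loop early, or the underlying stream throws, mirroring the
// lifecycle handling in `instrumentStream` above.
async function* withSpan<T>(stream: AsyncIterable<T>, span: Span): AsyncGenerator<T, void, unknown> {
  try {
    for await (const event of stream) {
      yield event; // per-event attribute accumulation would happen here
    }
  } finally {
    span.end();
  }
}
```

This is also why the non-streaming branch keeps using `startSpan`: there the span can simply end when the wrapped promise settles, with no consumer-driven lifetime to account for.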