feat(node): Instrument stream responses for openai #17110

Open · wants to merge 12 commits into develop
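For context: when `stream: true` is passed, the OpenAI client returns an async iterable instead of a plain response object, so the instrumentation has to observe chunks as the caller consumes them and can only finish its span once the stream is exhausted. The sketch below shows that general wrapping pattern only; `instrumentStream`, `onChunk`, and `onDone` are placeholder names for illustration, not the API added by this PR.

// Illustration only: pass chunks through untouched while observing them.
async function* instrumentStream(stream, { onChunk, onDone }) {
  let lastChunk;
  try {
    for await (const chunk of stream) {
      lastChunk = chunk;
      onChunk?.(chunk); // e.g. accumulate output text and token usage
      yield chunk; // the caller sees exactly what the SDK produced
    }
  } finally {
    onDone?.(lastChunk); // e.g. record usage and end the span
  }
}

Yielding each chunk unchanged keeps the wrapper transparent to callers that iterate with `for await`, which is exactly how the scenario below consumes its streams.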
dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs (212 additions, 0 deletions)
@@ -18,6 +18,11 @@ class MockOpenAI {
throw error;
}

// If stream is requested, return an async generator
if (params.stream) {
return this._createChatCompletionStream(params);
}

return {
id: 'chatcmpl-mock123',
object: 'chat.completion',
@@ -48,6 +53,11 @@ class MockOpenAI {
create: async params => {
await new Promise(resolve => setTimeout(resolve, 10));

// If stream is requested, return an async generator
if (params.stream) {
return this._createResponsesApiStream(params);
}

return {
id: 'resp_mock456',
object: 'response',
@@ -65,6 +75,163 @@ class MockOpenAI {
},
};
}

// Create a mock streaming response for chat completions
async *_createChatCompletionStream(params) {
// First chunk with basic info
yield {
id: 'chatcmpl-stream-123',
object: 'chat.completion.chunk',
created: 1677652300,
model: params.model,
system_fingerprint: 'fp_stream_123',
choices: [
{
index: 0,
delta: {
role: 'assistant',
content: 'Hello',
},
finish_reason: null,
},
],
};

// Second chunk with more content
yield {
id: 'chatcmpl-stream-123',
object: 'chat.completion.chunk',
created: 1677652300,
model: params.model,
system_fingerprint: 'fp_stream_123',
choices: [
{
index: 0,
delta: {
content: ' from OpenAI streaming!',
},
finish_reason: 'stop',
},
],
usage: {
prompt_tokens: 12,
completion_tokens: 18,
total_tokens: 30,
completion_tokens_details: {
accepted_prediction_tokens: 0,
audio_tokens: 0,
reasoning_tokens: 0,
rejected_prediction_tokens: 0,
},
prompt_tokens_details: {
audio_tokens: 0,
cached_tokens: 0,
},
},
};
}

// Create a mock streaming response for responses API
async *_createResponsesApiStream(params) {
// Response created event
yield {
type: 'response.created',
response: {
id: 'resp_stream_456',
object: 'response',
created_at: 1677652310,
model: params.model,
status: 'in_progress',
error: null,
incomplete_details: null,
instructions: params.instructions,
max_output_tokens: 1000,
parallel_tool_calls: false,
previous_response_id: null,
reasoning: {
effort: null,
summary: null,
},
store: false,
temperature: 0.7,
text: {
format: {
type: 'text',
},
},
tool_choice: 'auto',
tools: [],
top_p: 1.0,
truncation: 'disabled',
user: null,
metadata: {},
output: [],
output_text: '',
usage: {
input_tokens: 0,
output_tokens: 0,
total_tokens: 0,
},
},
sequence_number: 1,
};

// Response in progress with output text delta
yield {
type: 'response.output_text.delta',
delta: 'Streaming response to: ',
sequence_number: 2,
};

yield {
type: 'response.output_text.delta',
delta: params.input,
sequence_number: 3,
};

// Response completed event
yield {
type: 'response.completed',
response: {
id: 'resp_stream_456',
object: 'response',
created_at: 1677652310,
model: params.model,
status: 'completed',
error: null,
incomplete_details: null,
instructions: params.instructions,
max_output_tokens: 1000,
parallel_tool_calls: false,
previous_response_id: null,
reasoning: {
effort: null,
summary: null,
},
store: false,
temperature: 0.7,
text: {
format: {
type: 'text',
},
},
tool_choice: 'auto',
tools: [],
top_p: 1.0,
truncation: 'disabled',
user: null,
metadata: {},
output: [],
output_text: params.input,
usage: {
input_tokens: 6,
output_tokens: 10,
total_tokens: 16,
},
},
sequence_number: 4,
};
}
}

async function run() {
@@ -102,6 +269,51 @@ async function run() {
} catch {
// Error is expected and handled
}

// Fourth test: chat completions streaming
const stream1 = await client.chat.completions.create({
model: 'gpt-4',
messages: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'Tell me about streaming' },
],
stream: true,
temperature: 0.8,
});

// Consume the stream to trigger span instrumentation
for await (const chunk of stream1) {
// Stream chunks are processed automatically by instrumentation
void chunk; // Prevent unused variable warning
}

// Fifth test: responses API streaming
const stream2 = await client.responses.create({
model: 'gpt-4',
input: 'Test streaming responses API',
instructions: 'You are a streaming assistant',
stream: true,
});

for await (const chunk of stream2) {
void chunk;
}

// Sixth test: error handling in streaming context
try {
const errorStream = await client.chat.completions.create({
model: 'error-model',
messages: [{ role: 'user', content: 'This will fail' }],
stream: true,
});

// Try to consume the stream (never reached: create() throws for 'error-model' before returning a stream)
for await (const chunk of errorStream) {
void chunk;
}
} catch {
// Error is expected and handled
}
});
}

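For reference, the responses-API mock above emits a `response.created` event, two `response.output_text.delta` events, and a `response.completed` event carrying the final usage. A consumer (or a test assertion) can reassemble the streamed text from the deltas; a minimal sketch, assuming a `client` set up the same way as in `run()`:

// Sketch: collect the streamed text and final usage from the mock responses API.
const stream = await client.responses.create({
  model: 'gpt-4',
  input: 'Test streaming responses API',
  instructions: 'You are a streaming assistant',
  stream: true,
});

let text = '';
let usage;
for await (const event of stream) {
  if (event.type === 'response.output_text.delta') {
    text += event.delta; // partial output text
  } else if (event.type === 'response.completed') {
    usage = event.response.usage; // final token counts from the mock
  }
}
// text === 'Streaming response to: Test streaming responses API'
// usage.total_tokens === 16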