From a2f938c23f8df5145c5de95d945387c3e0a6b569 Mon Sep 17 00:00:00 2001 From: Javi Pacheco Date: Thu, 6 Jun 2024 16:40:07 +0200 Subject: [PATCH] OpenTelemetry improved --- .../functional/xef/llm/MetricManagement.kt | 48 ++-- .../xef/llm/assistants/AssistantThread.kt | 95 +++++--- .../functional/xef/metrics/LogsMetric.kt | 52 +---- .../xebia/functional/xef/metrics/Metric.kt | 46 +--- .../xebia/functional/xef/assistants/DSL.kt | 5 +- .../OpenTelemetryAssistantState.kt | 221 +++++++----------- .../xef/opentelemetry/OpenTelemetryMetric.kt | 29 +-- 7 files changed, 207 insertions(+), 289 deletions(-) diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt index c98eb8ae6..d6b7940b8 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/MetricManagement.kt @@ -1,6 +1,7 @@ package com.xebia.functional.xef.llm import com.xebia.functional.openai.generated.model.CreateChatCompletionResponse +import com.xebia.functional.openai.generated.model.MessageObject import com.xebia.functional.openai.generated.model.RunObject import com.xebia.functional.openai.generated.model.RunStepObject import com.xebia.functional.xef.conversation.Conversation @@ -60,33 +61,42 @@ suspend fun Prompt.addMetrics(conversation: Conversation) { conversation.metric.parameter("openai.chat_completion.functions", functions.map { it.name }) } -suspend fun RunObject.addMetrics(metric: Metric): RunObject { - metric.assistantCreateRun(this) +suspend fun RunObject.addMetrics(metric: Metric, source: String): RunObject { + metric.assistantCreateRun(this, source) return this } -suspend fun RunStepObject.addMetrics(metric: Metric): RunStepObject { - metric.assistantCreateRunStep(this) +suspend fun RunStepObject.addMetrics(metric: Metric, source: String): RunStepObject { + metric.assistantCreateRunStep(this, source) + return this +} + +suspend fun MessageObject.addMetrics(metric: Metric, source: String): MessageObject { + metric.assistantCreatedMessage(this, source) return this } suspend fun RunDelta.addMetrics(metric: Metric): RunDelta { when (this) { - is RunDelta.RunCancelled -> run.addMetrics(metric) - is RunDelta.RunCancelling -> run.addMetrics(metric) - is RunDelta.RunCompleted -> run.addMetrics(metric) - is RunDelta.RunCreated -> run.addMetrics(metric) - is RunDelta.RunExpired -> run.addMetrics(metric) - is RunDelta.RunFailed -> run.addMetrics(metric) - is RunDelta.RunInProgress -> run.addMetrics(metric) - is RunDelta.RunQueued -> run.addMetrics(metric) - is RunDelta.RunRequiresAction -> run.addMetrics(metric) - is RunDelta.RunStepCancelled -> runStep.addMetrics(metric) - is RunDelta.RunStepCompleted -> runStep.addMetrics(metric) - is RunDelta.RunStepCreated -> runStep.addMetrics(metric) - is RunDelta.RunStepExpired -> runStep.addMetrics(metric) - is RunDelta.RunStepFailed -> runStep.addMetrics(metric) - is RunDelta.RunStepInProgress -> runStep.addMetrics(metric) + is RunDelta.RunCancelled -> run.addMetrics(metric, "RunCancelled") + is RunDelta.RunCancelling -> run.addMetrics(metric, "RunCancelling") + is RunDelta.RunCompleted -> run.addMetrics(metric, "RunCompleted") + is RunDelta.RunCreated -> run.addMetrics(metric, "RunCreated") + is RunDelta.RunExpired -> run.addMetrics(metric, "RunExpired") + is RunDelta.RunFailed -> run.addMetrics(metric, "RunFailed") + is RunDelta.RunInProgress -> run.addMetrics(metric, "RunInProgress") + is RunDelta.RunQueued -> run.addMetrics(metric, "RunQueued") + is RunDelta.RunRequiresAction -> run.addMetrics(metric, "RunRequiresAction") + is RunDelta.RunStepCancelled -> runStep.addMetrics(metric, "RunStepCancelled") + is RunDelta.RunStepCompleted -> runStep.addMetrics(metric, "RunStepCompleted") + is RunDelta.RunStepCreated -> runStep.addMetrics(metric, "RunStepCreated") + is RunDelta.RunStepExpired -> runStep.addMetrics(metric, "RunStepExpired") + is RunDelta.RunStepFailed -> runStep.addMetrics(metric, "RunStepFailed") + is RunDelta.RunStepInProgress -> runStep.addMetrics(metric, "RunStepInProgress") + is RunDelta.MessageCreated -> message.addMetrics(metric, "MessageCreated") + is RunDelta.MessageIncomplete -> message.addMetrics(metric, "MessageIncomplete") + is RunDelta.MessageCompleted -> message.addMetrics(metric, "MessageCompleted") + is RunDelta.MessageInProgress -> message.addMetrics(metric, "MessageInProgress") else -> {} // ignore other cases } return this diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt index 37b442da9..42da7e50c 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt @@ -76,7 +76,7 @@ class AssistantThread( createRun(CreateRunRequest(assistantId = assistant.assistantId)) suspend fun createRun(request: CreateRunRequest): RunObject = - api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric) + api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric, "RunCreated") fun createRunStream(assistant: Assistant, request: CreateRunRequest): Flow = flow { api @@ -122,38 +122,69 @@ class AssistantThread( ) } ) - val run = - metric.assistantToolOutputsRun(event.run.id) { - api - .submitToolOuputsToRunStream( - threadId = threadId, - runId = event.run.id, - submitToolOutputsRunRequest = toolOutputsRequest, - configure = ::defaultConfig - ) - .collect { - val delta = RunDelta.fromServerSentEvent(it) - if (delta is RunDelta.RunStepCompleted) { - flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest)) - } - flowCollector.emit(delta) - } - val run = getRun(event.run.id) - val finalEvent = - when (run.status) { - RunObject.Status.queued -> RunDelta.RunQueued(run) - RunObject.Status.in_progress -> RunDelta.RunInProgress(run) - RunObject.Status.requires_action -> RunDelta.RunRequiresAction(run) - RunObject.Status.cancelling -> RunDelta.RunCancelling(run) - RunObject.Status.cancelled -> RunDelta.RunCancelled(run) - RunObject.Status.failed -> RunDelta.RunFailed(run) - RunObject.Status.completed -> RunDelta.RunCompleted(run) - RunObject.Status.expired -> RunDelta.RunExpired(run) - RunObject.Status.incomplete -> RunDelta.RunIncomplete(run) - } - flowCollector.emit(finalEvent) - run + + api + .submitToolOuputsToRunStream( + threadId = threadId, + runId = event.run.id, + submitToolOutputsRunRequest = toolOutputsRequest, + configure = ::defaultConfig + ) + .collect { + val delta = RunDelta.fromServerSentEvent(it) + + when (delta) { + is RunDelta.RunCreated -> Pair(delta.run, "RunCreated") + is RunDelta.RunQueued -> Pair(delta.run, "RunQueued") + is RunDelta.RunFailed -> Pair(delta.run, "RunFailed") + is RunDelta.RunCancelled -> Pair(delta.run, "RunCancelled") + is RunDelta.RunCancelling -> Pair(delta.run, "RunCancelling") + is RunDelta.RunExpired -> Pair(delta.run, "RunExpired") + is RunDelta.RunInProgress -> Pair(delta.run, "RunInProgress") + is RunDelta.RunIncomplete -> Pair(delta.run, "RunIncomplete") + else -> null + }?.let { metric.assistantCreateRun(it.first, it.second) } + + when (delta) { + is RunDelta.RunStepCreated -> Pair(delta.runStep, "RunStepCreated") + is RunDelta.RunStepInProgress -> Pair(delta.runStep, "RunStepInProgress") + is RunDelta.RunStepCompleted -> Pair(delta.runStep, "RunStepCompleted") + is RunDelta.RunStepFailed -> Pair(delta.runStep, "RunStepFailed") + is RunDelta.RunStepCancelled -> Pair(delta.runStep, "RunStepCancelled") + is RunDelta.RunStepExpired -> Pair(delta.runStep, "RunStepExpired") + else -> null + }?.let { metric.assistantCreateRunStep(it.first, it.second) } + + when (delta) { + is RunDelta.MessageCreated -> Pair(delta.message, "MessageCreated") + is RunDelta.MessageInProgress -> Pair(delta.message, "MessageInProgress") + is RunDelta.MessageIncomplete -> Pair(delta.message, "MessageIncomplete") + is RunDelta.MessageCompleted -> Pair(delta.message, "MessageCompleted") + else -> null + }?.let { metric.assistantCreatedMessage(it.first, it.second) } + + if (delta is RunDelta.RunStepCompleted) { + flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest)) + } + flowCollector.emit(delta) + } + + val run = getRun(event.run.id) + val finalEvent = + when (run.status) { + RunObject.Status.queued -> Pair(RunDelta.RunQueued(run), "RunQueued") + RunObject.Status.in_progress -> Pair(RunDelta.RunInProgress(run), "RunInProgress") + RunObject.Status.requires_action -> + Pair(RunDelta.RunRequiresAction(run), "RunRequiresAction") + RunObject.Status.cancelling -> Pair(RunDelta.RunCancelling(run), "RunCancelling") + RunObject.Status.cancelled -> Pair(RunDelta.RunCancelled(run), "RunCancelled") + RunObject.Status.failed -> Pair(RunDelta.RunFailed(run), "RunFailed") + RunObject.Status.completed -> Pair(RunDelta.RunCompleted(run), "RunCompleted") + RunObject.Status.expired -> Pair(RunDelta.RunExpired(run), "RunExpired") + RunObject.Status.incomplete -> Pair(RunDelta.RunIncomplete(run), "RunIncomplete") } + flowCollector.emit(finalEvent.first) + metric.assistantCreateRun(run, finalEvent.second) if (run.status == RunObject.Status.requires_action) { takeRequiredAction( diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt index b5bd1562f..4822be2b4 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/LogsMetric.kt @@ -43,7 +43,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { return output } - override suspend fun assistantCreateRun(runObject: RunObject) { + override suspend fun assistantCreateRun(runObject: RunObject, source: String) { logger.at(level) { this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}" } @@ -58,16 +58,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { } } - override suspend fun assistantCreateRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject { - val output = block() - assistantCreateRun(output) - return output - } - - override suspend fun assistantCreateRunStep(runObject: RunStepObject) { + override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) { logger.at(level) { this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}" } @@ -82,44 +73,23 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric { } } - override suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List { - val output = block() - logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- Size: ${output.size}" - } - return output - } - - override suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject { - val output = block() + override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) { logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}" + this.message = + "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${messageObject.assistantId}" } logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${output.threadId}" + this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${messageObject.threadId}" } logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${output.runId}" + this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${messageObject.id}" } logger.at(level) { - this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${output.status.name}" + if (messageObject.status != null) { + this.message = + "${writeIndent(numberOfBlocks.get())}|-- Status: ${messageObject.status!!.name}" + } } - return output - } - - override suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject { - val output = block() - assistantCreateRun(output) - return output } override suspend fun event(message: String) { diff --git a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt index 62661f070..aa7809f62 100644 --- a/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt +++ b/core/src/commonMain/kotlin/com/xebia/functional/xef/metrics/Metric.kt @@ -16,26 +16,11 @@ interface Metric { suspend fun parameter(key: String, values: List) - suspend fun assistantCreateRun(runObject: RunObject) + suspend fun assistantCreateRun(runObject: RunObject, source: String) - suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject + suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) - suspend fun assistantCreateRunStep(runObject: RunStepObject) - - suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List - - suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject - - suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject + suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) companion object { val EMPTY: Metric = @@ -46,29 +31,14 @@ interface Metric { override suspend fun promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A = block() - override suspend fun assistantCreateRun(runObject: RunObject) {} - - override suspend fun assistantCreateRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = block() + override suspend fun assistantCreateRun(runObject: RunObject, source: String) {} - override suspend fun assistantCreateRunStep(runObject: RunStepObject) {} + override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {} override suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List = block() - - override suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject = block() - - override suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = block() + messageObject: MessageObject, + source: String + ) {} override suspend fun event(message: String) {} diff --git a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt index dbeed2ef6..b198de4e1 100644 --- a/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt +++ b/examples/src/main/kotlin/com/xebia/functional/xef/assistants/DSL.kt @@ -5,7 +5,6 @@ import com.xebia.functional.xef.llm.assistants.Assistant import com.xebia.functional.xef.llm.assistants.AssistantThread import com.xebia.functional.xef.llm.assistants.RunDelta import com.xebia.functional.xef.llm.assistants.Tool -import com.xebia.functional.xef.metrics.Metric import kotlinx.serialization.Serializable @Serializable data class SumInput(val left: Int, val right: Int) @@ -42,8 +41,8 @@ suspend fun main() { // - # cd server/docker/opentelemetry // - # docker-compose up - val metric = Metric.EMPTY - // val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric() + // val metric = Metric.EMPTY + val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric() val assistant = Assistant( diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt index 787e0a871..616c17a75 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryAssistantState.kt @@ -8,149 +8,107 @@ import io.opentelemetry.context.Context class OpenTelemetryAssistantState(private val tracer: Tracer) { - private val runIds: MutableMap = mutableMapOf() + private val runStartedSource = "RunCreated" - fun runSpan(runObject: RunObject) { + private val runFinishedSources = + setOf("RunCompleted", "RunCancelled", "RunFailed", "RunIncomplete", "RunExpired") - val parentOrRoot: Context = runObject.id.getOrCreateContext() + private val runStepStartedSource = "RunStepCreated" - val currentSpan = - tracer - .spanBuilder(runObject.status.name) - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() + private val runStepFinishedSources = + setOf("RunStepCompleted", "RunStepCancelled", "RunStepFailed", "RunStepExpired") - try { - currentSpan.makeCurrent().use { runObject.setParameters(currentSpan) } - } finally { - currentSpan.end() - } - } + private val messageStartedSource = "MessageCreated" - suspend fun runSpan(runId: String, block: suspend () -> RunObject): RunObject { + private val messageFinishedSources = setOf("MessageCompleted", "MessageIncomplete") - val parentOrRoot: Context = runId.getOrCreateContext() - - val currentSpan = - tracer - .spanBuilder("New Run: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - currentSpan.updateName(output.status.name) - output.setParameters(currentSpan) - } - output - } finally { - currentSpan.end() - } - } + private val runIds: MutableMap = mutableMapOf() - suspend fun toolOutputRunSpan(runId: String, block: suspend () -> RunObject): RunObject { + private val runSpans: MutableMap = mutableMapOf() - val parentOrRoot: Context = runId.getOrCreateContext() + private val runStepsSpans: MutableMap = mutableMapOf() - val currentSpan = - tracer - .spanBuilder("New ToolOutput: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - currentSpan.updateName("ToolOutput: ${output.status.name}") - output.setParameters(currentSpan) - } - output - } finally { - currentSpan.end() - } - } + private val messagesSpans: MutableMap = mutableMapOf() - fun runStepSpan(runObject: RunStepObject) { + fun runSpan(runObject: RunObject, source: String) { - val parentOrRoot: Context = runObject.runId.getOrCreateContext() + val parentOrRoot: Context = runObject.id.getOrCreateContext() val currentSpan = - tracer - .spanBuilder("step ${runObject.status.name} ${runObject.id}") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() + tracer.spanBuilder(source).setParent(parentOrRoot).setSpanKind(SpanKind.CLIENT).startSpan() + + if (source == runStartedSource) { + runSpans[runObject.id] = currentSpan + } try { currentSpan.makeCurrent().use { runObject.setParameters(currentSpan) } } finally { - currentSpan.end() + if (runFinishedSources.contains(source) && runSpans[runObject.id] != null) { + runSpans[runObject.id]!!.let { + runObject.setParameters(it) + it.updateName("RunCreated -> $source ${runObject.id}") + it.end() + runSpans.remove(runObject.id) + } + } else if (source != runStartedSource) { + currentSpan.end() + } } } - suspend fun runStepSpan(runId: String, block: suspend () -> RunStepObject): RunStepObject { + fun runStepSpan(runStepObject: RunStepObject, source: String) { - val parentOrRoot: Context = runId.getOrCreateContext() + val parentOrRoot: Context = runStepObject.runId.getOrCreateContext() val currentSpan = - tracer - .spanBuilder("New RunStep: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - when (val detail = output.stepDetails) { - is RunStepObjectStepDetails.CaseRunStepDetailsMessageCreationObject -> - currentSpan.updateName("Creating message: ${output.status.name}") - is RunStepObjectStepDetails.CaseRunStepDetailsToolCallsObject -> - currentSpan.updateName( - "Tools: ${detail.value.toolCalls.joinToString { - when (it) { - is RunStepDetailsToolCallsObjectToolCallsInner.CaseRunStepDetailsToolCallsCodeObject -> it.value.type.name - is RunStepDetailsToolCallsObjectToolCallsInner.CaseRunStepDetailsToolCallsFunctionObject -> it.value.function.name ?: "" - is RunStepDetailsToolCallsObjectToolCallsInner.CaseRunStepDetailsToolCallsFileSearchObject -> it.value.type.name - } - }}: ${output.status.name}" - ) + tracer.spanBuilder(source).setParent(parentOrRoot).setSpanKind(SpanKind.CLIENT).startSpan() + + if (source == runStepStartedSource) { + runStepsSpans[runStepObject.id] = currentSpan + } + + try { + currentSpan.makeCurrent().use { runStepObject.setParameters(currentSpan, source) } + } finally { + if (runStepFinishedSources.contains(source) && runStepsSpans[runStepObject.id] != null) { + runStepsSpans[runStepObject.id]!!.let { + runStepObject.setParameters(it, source) + it.updateName("RunStepCreated -> $source ${runStepObject.id}") + it.end() + runStepsSpans.remove(runStepObject.id) } - output.setParameters(currentSpan) + } else if (source != runStepStartedSource) { + currentSpan.end() } - output - } finally { - currentSpan.end() } } - suspend fun createdMessagesSpan( - runId: String, - block: suspend () -> List - ): List { + fun createdMessagesSpan(messageObject: MessageObject, source: String) { + val runId = messageObject.runId ?: return val parentOrRoot: Context = runId.getOrCreateContext() val currentSpan = - tracer - .spanBuilder("New Run: $runId") - .setParent(parentOrRoot) - .setSpanKind(SpanKind.CLIENT) - .startSpan() - - return try { - val output = block() - currentSpan.makeCurrent().use { - currentSpan.updateName("Messages: ${output.size}") - output.setParameters(currentSpan) - } - output + tracer.spanBuilder(source).setParent(parentOrRoot).setSpanKind(SpanKind.CLIENT).startSpan() + + if (source == messageStartedSource) { + messagesSpans[messageObject.id] = currentSpan + } + + try { + currentSpan.makeCurrent().use { messageObject.setParameters(currentSpan, source) } } finally { - currentSpan.end() + if (messageFinishedSources.contains(source) && messagesSpans[messageObject.id] != null) { + messagesSpans[messageObject.id]!!.let { + messageObject.setParameters(it, source) + it.updateName("MessageCreated -> $source ${messageObject.id}") + it.end() + messagesSpans.remove(messageObject.id) + } + } else if (source != messageStartedSource) { + currentSpan.end() + } } } @@ -179,7 +137,8 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } - private fun RunStepObject.setParameters(span: Span) { + private fun RunStepObject.setParameters(span: Span, source: String) { + span.setAttribute("openai.assistant.source", source) span.setAttribute("openai.assistant.type", type.name) span.setAttribute("openai.assistant.thread.id", threadId) span.setAttribute("openai.assistant.assistant.id", assistantId) @@ -224,30 +183,24 @@ class OpenTelemetryAssistantState(private val tracer: Tracer) { } } - private fun List.setParameters(span: Span) { - span.setAttribute("openai.assistant.messages.count", size.toString()) - forEach { - span.setAttribute("openai.assistant.messages.${indexOf(it)}.role", it.role.name) - when (val inner = it.content.firstOrNull()) { - is MessageObjectContentInner.CaseMessageContentImageFileObject -> { - span.setAttribute( - "openai.assistant.messages.${indexOf(it)}.content", - inner.value.imageFile.fileId - ) - } - is MessageObjectContentInner.CaseMessageContentTextObject -> { - span.setAttribute( - "openai.assistant.messages.${indexOf(it)}.content", - inner.value.text.value - ) - } - is MessageObjectContentInner.CaseMessageContentImageUrlObject -> - span.setAttribute( - "openai.assistant.messages.${indexOf(it)}.content", - inner.value.imageUrl.url - ) - null -> {} + private fun MessageObject.setParameters(span: Span, source: String) { + span.setAttribute("openai.assistant.message.source", source) + span.setAttribute("openai.assistant.message.role", role.name) + span.setAttribute("openai.assistant.message.thread.id", threadId) + assistantId?.let { span.setAttribute("openai.assistant.message.assistant.id", it) } + runId?.let { span.setAttribute("openai.assistant.message.run.id", it) } + span.setAttribute("openai.assistant.message.id", id) + status?.let { span.setAttribute("openai.assistant.message.status", it.name) } + when (val inner = content.firstOrNull()) { + is MessageObjectContentInner.CaseMessageContentImageFileObject -> { + span.setAttribute("openai.assistant.message.content", inner.value.imageFile.fileId) + } + is MessageObjectContentInner.CaseMessageContentTextObject -> { + span.setAttribute("openai.assistant.message.content", inner.value.text.value) } + is MessageObjectContentInner.CaseMessageContentImageUrlObject -> + span.setAttribute("openai.assistant.message.content", inner.value.imageUrl.url) + null -> {} } } } diff --git a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt index 3ee6c4728..c1571116c 100644 --- a/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt +++ b/integrations/opentelemetry/src/main/kotlin/com/xebia/functional/xef/opentelemetry/OpenTelemetryMetric.kt @@ -36,30 +36,15 @@ class OpenTelemetryMetric( state.setAttribute(key, values) } - override suspend fun assistantCreateRun(runObject: RunObject) = assistantState.runSpan(runObject) + override suspend fun assistantCreateRun(runObject: RunObject, source: String) = + assistantState.runSpan(runObject, source) - override suspend fun assistantCreateRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = assistantState.runSpan(runId) { block() } + override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) = + assistantState.runStepSpan(runObject, source) - override suspend fun assistantCreateRunStep(runObject: RunStepObject) = - assistantState.runStepSpan(runObject) - - override suspend fun assistantCreatedMessage( - runId: String, - block: suspend Metric.() -> List - ): List = assistantState.createdMessagesSpan(runId) { block() } - - override suspend fun assistantCreateRunStep( - runId: String, - block: suspend Metric.() -> RunStepObject - ): RunStepObject = assistantState.runStepSpan(runId) { block() } - - override suspend fun assistantToolOutputsRun( - runId: String, - block: suspend Metric.() -> RunObject - ): RunObject = assistantState.toolOutputRunSpan(runId) { block() } + override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) { + assistantState.createdMessagesSpan(messageObject, source) + } private fun getTracer(scopeName: String? = null): Tracer = openTelemetry.getTracer(scopeName ?: config.defaultScopeName)