Skip to content

Commit

Permalink
OpenTelemetry improved
Browse files Browse the repository at this point in the history
  • Loading branch information
javipacheco committed Jun 6, 2024
1 parent 0920656 commit a2f938c
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 289 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xebia.functional.xef.llm

import com.xebia.functional.openai.generated.model.CreateChatCompletionResponse
import com.xebia.functional.openai.generated.model.MessageObject
import com.xebia.functional.openai.generated.model.RunObject
import com.xebia.functional.openai.generated.model.RunStepObject
import com.xebia.functional.xef.conversation.Conversation
Expand Down Expand Up @@ -60,33 +61,42 @@ suspend fun Prompt.addMetrics(conversation: Conversation) {
conversation.metric.parameter("openai.chat_completion.functions", functions.map { it.name })
}

suspend fun RunObject.addMetrics(metric: Metric): RunObject {
metric.assistantCreateRun(this)
suspend fun RunObject.addMetrics(metric: Metric, source: String): RunObject {
metric.assistantCreateRun(this, source)
return this
}

suspend fun RunStepObject.addMetrics(metric: Metric): RunStepObject {
metric.assistantCreateRunStep(this)
suspend fun RunStepObject.addMetrics(metric: Metric, source: String): RunStepObject {
metric.assistantCreateRunStep(this, source)
return this
}

suspend fun MessageObject.addMetrics(metric: Metric, source: String): MessageObject {
metric.assistantCreatedMessage(this, source)
return this
}

suspend fun RunDelta.addMetrics(metric: Metric): RunDelta {
when (this) {
is RunDelta.RunCancelled -> run.addMetrics(metric)
is RunDelta.RunCancelling -> run.addMetrics(metric)
is RunDelta.RunCompleted -> run.addMetrics(metric)
is RunDelta.RunCreated -> run.addMetrics(metric)
is RunDelta.RunExpired -> run.addMetrics(metric)
is RunDelta.RunFailed -> run.addMetrics(metric)
is RunDelta.RunInProgress -> run.addMetrics(metric)
is RunDelta.RunQueued -> run.addMetrics(metric)
is RunDelta.RunRequiresAction -> run.addMetrics(metric)
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric)
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric)
is RunDelta.RunStepCreated -> runStep.addMetrics(metric)
is RunDelta.RunStepExpired -> runStep.addMetrics(metric)
is RunDelta.RunStepFailed -> runStep.addMetrics(metric)
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric)
is RunDelta.RunCancelled -> run.addMetrics(metric, "RunCancelled")
is RunDelta.RunCancelling -> run.addMetrics(metric, "RunCancelling")
is RunDelta.RunCompleted -> run.addMetrics(metric, "RunCompleted")
is RunDelta.RunCreated -> run.addMetrics(metric, "RunCreated")
is RunDelta.RunExpired -> run.addMetrics(metric, "RunExpired")
is RunDelta.RunFailed -> run.addMetrics(metric, "RunFailed")
is RunDelta.RunInProgress -> run.addMetrics(metric, "RunInProgress")
is RunDelta.RunQueued -> run.addMetrics(metric, "RunQueued")
is RunDelta.RunRequiresAction -> run.addMetrics(metric, "RunRequiresAction")
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric, "RunStepCancelled")
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric, "RunStepCompleted")
is RunDelta.RunStepCreated -> runStep.addMetrics(metric, "RunStepCreated")
is RunDelta.RunStepExpired -> runStep.addMetrics(metric, "RunStepExpired")
is RunDelta.RunStepFailed -> runStep.addMetrics(metric, "RunStepFailed")
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric, "RunStepInProgress")
is RunDelta.MessageCreated -> message.addMetrics(metric, "MessageCreated")
is RunDelta.MessageIncomplete -> message.addMetrics(metric, "MessageIncomplete")
is RunDelta.MessageCompleted -> message.addMetrics(metric, "MessageCompleted")
is RunDelta.MessageInProgress -> message.addMetrics(metric, "MessageInProgress")
else -> {} // ignore other cases
}
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class AssistantThread(
createRun(CreateRunRequest(assistantId = assistant.assistantId))

suspend fun createRun(request: CreateRunRequest): RunObject =
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric)
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric, "RunCreated")

fun createRunStream(assistant: Assistant, request: CreateRunRequest): Flow<RunDelta> = flow {
api
Expand Down Expand Up @@ -122,38 +122,69 @@ class AssistantThread(
)
}
)
val run =
metric.assistantToolOutputsRun(event.run.id) {
api
.submitToolOuputsToRunStream(
threadId = threadId,
runId = event.run.id,
submitToolOutputsRunRequest = toolOutputsRequest,
configure = ::defaultConfig
)
.collect {
val delta = RunDelta.fromServerSentEvent(it)
if (delta is RunDelta.RunStepCompleted) {
flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest))
}
flowCollector.emit(delta)
}
val run = getRun(event.run.id)
val finalEvent =
when (run.status) {
RunObject.Status.queued -> RunDelta.RunQueued(run)
RunObject.Status.in_progress -> RunDelta.RunInProgress(run)
RunObject.Status.requires_action -> RunDelta.RunRequiresAction(run)
RunObject.Status.cancelling -> RunDelta.RunCancelling(run)
RunObject.Status.cancelled -> RunDelta.RunCancelled(run)
RunObject.Status.failed -> RunDelta.RunFailed(run)
RunObject.Status.completed -> RunDelta.RunCompleted(run)
RunObject.Status.expired -> RunDelta.RunExpired(run)
RunObject.Status.incomplete -> RunDelta.RunIncomplete(run)
}
flowCollector.emit(finalEvent)
run

api
.submitToolOuputsToRunStream(
threadId = threadId,
runId = event.run.id,
submitToolOutputsRunRequest = toolOutputsRequest,
configure = ::defaultConfig
)
.collect {
val delta = RunDelta.fromServerSentEvent(it)

when (delta) {
is RunDelta.RunCreated -> Pair(delta.run, "RunCreated")
is RunDelta.RunQueued -> Pair(delta.run, "RunQueued")
is RunDelta.RunFailed -> Pair(delta.run, "RunFailed")
is RunDelta.RunCancelled -> Pair(delta.run, "RunCancelled")
is RunDelta.RunCancelling -> Pair(delta.run, "RunCancelling")
is RunDelta.RunExpired -> Pair(delta.run, "RunExpired")
is RunDelta.RunInProgress -> Pair(delta.run, "RunInProgress")
is RunDelta.RunIncomplete -> Pair(delta.run, "RunIncomplete")
else -> null
}?.let { metric.assistantCreateRun(it.first, it.second) }

when (delta) {
is RunDelta.RunStepCreated -> Pair(delta.runStep, "RunStepCreated")
is RunDelta.RunStepInProgress -> Pair(delta.runStep, "RunStepInProgress")
is RunDelta.RunStepCompleted -> Pair(delta.runStep, "RunStepCompleted")
is RunDelta.RunStepFailed -> Pair(delta.runStep, "RunStepFailed")
is RunDelta.RunStepCancelled -> Pair(delta.runStep, "RunStepCancelled")
is RunDelta.RunStepExpired -> Pair(delta.runStep, "RunStepExpired")
else -> null
}?.let { metric.assistantCreateRunStep(it.first, it.second) }

when (delta) {
is RunDelta.MessageCreated -> Pair(delta.message, "MessageCreated")
is RunDelta.MessageInProgress -> Pair(delta.message, "MessageInProgress")
is RunDelta.MessageIncomplete -> Pair(delta.message, "MessageIncomplete")
is RunDelta.MessageCompleted -> Pair(delta.message, "MessageCompleted")
else -> null
}?.let { metric.assistantCreatedMessage(it.first, it.second) }

if (delta is RunDelta.RunStepCompleted) {
flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest))
}
flowCollector.emit(delta)
}

val run = getRun(event.run.id)
val finalEvent =
when (run.status) {
RunObject.Status.queued -> Pair(RunDelta.RunQueued(run), "RunQueued")
RunObject.Status.in_progress -> Pair(RunDelta.RunInProgress(run), "RunInProgress")
RunObject.Status.requires_action ->
Pair(RunDelta.RunRequiresAction(run), "RunRequiresAction")
RunObject.Status.cancelling -> Pair(RunDelta.RunCancelling(run), "RunCancelling")
RunObject.Status.cancelled -> Pair(RunDelta.RunCancelled(run), "RunCancelled")
RunObject.Status.failed -> Pair(RunDelta.RunFailed(run), "RunFailed")
RunObject.Status.completed -> Pair(RunDelta.RunCompleted(run), "RunCompleted")
RunObject.Status.expired -> Pair(RunDelta.RunExpired(run), "RunExpired")
RunObject.Status.incomplete -> Pair(RunDelta.RunIncomplete(run), "RunIncomplete")
}
flowCollector.emit(finalEvent.first)
metric.assistantCreateRun(run, finalEvent.second)

if (run.status == RunObject.Status.requires_action) {
takeRequiredAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
return output
}

override suspend fun assistantCreateRun(runObject: RunObject) {
override suspend fun assistantCreateRun(runObject: RunObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
Expand All @@ -58,16 +58,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
}
}

override suspend fun assistantCreateRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject {
val output = block()
assistantCreateRun(output)
return output
}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {
override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
Expand All @@ -82,44 +73,23 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
}
}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject> {
val output = block()
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Size: ${output.size}"
}
return output
}

override suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject {
val output = block()
override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}"
this.message =
"${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${messageObject.assistantId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${output.threadId}"
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${messageObject.threadId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${output.runId}"
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${messageObject.id}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${output.status.name}"
if (messageObject.status != null) {
this.message =
"${writeIndent(numberOfBlocks.get())}|-- Status: ${messageObject.status!!.name}"
}
}
return output
}

override suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject {
val output = block()
assistantCreateRun(output)
return output
}

override suspend fun event(message: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,11 @@ interface Metric {

suspend fun parameter(key: String, values: List<String>)

suspend fun assistantCreateRun(runObject: RunObject)
suspend fun assistantCreateRun(runObject: RunObject, source: String)

suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject
suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String)

suspend fun assistantCreateRunStep(runObject: RunStepObject)

suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject>

suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject

suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject
suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String)

companion object {
val EMPTY: Metric =
Expand All @@ -46,29 +31,14 @@ interface Metric {
override suspend fun <A> promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A =
block()

override suspend fun assistantCreateRun(runObject: RunObject) {}

override suspend fun assistantCreateRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject = block()
override suspend fun assistantCreateRun(runObject: RunObject, source: String) {}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {}
override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject> = block()

override suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject = block()

override suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject = block()
messageObject: MessageObject,
source: String
) {}

override suspend fun event(message: String) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.xebia.functional.xef.llm.assistants.Assistant
import com.xebia.functional.xef.llm.assistants.AssistantThread
import com.xebia.functional.xef.llm.assistants.RunDelta
import com.xebia.functional.xef.llm.assistants.Tool
import com.xebia.functional.xef.metrics.Metric
import kotlinx.serialization.Serializable

@Serializable data class SumInput(val left: Int, val right: Int)
Expand Down Expand Up @@ -42,8 +41,8 @@ suspend fun main() {
// - # cd server/docker/opentelemetry
// - # docker-compose up

val metric = Metric.EMPTY
// val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric()
// val metric = Metric.EMPTY
val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric()

val assistant =
Assistant(
Expand Down
Loading

0 comments on commit a2f938c

Please sign in to comment.