Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry improved #756

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xebia.functional.xef.llm

import com.xebia.functional.openai.generated.model.CreateChatCompletionResponse
import com.xebia.functional.openai.generated.model.MessageObject
import com.xebia.functional.openai.generated.model.RunObject
import com.xebia.functional.openai.generated.model.RunStepObject
import com.xebia.functional.xef.conversation.Conversation
Expand Down Expand Up @@ -60,33 +61,42 @@ suspend fun Prompt.addMetrics(conversation: Conversation) {
conversation.metric.parameter("openai.chat_completion.functions", functions.map { it.name })
}

suspend fun RunObject.addMetrics(metric: Metric): RunObject {
metric.assistantCreateRun(this)
suspend fun RunObject.addMetrics(metric: Metric, source: String): RunObject {
metric.assistantCreateRun(this, source)
return this
}

suspend fun RunStepObject.addMetrics(metric: Metric): RunStepObject {
metric.assistantCreateRunStep(this)
suspend fun RunStepObject.addMetrics(metric: Metric, source: String): RunStepObject {
metric.assistantCreateRunStep(this, source)
return this
}

suspend fun MessageObject.addMetrics(metric: Metric, source: String): MessageObject {
metric.assistantCreatedMessage(this, source)
return this
}

suspend fun RunDelta.addMetrics(metric: Metric): RunDelta {
when (this) {
is RunDelta.RunCancelled -> run.addMetrics(metric)
is RunDelta.RunCancelling -> run.addMetrics(metric)
is RunDelta.RunCompleted -> run.addMetrics(metric)
is RunDelta.RunCreated -> run.addMetrics(metric)
is RunDelta.RunExpired -> run.addMetrics(metric)
is RunDelta.RunFailed -> run.addMetrics(metric)
is RunDelta.RunInProgress -> run.addMetrics(metric)
is RunDelta.RunQueued -> run.addMetrics(metric)
is RunDelta.RunRequiresAction -> run.addMetrics(metric)
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric)
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric)
is RunDelta.RunStepCreated -> runStep.addMetrics(metric)
is RunDelta.RunStepExpired -> runStep.addMetrics(metric)
is RunDelta.RunStepFailed -> runStep.addMetrics(metric)
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric)
is RunDelta.RunCancelled -> run.addMetrics(metric, "RunCancelled")
is RunDelta.RunCancelling -> run.addMetrics(metric, "RunCancelling")
is RunDelta.RunCompleted -> run.addMetrics(metric, "RunCompleted")
is RunDelta.RunCreated -> run.addMetrics(metric, "RunCreated")
is RunDelta.RunExpired -> run.addMetrics(metric, "RunExpired")
is RunDelta.RunFailed -> run.addMetrics(metric, "RunFailed")
is RunDelta.RunInProgress -> run.addMetrics(metric, "RunInProgress")
is RunDelta.RunQueued -> run.addMetrics(metric, "RunQueued")
is RunDelta.RunRequiresAction -> run.addMetrics(metric, "RunRequiresAction")
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric, "RunStepCancelled")
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric, "RunStepCompleted")
is RunDelta.RunStepCreated -> runStep.addMetrics(metric, "RunStepCreated")
is RunDelta.RunStepExpired -> runStep.addMetrics(metric, "RunStepExpired")
is RunDelta.RunStepFailed -> runStep.addMetrics(metric, "RunStepFailed")
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric, "RunStepInProgress")
is RunDelta.MessageCreated -> message.addMetrics(metric, "MessageCreated")
is RunDelta.MessageIncomplete -> message.addMetrics(metric, "MessageIncomplete")
is RunDelta.MessageCompleted -> message.addMetrics(metric, "MessageCompleted")
is RunDelta.MessageInProgress -> message.addMetrics(metric, "MessageInProgress")
else -> {} // ignore other cases
javipacheco marked this conversation as resolved.
Show resolved Hide resolved
}
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class AssistantThread(
createRun(CreateRunRequest(assistantId = assistant.assistantId))

suspend fun createRun(request: CreateRunRequest): RunObject =
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric)
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric, "RunCreated")

fun createRunStream(assistant: Assistant, request: CreateRunRequest): Flow<RunDelta> = flow {
api
Expand Down Expand Up @@ -122,38 +122,69 @@ class AssistantThread(
)
}
)
val run =
metric.assistantToolOutputsRun(event.run.id) {
api
.submitToolOuputsToRunStream(
threadId = threadId,
runId = event.run.id,
submitToolOutputsRunRequest = toolOutputsRequest,
configure = ::defaultConfig
)
.collect {
val delta = RunDelta.fromServerSentEvent(it)
if (delta is RunDelta.RunStepCompleted) {
flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest))
}
flowCollector.emit(delta)
}
val run = getRun(event.run.id)
val finalEvent =
when (run.status) {
RunObject.Status.queued -> RunDelta.RunQueued(run)
RunObject.Status.in_progress -> RunDelta.RunInProgress(run)
RunObject.Status.requires_action -> RunDelta.RunRequiresAction(run)
RunObject.Status.cancelling -> RunDelta.RunCancelling(run)
RunObject.Status.cancelled -> RunDelta.RunCancelled(run)
RunObject.Status.failed -> RunDelta.RunFailed(run)
RunObject.Status.completed -> RunDelta.RunCompleted(run)
RunObject.Status.expired -> RunDelta.RunExpired(run)
RunObject.Status.incomplete -> RunDelta.RunIncomplete(run)
}
flowCollector.emit(finalEvent)
run

api
.submitToolOuputsToRunStream(
threadId = threadId,
runId = event.run.id,
submitToolOutputsRunRequest = toolOutputsRequest,
configure = ::defaultConfig
)
.collect {
val delta = RunDelta.fromServerSentEvent(it)

when (delta) {
is RunDelta.RunCreated -> Pair(delta.run, "RunCreated")
is RunDelta.RunQueued -> Pair(delta.run, "RunQueued")
is RunDelta.RunFailed -> Pair(delta.run, "RunFailed")
is RunDelta.RunCancelled -> Pair(delta.run, "RunCancelled")
is RunDelta.RunCancelling -> Pair(delta.run, "RunCancelling")
is RunDelta.RunExpired -> Pair(delta.run, "RunExpired")
is RunDelta.RunInProgress -> Pair(delta.run, "RunInProgress")
is RunDelta.RunIncomplete -> Pair(delta.run, "RunIncomplete")
else -> null
}?.let { metric.assistantCreateRun(it.first, it.second) }

when (delta) {
is RunDelta.RunStepCreated -> Pair(delta.runStep, "RunStepCreated")
is RunDelta.RunStepInProgress -> Pair(delta.runStep, "RunStepInProgress")
is RunDelta.RunStepCompleted -> Pair(delta.runStep, "RunStepCompleted")
is RunDelta.RunStepFailed -> Pair(delta.runStep, "RunStepFailed")
is RunDelta.RunStepCancelled -> Pair(delta.runStep, "RunStepCancelled")
is RunDelta.RunStepExpired -> Pair(delta.runStep, "RunStepExpired")
else -> null
}?.let { metric.assistantCreateRunStep(it.first, it.second) }

when (delta) {
is RunDelta.MessageCreated -> Pair(delta.message, "MessageCreated")
is RunDelta.MessageInProgress -> Pair(delta.message, "MessageInProgress")
is RunDelta.MessageIncomplete -> Pair(delta.message, "MessageIncomplete")
is RunDelta.MessageCompleted -> Pair(delta.message, "MessageCompleted")
else -> null
}?.let { metric.assistantCreatedMessage(it.first, it.second) }

if (delta is RunDelta.RunStepCompleted) {
flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest))
}
flowCollector.emit(delta)
}

val run = getRun(event.run.id)
val finalEvent =
when (run.status) {
RunObject.Status.queued -> Pair(RunDelta.RunQueued(run), "RunQueued")
RunObject.Status.in_progress -> Pair(RunDelta.RunInProgress(run), "RunInProgress")
RunObject.Status.requires_action ->
Pair(RunDelta.RunRequiresAction(run), "RunRequiresAction")
RunObject.Status.cancelling -> Pair(RunDelta.RunCancelling(run), "RunCancelling")
RunObject.Status.cancelled -> Pair(RunDelta.RunCancelled(run), "RunCancelled")
RunObject.Status.failed -> Pair(RunDelta.RunFailed(run), "RunFailed")
RunObject.Status.completed -> Pair(RunDelta.RunCompleted(run), "RunCompleted")
RunObject.Status.expired -> Pair(RunDelta.RunExpired(run), "RunExpired")
RunObject.Status.incomplete -> Pair(RunDelta.RunIncomplete(run), "RunIncomplete")
}
flowCollector.emit(finalEvent.first)
metric.assistantCreateRun(run, finalEvent.second)

if (run.status == RunObject.Status.requires_action) {
takeRequiredAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
return output
}

override suspend fun assistantCreateRun(runObject: RunObject) {
override suspend fun assistantCreateRun(runObject: RunObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
Expand All @@ -58,16 +58,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
}
}

override suspend fun assistantCreateRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject {
val output = block()
assistantCreateRun(output)
return output
}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {
override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
Expand All @@ -82,44 +73,23 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
}
}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject> {
val output = block()
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Size: ${output.size}"
}
return output
}

override suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject {
val output = block()
override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}"
this.message =
"${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${messageObject.assistantId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${output.threadId}"
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${messageObject.threadId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${output.runId}"
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${messageObject.id}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${output.status.name}"
if (messageObject.status != null) {
this.message =
"${writeIndent(numberOfBlocks.get())}|-- Status: ${messageObject.status!!.name}"
}
}
return output
}

override suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject {
val output = block()
assistantCreateRun(output)
return output
}

override suspend fun event(message: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,11 @@ interface Metric {

suspend fun parameter(key: String, values: List<String>)

suspend fun assistantCreateRun(runObject: RunObject)
suspend fun assistantCreateRun(runObject: RunObject, source: String)

suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject
suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String)

suspend fun assistantCreateRunStep(runObject: RunStepObject)

suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject>

suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject

suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject
suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String)

companion object {
val EMPTY: Metric =
Expand All @@ -46,29 +31,14 @@ interface Metric {
override suspend fun <A> promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A =
block()

override suspend fun assistantCreateRun(runObject: RunObject) {}

override suspend fun assistantCreateRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject = block()
override suspend fun assistantCreateRun(runObject: RunObject, source: String) {}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {}
override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject> = block()

override suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject = block()

override suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject = block()
messageObject: MessageObject,
source: String
) {}

override suspend fun event(message: String) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.xebia.functional.xef.llm.assistants.Assistant
import com.xebia.functional.xef.llm.assistants.AssistantThread
import com.xebia.functional.xef.llm.assistants.RunDelta
import com.xebia.functional.xef.llm.assistants.Tool
import com.xebia.functional.xef.metrics.Metric
import kotlinx.serialization.Serializable

@Serializable data class SumInput(val left: Int, val right: Int)
Expand Down Expand Up @@ -42,8 +41,8 @@ suspend fun main() {
// - # cd server/docker/opentelemetry
// - # docker-compose up

val metric = Metric.EMPTY
// val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric()
// val metric = Metric.EMPTY
val metric = com.xebia.functional.xef.opentelemetry.OpenTelemetryMetric()

val assistant =
Assistant(
Expand Down
Loading
Loading