diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3cabbb4f4..9a9d73afe 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,7 +1,7 @@ [versions] arrow = "1.2.1" arrowGradle = "0.12.0-rc.5" -exposed = "0.43.0" +exposed = "0.44.0" kotlin = "1.9.10" kotlinx-json = "1.6.0" kotlinx-datetime = "0.4.1" @@ -10,7 +10,7 @@ spotless = "6.21.0" okio = "3.5.0" kotest = "5.7.2" kotest-testcontainers = "2.0.2" -kotest-arrow = "1.3.3" +kotest-arrow = "1.4.0" klogging = "5.1.0" uuid = "0.0.21" postgresql = "42.6.0" @@ -33,7 +33,7 @@ junit = "5.10.0" pdfbox = "3.0.0" mysql = "8.0.33" semverGradle = "0.5.0-rc.5" -scala = "3.3.0" +scala = "3.3.1" openai-client-version = "3.4.1" gpt4all-java = "1.1.5" ai-djl = "0.23.0" @@ -42,7 +42,7 @@ jsonschema = "4.31.1" jakarta = "3.0.2" suspend-transform = "0.5.1" suspendApp = "0.4.0" -flyway = "9.22.1" +flyway = "9.22.2" resources-kmp = "0.4.0" detekt = "1.23.1" @@ -83,6 +83,7 @@ ktor-server-contentNegotiation = { module = "io.ktor:ktor-server-content-negotia ktor-server-resources = { module = "io.ktor:ktor-server-resources", version.ref = "ktor" } ktor-server-cors = { module = "io.ktor:ktor-server-cors", version.ref = "ktor" } ktor-server-request-validation = { module = "io.ktor:ktor-server-request-validation", version.ref = "ktor" } +ktor-server-double-receive = { module = "io.ktor:ktor-server-double-receive", version.ref = "ktor" } ktor-server-status-pages = { module = "io.ktor:ktor-server-status-pages", version.ref = "ktor" } okio = { module = "com.squareup.okio:okio", version.ref = "okio" } okio-fakefilesystem = { module = "com.squareup.okio:okio-fakefilesystem", version.ref = "okio" } diff --git a/server/build.gradle.kts b/server/build.gradle.kts index 933be9d1c..add79858a 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -48,6 +48,7 @@ dependencies { implementation(libs.ktor.server.cors) implementation(libs.ktor.server.request.validation) implementation(libs.ktor.server.status.pages) + implementation(libs.ktor.server.double.receive) implementation(libs.logback) implementation(libs.openai.client) implementation(libs.suspendApp.core) @@ -57,6 +58,8 @@ dependencies { implementation(projects.xefCore) implementation(projects.xefLucene) implementation(projects.xefPostgresql) + implementation("io.ktor:ktor-server-core-jvm:2.3.4") + implementation("io.ktor:ktor-server-double-receive-jvm:2.3.4") testImplementation(libs.junit.jupiter.api) testImplementation(libs.junit.jupiter.engine) diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt index 98e1238fc..0d2881cca 100644 --- a/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt @@ -9,14 +9,9 @@ import com.xebia.functional.xef.server.db.psql.XefDatabaseConfig import com.xebia.functional.xef.server.db.psql.XefVectorStoreConfig import com.xebia.functional.xef.server.db.psql.XefVectorStoreConfig.Companion.getVectorStoreService import com.xebia.functional.xef.server.exceptions.exceptionsHandler -import com.xebia.functional.xef.server.http.routes.genAIRoutes -import com.xebia.functional.xef.server.http.routes.organizationRoutes -import com.xebia.functional.xef.server.http.routes.projectsRoutes -import com.xebia.functional.xef.server.http.routes.userRoutes -import com.xebia.functional.xef.server.services.OrganizationRepositoryService -import com.xebia.functional.xef.server.services.ProjectRepositoryService +import com.xebia.functional.xef.server.http.routes.* +import com.xebia.functional.xef.server.http.routes.providers.oaiRoutes import com.xebia.functional.xef.server.services.RepositoryService -import com.xebia.functional.xef.server.services.UserRepositoryService import io.ktor.client.* import io.ktor.client.engine.cio.* import io.ktor.client.plugins.auth.* @@ -28,6 +23,7 @@ import io.ktor.server.auth.* import io.ktor.server.netty.* import io.ktor.server.plugins.contentnegotiation.* import io.ktor.server.plugins.cors.routing.* +import io.ktor.server.plugins.doublereceive.* import io.ktor.server.resources.* import io.ktor.server.routing.* import kotlinx.coroutines.awaitCancellation @@ -75,6 +71,7 @@ object Server { anyHost() } install(ContentNegotiation) { json() } + install(DoubleReceive) install(Resources) install(Authentication) { bearer("auth-bearer") { @@ -85,10 +82,8 @@ object Server { } exceptionsHandler() routing { - genAIRoutes(ktorClient, vectorStoreService) - userRoutes(UserRepositoryService(logger)) - organizationRoutes(OrganizationRepositoryService(logger)) - projectsRoutes(ProjectRepositoryService(logger)) + aiRoutes(ktorClient) + oaiRoutes(ktorClient) } } awaitCancellation() diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/db/tables/XefTokensTable.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/db/tables/XefTokensTable.kt index c877a2a83..7fa52fcbc 100644 --- a/server/src/main/kotlin/com/xebia/functional/xef/server/db/tables/XefTokensTable.kt +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/db/tables/XefTokensTable.kt @@ -1,29 +1,19 @@ package com.xebia.functional.xef.server.db.tables import com.xebia.functional.xef.server.models.ProvidersConfig -import kotlinx.datetime.Instant -import kotlinx.serialization.SerialName import kotlinx.serialization.json.Json +import org.jetbrains.exposed.dao.IntEntity +import org.jetbrains.exposed.dao.IntEntityClass +import org.jetbrains.exposed.dao.id.EntityID +import org.jetbrains.exposed.dao.id.IntIdTable import org.jetbrains.exposed.sql.ReferenceOption -import org.jetbrains.exposed.sql.ResultRow -import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.json.jsonb import org.jetbrains.exposed.sql.kotlin.datetime.CurrentTimestamp import org.jetbrains.exposed.sql.kotlin.datetime.timestamp val format = Json { prettyPrint = true } -data class XefTokens( - @SerialName("user_id") val userId: Int, - @SerialName("project_id") val projectId: Int, - @SerialName("name") val name: String, - @SerialName("created_at") val createdAt: Instant, - @SerialName("updated_at") val updatedAt: Instant, - @SerialName("token") val token: String, - @SerialName("providers_config") val providersConfig: ProvidersConfig -) - -object XefTokensTable : Table("xef_tokens") { +object XefTokensTable : IntIdTable("xef_tokens") { val userId = reference( name = "user_id", foreign = UsersTable, @@ -40,17 +30,16 @@ object XefTokensTable : Table("xef_tokens") { val token = varchar("token", 128).uniqueIndex() val providersConfig = jsonb("providers_config", format) - override val primaryKey = PrimaryKey(userId, projectId, name) } -fun ResultRow.toXefTokens(): XefTokens { - return XefTokens( - userId = this[XefTokensTable.userId].value, - projectId = this[XefTokensTable.projectId].value, - name = this[XefTokensTable.name], - createdAt = this[XefTokensTable.createdAt], - updatedAt = this[XefTokensTable.updatedAt], - token = this[XefTokensTable.token], - providersConfig = this[XefTokensTable.providersConfig] - ) +class XefTokens(id: EntityID) : IntEntity(id) { + companion object : IntEntityClass(XefTokensTable) + + var userId by XefTokensTable.userId + var projectId by XefTokensTable.projectId + var name by XefTokensTable.name + var createdAt by XefTokensTable.createdAt + var updatedAt by XefTokensTable.updatedAt + var token by XefTokensTable.token + var providersConfig by XefTokensTable.providersConfig } diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/XefRoutes.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/AIRoutes.kt similarity index 55% rename from server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/XefRoutes.kt rename to server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/AIRoutes.kt index 2cbc60857..c0afe9189 100644 --- a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/XefRoutes.kt +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/AIRoutes.kt @@ -1,20 +1,16 @@ package com.xebia.functional.xef.server.http.routes import com.aallam.openai.api.BetaOpenAI +import com.xebia.functional.xef.server.http.routes.providers.forwardToProvider +import com.xebia.functional.xef.server.http.routes.providers.makeRequest +import com.xebia.functional.xef.server.http.routes.providers.makeStreaming import com.xebia.functional.xef.server.models.Token import com.xebia.functional.xef.server.models.exceptions.XefExceptions -import com.xebia.functional.xef.server.services.VectorStoreService import io.ktor.client.* -import io.ktor.client.call.* -import io.ktor.client.request.* -import io.ktor.client.statement.* -import io.ktor.http.* import io.ktor.server.application.* import io.ktor.server.auth.* import io.ktor.server.request.* -import io.ktor.server.response.* import io.ktor.server.routing.* -import io.ktor.utils.io.jvm.javaio.* import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.boolean @@ -32,9 +28,8 @@ fun String.toProvider(): Provider? = when (this) { } @OptIn(BetaOpenAI::class) -fun Routing.genAIRoutes( - client: HttpClient, - vectorStoreService: VectorStoreService +fun Routing.aiRoutes( + client: HttpClient ) { val openAiUrl = "https://api.openai.com/v1" @@ -51,64 +46,20 @@ fun Routing.genAIRoutes( } else { client.makeStreaming(call, "$openAiUrl/chat/completions", body, token) } + + //forwardToProvider(client, stream = isStream) //TODO } post("/embeddings") { val token = call.getToken() val context = call.receive() client.makeRequest(call, "$openAiUrl/embeddings", context, token) - } - } -} -private suspend fun HttpClient.makeRequest( - call: ApplicationCall, - url: String, - body: String, - token: Token -) { - val response = this.request(url) { - headers { - bearerAuth(token.value) + //forwardToProvider(client, stream = isStream) //TODO } - contentType(ContentType.Application.Json) - method = HttpMethod.Post - setBody(body) } - call.response.headers.copyFrom(response.headers) - call.respond(response.status, response.body()) } -private suspend fun HttpClient.makeStreaming( - call: ApplicationCall, - url: String, - body: String, - token: Token -) { - this.preparePost(url) { - headers { - bearerAuth(token.value) - } - contentType(ContentType.Application.Json) - method = HttpMethod.Post - setBody(body) - }.execute { httpResponse -> - call.response.headers.copyFrom(httpResponse.headers) - call.respondOutputStream { - httpResponse - .bodyAsChannel() - .copyTo(this@respondOutputStream) - } - } -} - -private fun ResponseHeaders.copyFrom(headers: Headers) = headers - .entries() - .filter { (key, _) -> !HttpHeaders.isUnsafe(key) } // setting unsafe headers results in exception - .forEach { (key, values) -> - values.forEach { value -> this.appendIfAbsent(key, value) } - } - private fun ApplicationCall.getProvider(): Provider = request.headers["xef-provider"]?.toProvider() ?: Provider.OPENAI diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/RequestForwardingUtil.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/RequestForwardingUtil.kt new file mode 100644 index 000000000..b4a1d9c4b --- /dev/null +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/RequestForwardingUtil.kt @@ -0,0 +1,138 @@ +package com.xebia.functional.xef.server.http.routes.providers + +import com.xebia.functional.xef.server.models.Token +import io.ktor.client.* +import io.ktor.client.call.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.http.content.* +import io.ktor.server.application.* +import io.ktor.server.request.* +import io.ktor.server.response.* +import io.ktor.util.* +import io.ktor.http.URLBuilder +import io.ktor.util.pipeline.* +import io.ktor.utils.io.jvm.javaio.* + +private const val OAI_URL = "https://api.openai.com" + +/** + * Retrieves the path from the incoming request and + * combines it with the provider specific host url. + * As we are copying the API from the providers, + * everything including the path structure has follow the API contract. + */ +private fun buildProviderUrlFromRequest(request: ApplicationRequest) = + URLBuilder().takeFrom(OAI_URL).apply { + encodedPath = request.path() + }.build() + +@Deprecated("") +internal suspend fun HttpClient.makeRequest( + call: ApplicationCall, + url: String, + body: String, + token: Token +) { + val response = this.request(url) { + headers { + bearerAuth(token.value) + } + contentType(ContentType.Application.Json) + method = HttpMethod.Post + setBody(body) + } + call.response.headers.copyFrom(response.headers) + call.respond(response.status, response.body()) +} + +@Deprecated("") +internal suspend fun HttpClient.makeStreaming( + call: ApplicationCall, + url: String, + body: String, + token: Token +) { + this.preparePost(url) { + headers { + bearerAuth(token.value) + } + contentType(ContentType.Application.Json) + method = HttpMethod.Post + setBody(body) + }.execute { httpResponse -> + call.response.headers.copyFrom(httpResponse.headers) + call.respondOutputStream { + httpResponse + .bodyAsChannel() + .copyTo(this@respondOutputStream) + } + } +} + +/** + * Makes a request to the provider forwarding headers and request body + * from the incoming request. + * The provider's response is then forwarded as a response of the server. + * + * Takes in [body] as Bytes and responds in Bytes. + * No messing around with char sets. Just forwarding raw bytes. + * + * @return provider's response + */ +internal suspend fun PipelineContext.forwardToProvider( + client: HttpClient, + stream: Boolean = false, +): HttpResponse = if (stream) { + client + .prepareRequest { prepareRequest(call) } + .execute { res -> call.forwardResponse(res, stream = stream); res } +} else { + val response = client.request { prepareRequest(call) } + call.forwardResponse(response, stream = stream) + response +} + +private suspend fun HttpRequestBuilder.prepareRequest( + call: ApplicationCall, +) { + url(buildProviderUrlFromRequest(call.request)) + method = call.request.httpMethod + headers.copyFrom(call.request.headers) // copy headers + url.parameters.appendAll(call.request.queryParameters) // copy parameters + + val body = call + .receiveChannel() + .toByteArray() + .let { ByteArrayContent(it, contentType = null) } + setBody(body) +} + +private suspend fun ApplicationCall.forwardResponse( + providerResponse: HttpResponse, + stream: Boolean, +) { + response.headers.copyFrom(providerResponse.headers) + + if(stream) { + respondOutputStream { + providerResponse + .bodyAsChannel() + .copyTo(this@respondOutputStream) + } + } else { + respond(providerResponse.status, providerResponse.readBytes()) + } +} + +private fun ResponseHeaders.copyFrom(headers: Headers) = headers + .entries() + .filter { (key, _) -> !HttpHeaders.isUnsafe(key) } // setting unsafe headers results in exception + .forEach { (key, values) -> + values.forEach { value -> this.appendIfAbsent(key, value) } + } + +private fun HeadersBuilder.copyFrom(headers: Headers) = headers + .filter { key, value -> !key.equals("HOST", ignoreCase = true) } + .forEach { key, values -> appendAll(key, values) } diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/Routes.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/Routes.kt new file mode 100644 index 000000000..6fbf837bc --- /dev/null +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/Routes.kt @@ -0,0 +1,16 @@ +package com.xebia.functional.xef.server.http.routes.providers + +import com.xebia.functional.xef.server.http.routes.providers.openai.oaiFiles +import com.xebia.functional.xef.server.http.routes.providers.openai.oaiFineTuning +import io.ktor.client.* +import io.ktor.server.auth.* +import io.ktor.server.routing.* + +fun Routing.oaiRoutes( + client: HttpClient, +) { + authenticate("auth-bearer") { + oaiFineTuning(client) + oaiFiles(client) + } +} diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/openai/FilesRoutes.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/openai/FilesRoutes.kt new file mode 100644 index 000000000..153edd077 --- /dev/null +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/openai/FilesRoutes.kt @@ -0,0 +1,47 @@ +package com.xebia.functional.xef.server.http.routes.providers.openai + +import com.xebia.functional.xef.server.http.routes.providers.forwardToProvider +import io.ktor.client.* +import io.ktor.http.content.* +import io.ktor.server.application.* +import io.ktor.server.request.* +import io.ktor.server.routing.* +import io.ktor.server.util.* + +/** + * https://platform.openai.com/docs/api-reference/files + */ +fun Route.oaiFiles( + client: HttpClient, +) = route("v1/files") { + get { + val response = forwardToProvider(client) + } + + post { + val multipartData = call.receiveMultipart() + val parts = multipartData.readAllParts() + val file = parts.filterIsInstance().find { it.name == "file" } + val purpose = parts.filterIsInstance().find { it.name == "purpose" } + + val response = forwardToProvider(client) + } + + get("{id}") { + val id = call.parameters.getOrFail("id") + + val response = forwardToProvider(client) + } + + delete("{id}") { + val id = call.parameters.getOrFail("id") + + val response = forwardToProvider(client) + } + + get("{id}/content") { + val id = call.parameters.getOrFail("id") + + val response = forwardToProvider(client) + } +} diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/openai/FineTuningRoutes.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/openai/FineTuningRoutes.kt new file mode 100644 index 000000000..be5dfdac1 --- /dev/null +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/providers/openai/FineTuningRoutes.kt @@ -0,0 +1,46 @@ +package com.xebia.functional.xef.server.http.routes.providers.openai + +import com.xebia.functional.xef.server.http.routes.providers.forwardToProvider +import io.ktor.client.* +import io.ktor.server.application.* +import io.ktor.server.request.* +import io.ktor.server.routing.* +import io.ktor.server.util.* +import kotlinx.serialization.json.JsonObject + +/** + * https://platform.openai.com/docs/api-reference/fine-tuning + */ +fun Route.oaiFineTuning( + client: HttpClient, +) = route("v1/fine_tuning") { + get("jobs") { + // has query parameters btw + val response = forwardToProvider(client) + } + + post("jobs") { + val bodyJson = call.receive() + + val response = forwardToProvider(client) + } + + get("jobs/{id}") { + val id = call.parameters.getOrFail("id") + + val response = forwardToProvider(client) + } + + post("jobs/{id}/cancel") { + val id = call.parameters.getOrFail("id") + + val response = forwardToProvider(client) + } + + get("jobs/{id}/events") { + // has query parameters btw + val id = call.parameters.getOrFail("id") + + val response = forwardToProvider(client) + } +} diff --git a/server/src/main/resources/db/migrations/psql/V1__Initial.sql b/server/src/main/resources/db/migrations/psql/V1__Initial.sql index 2b0bc2b78..5d632771d 100644 --- a/server/src/main/resources/db/migrations/psql/V1__Initial.sql +++ b/server/src/main/resources/db/migrations/psql/V1__Initial.sql @@ -53,16 +53,15 @@ CREATE TABLE IF NOT EXISTS users_org( ); CREATE TABLE IF NOT EXISTS xef_tokens( - user_id INT, - project_id INT, + id SERIAL PRIMARY KEY, + user_id INT NOT NULL, + project_id INT NOT NULL, name VARCHAR(20) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW(), token VARCHAR(128) UNIQUE, providers_config JSONB, - PRIMARY KEY (user_id, project_id, name), - CONSTRAINT fk_user_id FOREIGN KEY (user_id) REFERENCES users(id) MATCH SIMPLE diff --git a/server/src/test/kotlin/com/xebia/functional/xef/server/postgresql/XefDatabaseTest.kt b/server/src/test/kotlin/com/xebia/functional/xef/server/postgresql/XefDatabaseTest.kt index 42d929cd7..c15354048 100644 --- a/server/src/test/kotlin/com/xebia/functional/xef/server/postgresql/XefDatabaseTest.kt +++ b/server/src/test/kotlin/com/xebia/functional/xef/server/postgresql/XefDatabaseTest.kt @@ -139,17 +139,17 @@ class XefDatabaseTest { ), gcp = null ) - XefTokensTable.insert { - it[userId] = user.id.value - it[projectId] = project.id.value - it[name] = "testEnv" - it[token] = "testToken" - it[providersConfig] = config + XefTokens.new { + userId = user.id + projectId = project.id + name = "testEnv" + token = "testToken" + providersConfig = config } } transaction { - val tokens = XefTokensTable.selectAll().map { it.toXefTokens() } - assertEquals("testToken", tokens[0].token) + val token = XefTokens.all().first() + assertEquals("testToken", token.token) } }