Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-write fine tuning to also include openPipe #2091

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions helicone-node/api/generatedTypes/public.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,6 @@ Json: JsonObject;
status?: components["schemas"]["Partial_NumberOperators_"];
model?: components["schemas"]["Partial_TextOperators_"];
};
/** @description Make all properties in T optional */
Partial_VectorOperators_: {
contains?: string;
};
/** @description Make all properties in T optional */
Partial_RequestResponseSearchToOperators_: {
request_body_vector?: components["schemas"]["Partial_VectorOperators_"];
response_body_vector?: components["schemas"]["Partial_VectorOperators_"];
};
/** @description From T, pick a set of properties whose keys are in the union K */
"Pick_FilterLeaf.feedback-or-request-or-response-or-properties-or-values-or-request_response_search_": {
feedback?: components["schemas"]["Partial_FeedbackTableToOperators_"];
Expand All @@ -313,7 +304,7 @@ Json: JsonObject;
values?: {
[key: string]: components["schemas"]["Partial_TextOperators_"];
};
request_response_search?: components["schemas"]["Partial_RequestResponseSearchToOperators_"];
request_response_search: unknown;
};
"FilterLeafSubset_feedback-or-request-or-response-or-properties-or-values-or-request_response_search_": components["schemas"]["Pick_FilterLeaf.feedback-or-request-or-response-or-properties-or-values-or-request_response_search_"];
RequestFilterNode: components["schemas"]["FilterLeafSubset_feedback-or-request-or-response-or-properties-or-values-or-request_response_search_"] | components["schemas"]["RequestFilterBranch"] | "all";
Expand Down
2 changes: 1 addition & 1 deletion valhalla/jawn/src/controllers/private/datasetController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
import { supabaseServer } from "../../lib/routers/withAuth";
import { FilterNode } from "../../lib/shared/filters/filterDefs";
import { getRequests } from "../../lib/stores/request/request";
import { FineTuningManager } from "../../managers/FineTuningManager";
import { FineTuningManager } from "../../managers/finetuning/FineTuningManager";
import { JawnAuthenticatedRequest } from "../../types/request";
import { postHogClient } from "../../lib/clients/postHogClient";

Expand Down
183 changes: 80 additions & 103 deletions valhalla/jawn/src/controllers/private/fineTuneController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,76 @@ import {
} from "tsoa";
import { postHogClient } from "../../lib/clients/postHogClient";
import { supabaseServer } from "../../lib/routers/withAuth";
import { getRequests } from "../../lib/stores/request/request";
import { FineTuningManager } from "../../managers/FineTuningManager";
import { getRequests, fetchBodies } from "../../lib/stores/request/request";
import { FineTuningManager } from "../../managers/finetuning/FineTuningManager";
import { JawnAuthenticatedRequest } from "../../types/request";

// src/users/usersController.ts
import { hasAccessToFineTune } from "./datasetController";
import {
NewFineTuningManager,
IFineTuningJob,
IFineTuningJobEvent,
} from "../../managers/finetuning/NewFineTuningManager";
import { Result, err, ok } from "../../lib/shared/result";
import { DatasetManager } from "../../managers/dataset/DatasetManager";

/** Request body for creating a fine-tune job. */
export interface FineTuneBody {
  /** Id of the stored provider key row used to look up the decrypted provider key. */
  providerKeyId: string;
  /**
   * Dataset to fine-tune on. When omitted, prepareFineTuneRequest builds an
   * "Automated Dataset" from the org's most recent requests.
   */
  datasetId?: string;
}

/**
 * Resolves everything needed to start a fine-tune job:
 * - a dataset id (creating an "Automated Dataset" from the org's 1000 most
 *   recent requests when the caller did not supply one), and
 * - a fine-tuning manager authenticated with the org's decrypted provider key.
 *
 * @param request    fine-tune request body (provider key id + optional dataset id)
 * @param authParams authenticated organization context of the caller
 * @returns ok({ fineTuneManager, datasetId }) on success, err(message) on any failure
 */
async function prepareFineTuneRequest(
  request: FineTuneBody,
  authParams: JawnAuthenticatedRequest["authParams"]
): Promise<
  Result<{ fineTuneManager: NewFineTuningManager; datasetId: string }, string>
> {
  const datasetManager = new DatasetManager(authParams);
  let datasetId = request.datasetId;

  if (!datasetId) {
    // No dataset supplied: build one automatically from recent requests.
    const { data: requests } = await getRequests(
      authParams.organizationId,
      "all",
      0,
      1000,
      {}
    );
    if (!requests || requests.length === 0) {
      return err("No requests found");
    }
    const newDataset = await datasetManager.addDataset({
      requestIds: requests.map((r) => r.request_id),
      datasetName: "Automated Dataset",
    });
    if (newDataset.error || !newDataset.data) {
      return err(newDataset.error || "Failed to create dataset");
    }
    datasetId = newDataset.data;
  }

  const { data: key, error: keyError } = await supabaseServer.client
    .from("decrypted_provider_keys")
    .select("decrypted_provider_key")
    .eq("id", request.providerKeyId)
    .eq("org_id", authParams.organizationId)
    .single();
  if (keyError || !key || !key.decrypted_provider_key) {
    // err() for consistency with every other failure path in this function.
    return err("No Provider Key found");
  }

  // BUG FIX: the original returned ok(new FineTuningManager(request.providerKeyId)),
  // which neither matched the declared Result payload ({ fineTuneManager, datasetId })
  // nor used the decrypted key it just fetched. Construct the declared
  // NewFineTuningManager with the decrypted provider key instead.
  // NOTE(review): assumes NewFineTuningManager's constructor takes the decrypted
  // key string, mirroring the old FineTuningManager usage — confirm its signature.
  return ok({
    fineTuneManager: new NewFineTuningManager(key.decrypted_provider_key),
    datasetId,
  });
}

/** Response payload returned after a fine-tune job has been created. */
export interface NewFineTuneJob {
  // Identifier of the newly created fine-tune job.
  fineTuneJobId: string;
}

/** Aggregated status for an existing fine-tune job. */
interface FineTuneJobStats {
  // The fine-tuning job record (shape defined in NewFineTuningManager).
  job: IFineTuningJob;
  // Events emitted for this job (shape defined in NewFineTuningManager).
  events: IFineTuningJobEvent[];
}

@Route("/v1/fine-tune")
Expand All @@ -33,108 +94,32 @@ export class FineTuneMainController extends Controller {
@Body()
body: FineTuneBody,
@Request() request: JawnAuthenticatedRequest
): Promise<
| {
error: string;
}
| {
success: boolean;
data: {
fineTuneJob: string;
url: string;
};
}
> {
if (!(await hasAccessToFineTune(request.authParams.organizationId))) {
this.setStatus(403);
return {
error: "You do not have access to fine tune",
};
}
const filter = "all";
const { providerKeyId } = body;
const metrics = await getRequests(
request.authParams.organizationId,
filter,
0,
1000,
{}
);
if (metrics.error || !metrics.data || metrics.data.length === 0) {
this.setStatus(500);
return {
error: "No requests found",
};
}
const { data: key, error: keyError } = await supabaseServer.client
.from("decrypted_provider_keys")
.select("decrypted_provider_key")
.eq("id", providerKeyId)
.eq("org_id", request.authParams.organizationId)
.single();
if (keyError || !key || !key.decrypted_provider_key) {
): Promise<Result<NewFineTuneJob, string>> {
const { data: fineTuningObjects, error: requestBuildingError } =
await prepareFineTuneRequest(datasetManager, body, request.authParams);

if (requestBuildingError || !fineTuningObjects) {
this.setStatus(500);
return {
error: "No Provider Key found",
};
return err(requestBuildingError);
}
const fineTuningManager = new FineTuningManager(key.decrypted_provider_key);

const { fineTuneManager, datasetId } = fineTuningObjects;

try {
const fineTuneJob = await fineTuningManager.createFineTuneJob(
metrics.data,
"model",
"suffix"
);
if (fineTuneJob.error || !fineTuneJob.data) {
this.setStatus(500);
return {
error: "Failed to create fine tune job",
};
}
const url = `https://platform.openai.com/finetune/${fineTuneJob.data.id}?filter=all`;
Sentry.captureMessage(
`fine-tune job created - ${fineTuneJob.data.id} - ${request.authParams.organizationId}`
);
postHogClient?.capture({
distinctId: `${fineTuneJob.data.id}-${request.authParams.organizationId}`,
event: "fine_tune_job",
properties: {
id: fineTuneJob.data.id,
success: true,
org_id: request.authParams.organizationId,
},
const fineTuneJob = await fineTuneManager.createFineTuneJob(datasetId, {
_type: "OpenAIFineTuneJobOptions",
model: "gpt-3.5",
});
const dataset = await supabaseServer.client
.from("finetune_dataset")
.insert({
name: `Automated Dataset for ${fineTuneJob.data.id}`,
filters: JSON.stringify([]),
organization_id: request.authParams.organizationId,
})
.select("*")
.single();
if (dataset.error || !dataset.data) {

if (fineTuneJob.error || !fineTuneJob.data) {
this.setStatus(500);
return {
error: dataset.error.message,
};
return err("Failed to create fine tune job");
}
const fineTunedJobId = await supabaseServer.client
.from("finetune_job")
.insert({
dataset_id: dataset.data.id,
finetune_job_id: fineTuneJob.data.id,
provider_key_id: providerKeyId,
status: "created",
organization_id: request.authParams.organizationId,
})
.select("*")
.single();

return {
success: true,
data: {
fineTuneJob: fineTunedJobId.data?.id ?? "",
url: url,
},
};
} catch (e) {
Expand All @@ -159,15 +144,7 @@ export class FineTuneMainController extends Controller {
public async fineTuneJobStats(
@Path() jobId: string,
@Request() request: JawnAuthenticatedRequest
): Promise<
| {
error: string;
}
| {
job: any;
events: any;
}
> {
): Promise<Result<FineTuneJobStats, string>> {
const { data: fineTuneJob, error: fineTuneJobError } =
await supabaseServer.client
.from("finetune_job")
Expand Down
8 changes: 8 additions & 0 deletions valhalla/jawn/src/lib/shared/filters/filterDefs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ type ExperimentHypothesisRunToOperator = {
export type ExperimentHypothesisRunScoreValue =
SingleKey<ExperimentHypothesisRunToOperator>;

// Operators available when filtering on the experiment_dataset_v2_row table.
export type ExperimentDatasetV2RowToOperators = {
  // Match rows by their parent dataset id (text comparison operators).
  dataset_id: SingleKey<TextOperators>;
};

// Filter leaf for experiment_dataset_v2_row: exactly one operator key at a time.
export type FilterLeafExperimentDatasetV2Row =
  SingleKey<ExperimentDatasetV2RowToOperators>;

export type TablesAndViews = {
user_metrics: FilterLeafUserMetrics;
user_api_keys: FilterLeafUserApiKeys;
Expand All @@ -311,6 +318,7 @@ export type TablesAndViews = {
experiment: FilterLeafExperiment;
experiment_hypothesis_run: ExperimentHypothesisRunScoreValue;
score_value: FilterLeafScoreValue;
experiment_dataset_v2_row: FilterLeafExperimentDatasetV2Row;
request_response_search: FilterLeafRequestResponseSearch;

// CLICKHOUSE TABLES
Expand Down
4 changes: 4 additions & 0 deletions valhalla/jawn/src/lib/shared/filters/filters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ const whereKeyMappings: KeyMappings = {
experiment_hypothesis_run: easyKeyMappings<"experiment_hypothesis_run">({
result_request_id: "experiment_v2_hypothesis_run.result_request_id",
}),
experiment_dataset_v2_row: easyKeyMappings<"experiment_dataset_v2_row">({
dataset_id: "experiment_dataset_v2_row.dataset_id",
}),

// Deprecated
values: NOT_IMPLEMENTED,
Expand Down Expand Up @@ -305,6 +308,7 @@ const havingKeyMappings: KeyMappings = {
prompt_v2: NOT_IMPLEMENTED,
prompts_versions: NOT_IMPLEMENTED,
experiment: NOT_IMPLEMENTED,
experiment_dataset_v2_row: NOT_IMPLEMENTED,

// Deprecated
values: NOT_IMPLEMENTED,
Expand Down
25 changes: 25 additions & 0 deletions valhalla/jawn/src/lib/stores/request/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,38 @@ function addJoinQueries(joinQuery: string, filter: FilterNode): string {
left join score_value ON request.id = score_value.request_id`;
}

if (JSON.stringify(filter).includes("experiment_dataset_v2_row")) {
joinQuery += `
left join prompt_input_record on request.id = prompt_input_record.source_request
left join experiment_dataset_v2_row on prompt_input_record.id = experiment_dataset_v2_row.input_record`;
}

if (JSON.stringify(filter).includes("request_response_search")) {
joinQuery += `
left join request_response_search on request.id = request_response_search.request_id`;
}
return joinQuery;
}

/**
 * Hydrates each request with its full request/response bodies by downloading
 * them from the request's signed body URL. All downloads run in parallel.
 *
 * @param requests requests whose signed_body_url points at the stored bodies
 * @returns the same requests with request_body and response_body populated
 * @throws Error when any request is missing a signed_body_url
 */
export async function fetchBodies(
  requests: HeliconeRequest[]
): Promise<HeliconeRequest[]> {
  return Promise.all(
    requests.map(async (req) => {
      if (!req.signed_body_url) {
        throw new Error("No signed body url");
      }
      const response = await fetch(req.signed_body_url);
      const payload = await response.json();
      return {
        ...req,
        request_body: payload.request_body,
        response_body: payload.response_body,
      };
    })
  );
}

export async function getRequests(
orgId: string,
filter: FilterNode,
Expand Down
27 changes: 27 additions & 0 deletions valhalla/jawn/src/managers/dataset/DatasetManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,25 @@ export class DatasetManager extends BaseManager {
return result;
}

  /**
   * Guarantees a prompt_input_record row exists for every given request id,
   * inserting empty-input records where missing so dataset rows can link to them.
   *
   * @param requestIds request ids that must have a prompt_input_record
   * @returns ok(null) on success, err(message) if the insert fails
   */
  private async ensureInputRecordsExist(
    requestIds: string[]
  ): Promise<Result<null, string>> {
    // ON CONFLICT DO NOTHING makes this idempotent: requests that already
    // have an input record are left untouched.
    const res = await dbExecute(
      `
      INSERT INTO prompt_input_record (source_request, inputs, prompt_version)
      SELECT id, '{}'::jsonb, null
      FROM request
      WHERE id = ANY($1)
      ON CONFLICT DO NOTHING
      `,
      [requestIds]
    );
    if (res.error) {
      return err(res.error);
    }
    return ok(null);
  }

async addDataset(params: NewDatasetParams): Promise<Result<string, string>> {
const dataset = await supabaseServer.client
.from("experiment_dataset_v2")
Expand All @@ -65,6 +84,14 @@ export class DatasetManager extends BaseManager {
return err(dataset.error.message);
}

const ensureInputRecords = await this.ensureInputRecordsExist(
params.requestIds
);

if (ensureInputRecords.error) {
return err(ensureInputRecords.error);
}

const res = await dbExecute(
`
INSERT INTO experiment_dataset_v2_row (dataset_id, input_record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import {
FineTuningJob,
FineTuningJobEventsPage,
} from "openai/resources/fine-tuning/jobs";
import { OpenAIClient } from "../lib/clients/OpenAIClient";
import { HeliconeRequest } from "../lib/stores/request/request";
import { Result, err, ok } from "../lib/shared/result";
import { OpenAIClient } from "../../lib/clients/OpenAIClient";
import { HeliconeRequest } from "../../lib/stores/request/request";
import { Result, err, ok } from "../../lib/shared/result";
import crypto from "crypto";
import fs from "fs";
import { chatCompletionMessage } from "./types";
import { chatCompletionMessage } from "../types";
import { ChatCompletionMessageParam } from "openai/resources";
import { ChatCompletionCreateParamsBase } from "openai/resources/chat/completions";
import OpenAI from "openai";
Expand Down
Loading
Loading