Skip to content

Commit

Permalink
Merge branch 'master' into griffin/fix-scorers-autofocus
Browse files Browse the repository at this point in the history
  • Loading branch information
gtarpenning committed Dec 23, 2024
2 parents 910cfdc + 1b77d8f commit de17815
Show file tree
Hide file tree
Showing 11 changed files with 622 additions and 21 deletions.
16 changes: 14 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ jobs:
env:
CI: 1
WANDB_ENABLE_TEST_CONTAINER: true
LOGGING_ENABLED: true
ports:
- '8080:8080'
- '8083:8083'
- '9015:9015'
options: --health-cmd "curl --fail http://localhost:8080/healthz || exit 1" --health-interval=5s --health-timeout=3s
options: >-
--health-cmd "wget -q -O /dev/null http://localhost:8080/healthz || exit 1"
--health-interval=5s
--health-timeout=3s
--health-start-period=10s
outputs:
tests_should_run: ${{ steps.test_check.outputs.tests_should_run }}
steps:
Expand Down Expand Up @@ -254,11 +259,16 @@ jobs:
env:
CI: 1
WANDB_ENABLE_TEST_CONTAINER: true
LOGGING_ENABLED: true
ports:
- '8080:8080'
- '8083:8083'
- '9015:9015'
options: --health-cmd "curl --fail http://localhost:8080/healthz || exit 1" --health-interval=5s --health-timeout=3s
options: >-
--health-cmd "wget -q -O /dev/null http://localhost:8080/healthz || exit 1"
--health-interval=5s
--health-timeout=3s
--health-start-period=10s
weave_clickhouse:
image: clickhouse/clickhouse-server
ports:
Expand All @@ -267,6 +277,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Enable debug logging
run: echo "ACTIONS_STEP_DEBUG=true" >> $GITHUB_ENV
- name: Set up Python ${{ matrix.python-version-major }}.${{ matrix.python-version-minor }}
uses: actions/setup-python@v5
with:
Expand Down
4 changes: 2 additions & 2 deletions sdks/node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "weave",
"version": "0.7.3",
"version": "0.7.4",
"description": "AI development toolkit",
"types": "dist/index.d.ts",
"main": "dist/index.js",
Expand Down
22 changes: 21 additions & 1 deletion sdks/node/src/__tests__/weaveClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ describe('WeaveClient', () => {
beforeEach(() => {
mockTraceServerApi = {
call: {
callStartBatchCallUpsertBatchPost: jest.fn(),
callStartBatchCallUpsertBatchPost: jest.fn().mockResolvedValue({}),
},
} as any;
mockWandbServerApi = {} as any;
Expand All @@ -121,6 +121,26 @@ describe('WeaveClient', () => {
(client as any).BATCH_INTERVAL = 10;
});

it('should handle oversized batch items', async () => {
  // Queue one small item followed by one whose payload alone is 11 MiB of
  // characters.
  const oversizedPayloadChars = 11 * 1024 * 1024;
  const withinLimit = {mode: 'start', data: {id: '2', payload: 'small'}};
  const overLimit = {
    mode: 'start',
    data: {id: '1', payload: 'x'.repeat(oversizedPayloadChars)},
  };
  const internals = client as any;
  internals.callQueue.push(withinLimit, overLimit);

  await internals.processBatch();

  // Only the small item is expected in the request that was sent...
  expect(
    mockTraceServerApi.call.callStartBatchCallUpsertBatchPost
  ).toHaveBeenCalledWith({
    batch: [{mode: 'start', req: withinLimit.data}],
  });

  // ...while the oversized item is expected to still be in the queue.
  expect(internals.callQueue).toContain(overLimit);
});

it('should batch multiple calls together', async () => {
// Add test calls to queue
(client as any).callQueue.push(
Expand Down
56 changes: 48 additions & 8 deletions sdks/node/src/weaveClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {AsyncLocalStorage} from 'async_hooks';
import * as fs from 'fs';
import {uuidv7} from 'uuidv7';

import {Dataset} from './dataset';
Expand Down Expand Up @@ -32,6 +33,8 @@ import {packageVersion} from './utils/userAgent';
import {WandbServerApi} from './wandb/wandbServerApi';
import {ObjectRef, WeaveObject, getClassChain} from './weaveObject';

// Name of the log file that batch-processing errors and oversized-item
// notices are appended to with fs.appendFileSync. The path is relative, so
// Node resolves it against the process's current working directory.
const WEAVE_ERRORS_LOG_FNAME = 'weaveErrors.log';

export type CallStackEntry = {
callId: string;
traceId: string;
Expand Down Expand Up @@ -71,13 +74,18 @@ class CallStack {
type CallStartParams = StartedCallSchemaForInsert;
type CallEndParams = EndedCallSchemaForInsert;

// We count characters item by item, and try to limit batches to about this size.
// An item's size is the length of its JSON.stringify output, so the limit is
// measured in string characters rather than bytes.
const MAX_BATCH_SIZE_CHARS = 10 * 1024 * 1024;

export class WeaveClient {
private stackContext = new AsyncLocalStorage<CallStack>();
private callQueue: Array<{mode: 'start' | 'end'; data: any}> = [];
private batchProcessTimeout: NodeJS.Timeout | null = null;
private isBatchProcessing: boolean = false;
private batchProcessingPromises: Set<Promise<void>> = new Set();
private readonly BATCH_INTERVAL: number = 200;
private errorCount = 0;
private readonly MAX_ERRORS = 10;

constructor(
public traceServerApi: TraceServerApi<any>,
Expand Down Expand Up @@ -114,25 +122,43 @@ export class WeaveClient {

this.isBatchProcessing = true;

// We count characters item by item, and try to limit batches to about
// this size.
const maxBatchSizeChars = 5 * 1024 * 1024;

let batchToProcess = [];
let currentBatchSize = 0;

while (this.callQueue.length > 0 && currentBatchSize < maxBatchSizeChars) {
const item = this.callQueue[0];
while (
this.callQueue.length > 0 &&
currentBatchSize < MAX_BATCH_SIZE_CHARS
) {
const item = this.callQueue.shift();
if (item === undefined) {
throw new Error('Call queue is empty');
}

const itemSize = JSON.stringify(item).length;
if (itemSize > MAX_BATCH_SIZE_CHARS) {
fs.appendFileSync(
WEAVE_ERRORS_LOG_FNAME,
`Item size ${itemSize} exceeds max batch size ${MAX_BATCH_SIZE_CHARS}. Item: ${JSON.stringify(item)}\n`
);
}

if (currentBatchSize + itemSize <= maxBatchSizeChars) {
batchToProcess.push(this.callQueue.shift()!);
if (currentBatchSize + itemSize <= MAX_BATCH_SIZE_CHARS) {
batchToProcess.push(item);
currentBatchSize += itemSize;
} else {
// doesn't fit, put it back
this.callQueue.unshift(item);
break;
}
}

if (batchToProcess.length === 0) {
this.batchProcessTimeout = null;
return;
}

this.isBatchProcessing = true;

const batchReq = {
batch: batchToProcess.map(item => ({
mode: item.mode,
Expand All @@ -146,8 +172,20 @@ export class WeaveClient {
);
} catch (error) {
console.error('Error processing batch:', error);
this.errorCount++;
fs.appendFileSync(
WEAVE_ERRORS_LOG_FNAME,
`Error processing batch: ${error}\n`
);

// Put failed items back at the front of the queue
this.callQueue.unshift(...batchToProcess);

// Exit if we have too many errors
if (this.errorCount > this.MAX_ERRORS) {
console.error(`Exceeded max errors: ${this.MAX_ERRORS}; exiting`);
process.exit(1);
}
} finally {
this.isBatchProcessing = false;
this.batchProcessTimeout = null;
Expand Down Expand Up @@ -734,7 +772,9 @@ function mergeSummaries(left: Summary, right: Summary): Summary {
if (typeof leftValue === 'number' && typeof result[key] === 'number') {
result[key] = leftValue + result[key];
} else if (
leftValue != null &&
typeof leftValue === 'object' &&
result[key] != null &&
typeof result[key] === 'object'
) {
result[key] = mergeSummaries(leftValue, result[key]);
Expand Down
21 changes: 21 additions & 0 deletions wb_schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,24 @@ type UpdateUserPayload {
clientMutationId: String
}

# Input for the insertSecret mutation: the entity the secret belongs to, the
# secret's name and the secret's value.
input InsertSecretInput {
entityName: String!
# The @constraints directive restricts the name to `max: 255` (presumably a
# maximum length -- confirm against the directive's definition) and to the
# pattern below: a letter or underscore followed by letters, digits or
# underscores.
secretName: String!
@constraints(max: 255, pattern: "^[A-Za-z_][A-Za-z0-9_]*$")
secretValue: String!
clientMutationId: String
}

# Result of the insertSecret mutation. It carries only a success flag and the
# optional clientMutationId; no secret data is returned.
type InsertSecretPayload {
success: Boolean!
clientMutationId: String
}

type Mutation {
updateUser(input: UpdateUserInput!): UpdateUserPayload @audit
deleteView(input: DeleteViewInput!): DeleteViewPayload
upsertView(input: UpsertViewInput!): UpsertViewPayload @audit
insertSecret(input: InsertSecretInput!): InsertSecretPayload
updateArtifactSequence(
input: UpdateArtifactSequenceInput!
): UpdateArtifactCollectionPayload
Expand Down Expand Up @@ -275,6 +289,12 @@ type RowType {
row: JSON!
}

# Metadata describing a stored secret. Only the owning entity's id, the
# secret's name and its creation time are exposed; the type has no field for
# the secret's value.
type Secret {
entityId: Int!
name: String!
createdAt: DateTime!
}

type Entity implements Node {
id: ID!
name: String!
Expand All @@ -296,6 +316,7 @@ type Entity implements Node {
filters: JSONString
collectionTypes: [ArtifactCollectionType!]
): ArtifactCollectionConnection
secrets: [Secret!]!
}

type EntityConnection {
Expand Down
121 changes: 121 additions & 0 deletions weave-js/src/common/hooks/useSecrets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
 * This is a GraphQL approach to querying and inserting an entity's secrets.
 * For comparison, there is a query engine based approach to querying viewer
 * information in useViewerUserInfo.ts.
 */

import {
gql,
TypedDocumentNode,
useApolloClient,
useMutation,
} from '@apollo/client';
import {useEffect, useState} from 'react';

// Query that looks up an entity by name and selects its id plus, for each of
// its secrets, the entityId, name and createdAt fields. No secret value is
// selected.
const SECRETS_QUERY = gql`
query secrets($entityName: String!) {
entity(name: $entityName) {
id
secrets {
entityId
name
createdAt
}
}
}
`;

// Mutation that passes an entity name, secret name and secret value to
// insertSecret and selects only the `success` flag of the payload. The cast
// attaches the response and variable types to the document so callers get
// typed results.
const SECRETS_MUTATION = gql`
mutation insertSecret(
$entityName: String!
$secretName: String!
$secretValue: String!
) {
insertSecret(
input: {
entityName: $entityName
secretName: $secretName
secretValue: $secretValue
}
) {
success
}
}
` as TypedDocumentNode<InsertSecretResponse, InsertSecretVariables>;

/** Fields present on every `useSecrets` result, whatever its loading state. */
type SecretResponseFields = {
  entityId: string;
  secrets: string[];
};

/** Result shape while `loading` is `true`. */
type SecretResponseLoading = SecretResponseFields & {loading: true};

/** Result shape once `loading` is `false`. */
type SecretResponseSuccess = SecretResponseFields & {loading: false};

/** Value returned by `useSecrets`, discriminated by the `loading` flag. */
type SecretResponse = SecretResponseLoading | SecretResponseSuccess;

/**
 * React hook that loads the names of the secrets stored for an entity.
 *
 * Runs `SECRETS_QUERY` through the Apollo client on mount and again whenever
 * `entityName` or the client changes.
 *
 * @param entityName - Name of the entity whose secrets should be listed.
 * @returns While a request is in flight: `loading: true` with an empty
 *   `entityId` and no secrets. Afterwards: `loading: false` with the entity's
 *   id and its secret names sorted with the default `Array.prototype.sort`
 *   order. If the entity is not found or the query fails, `entityId` is `''`
 *   and `secrets` is empty.
 */
export const useSecrets = ({
  entityName,
}: {
  entityName: string;
}): SecretResponse => {
  const [response, setResponse] = useState<SecretResponse>({
    loading: true,
    entityId: '',
    secrets: [],
  });

  const apolloClient = useApolloClient();

  useEffect(() => {
    let mounted = true;

    // When the inputs change, stop exposing the previous entity's secrets as
    // if they were loaded for the new one. Returning the same state object
    // when we are already loading lets React skip the extra render.
    setResponse(prev =>
      prev.loading ? prev : {loading: true, entityId: '', secrets: []}
    );

    apolloClient
      .query({query: SECRETS_QUERY as any, variables: {entityName}})
      .then(result => {
        // Ignore results that arrive after unmount or after the inputs changed.
        if (!mounted) {
          return;
        }
        // `data` or `entity` may be absent (e.g. unknown entity); fall back to
        // an empty result instead of throwing.
        const entity = result.data?.entity;
        const secretPayloads: Array<{name: string}> = entity?.secrets ?? [];
        setResponse({
          loading: false,
          entityId: entity?.id ?? '',
          secrets: secretPayloads.map(secret => secret.name).sort(),
        });
      })
      .catch((error: unknown) => {
        // Without this handler a failed query would leave the hook stuck in
        // the loading state and surface as an unhandled promise rejection.
        if (!mounted) {
          return;
        }
        console.error('Error loading secrets:', error);
        setResponse({loading: false, entityId: '', secrets: []});
      });

    return () => {
      mounted = false;
    };
  }, [apolloClient, entityName]);

  return response;
};

/** Shape of the data returned by the `insertSecret` mutation document. */
interface InsertSecretResponse {
insertSecret: {
// Success flag selected from the mutation payload.
success: boolean;
};
}

/** Variables passed to the `insertSecret` mutation document. */
type InsertSecretVariables = {
entityName: string;
secretName: string;
secretValue: string;
};

/**
 * React hook that exposes the `insertSecret` mutation.
 *
 * @returns The mutate function produced by `useMutation` for
 *   `SECRETS_MUTATION`; the rest of the hook's result tuple is discarded.
 */
export const useInsertSecret = () => {
  const mutationTuple = useMutation<
    InsertSecretResponse,
    InsertSecretVariables
  >(SECRETS_MUTATION);

  // Index 0 of the tuple is the mutate function.
  return mutationTuple[0];
};
Loading

0 comments on commit de17815

Please sign in to comment.