Skip to content

Commit

Permalink
Batch Query - node health validation by block height (#128)
Browse files Browse the repository at this point in the history
* test: mocks troubleshooting

* feat: got something working

* test: batch query tests include node health check

* chore: resolve type issues related to block height batch query

* feat: clean up types and add to docs

* docs: notes on callback function

* chore: missed a comma

* feat: batch query height implementation

* feat: node health validation on stkd scrt query

* feat: remove unneeded check due to typescript requiring property

* chore: add changeset

* refactor: change to min block height validation options name

* docs: update docs site for name change

* docs: missed save on one line for last commit

* feat: callback fires only once, remove extra first
  • Loading branch information
AustinWoetzel authored Apr 18, 2024
1 parent 4eeb514 commit 555f33e
Show file tree
Hide file tree
Showing 33 changed files with 307 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/rare-jobs-enjoy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@shadeprotocol/shadejs": minor
---

add node health validation to query router and functions using the query router
2 changes: 1 addition & 1 deletion docs/contracts.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This page contains a list of deployed contracts.
|-------------------- |----------------------------------------------- |------------------------------------------------------------------ |
| ShadeSwap Factory | secret1ja0hcwvy76grqkpgwznxukgd7t8a8anmmx05pp | 2ad4ed2a4a45fd6de3daca9541ba82c26bb66c76d1c3540de39b509abd26538e |
| ShadeSwap Router | secret1pjhdug87nxzv0esxasmeyfsucaj98pw4334wyc | 448e3f6d801e453e838b7a5fbaa4dd93b84d0f1011245f0d5745366dadaf3e85 |
| Batch Query Router | secret17gnlxnwux0szd7qhl90ym8lw22qvedjz4v09dm | 72a09535b77b76862f7b568baf1ddbe158a2e4bbd0f0879c69ada9b398e31c1f |
| Batch Query Router | secret15mkmad8ac036v4nrpcc7nk8wyr578egt077syt | 1c7e86ba4fdb6760e70bf08a7df7f44b53eb0b23290e3e69ca96140810d4f432 |
| Oracle | secret10n2xl5jmez6r9umtdrth78k0vwmce0l5m9f5dm | 32c4710842b97a526c243a68511b15f58d6e72a388af38a7221ff3244c754e91 |
| stkd-scrt | secret1k6u0cy4feepm6pehnz804zmwakuwdapm69tuc4 | f6be719b3c6feb498d3554ca0398eb6b7e7db262acb33f84a8f12106da6bbb09 |
| Shade Staking | secret1y6px5x7jzrk8hyvy67f06ytn8v0jwculypwxws | 2a1ae7fd2be82931cb11d0ce82b2e243507f2006074e2f316da661beb1abe3c3 |
Expand Down
22 changes: 21 additions & 1 deletion docs/queries/batch-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,38 @@ type BatchQueryParams = {
queryMsg: any,
}

// MinBlockHeightValidationOptions is an optional property that is used to validate the
// accuracy of the data in the batch response using an expected minimum
// block height associated with the data. The query will be retried until
// non-stale data is found (up to a max number of retries before error is thrown).
// The assumption is that you are working with a node cluster where one or more
// stale nodes are mixed into healthy nodes, and eventually the query will be
// tried with a healthy node and meet the minimum block height threshold.
// onStaleNodeDetected is a callback function for when stale nodes are found. This
// can be useful for error/node monitoring services.
type MinBlockHeightValidationOptions = {
minBlockHeight: number, // data must come from this block height or newer block
maxRetries: number,
onStaleNodeDetected?: () => void
}


async function batchQuery({
contractAddress,
codeHash,
lcdEndpoint,
chainId,
queries,
batchSize, // defaults to all queries in single batch
minBlockHeightValidationOptions,
}:{
contractAddress: string,
codeHash?: string,
lcdEndpoint?: string,
chainId?: string,
queries: BatchQueryParams[],
batchSize?: number,
minBlockHeightValidationOptions?: MinBlockHeightValidationOptions,
}): Promise<BatchQueryParsedResponse>
```

Expand All @@ -52,7 +70,8 @@ enum BatchItemResponseStatus {
type BatchQueryParsedResponseItem = {
id: string | number,
response: any,
status?: BatchItemResponseStatus
status?: BatchItemResponseStatus,
blockHeight: number, // the block height that the data is from
}
Expand Down Expand Up @@ -121,5 +140,6 @@ console.log(output)
},
},
},
blockHeight: 123456789,
}];
```
90 changes: 90 additions & 0 deletions src/contracts/services/batchQuery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ test('it can call the single batch query service', async () => {
queries: ['BATCH_QUERY' as unknown as BatchQueryParams],
client: 'SECRET_CLIENT' as unknown as SecretNetworkClient,
};

// observables function
sendSecretClientContractQuery$.mockReturnValueOnce(of(batchPairConfigResponse));

Expand All @@ -80,11 +81,100 @@ test('it can call the single batch query service', async () => {

// async/await function
sendSecretClientContractQuery$.mockReturnValueOnce(of(batchPairConfigResponse));

const response = await batchQuery(input);
expect(msgBatchQuery).toHaveBeenNthCalledWith(2, input.queries);
expect(response).toStrictEqual(batchPairConfigParsed);
});

test('it can call the single batch query service and retry on stale node found', async () => {
const input = {
contractAddress: 'CONTRACT_ADDRESS',
codeHash: 'CODE_HASH',
queries: ['BATCH_QUERY' as unknown as BatchQueryParams],
client: 'SECRET_CLIENT' as unknown as SecretNetworkClient,
minBlockHeightValidationOptions: {
minBlockHeight: 3,
maxRetries: 3,
},
};

const batchPairResponse1 = {
batch: {
...batchPairConfigResponse.batch,
block_height: 2, // simulate stale node
},
};

// observables function
sendSecretClientContractQuery$.mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairConfigResponse),
);

let output;
batchQuerySingleBatch$(input).subscribe({
next: (response) => {
output = response;
},
});

expect(msgBatchQuery).toHaveBeenNthCalledWith(1, input.queries);
expect(output).toStrictEqual(batchPairConfigParsed);

// async/await function
sendSecretClientContractQuery$.mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairConfigResponse),
);
const response = await batchQuery(input);
expect(msgBatchQuery).toHaveBeenNthCalledWith(2, input.queries);
expect(response).toStrictEqual(batchPairConfigParsed);
});

test('it can call the single batch query service and detect query retry limit exceeded', async () => {
const input = {
contractAddress: 'CONTRACT_ADDRESS',
codeHash: 'CODE_HASH',
queries: ['BATCH_QUERY' as unknown as BatchQueryParams],
client: 'SECRET_CLIENT' as unknown as SecretNetworkClient,
minBlockHeightValidationOptions: {
minBlockHeight: 3,
maxRetries: 2,
},
};

const batchPairResponse1 = {
batch: {
...batchPairConfigResponse.batch,
block_height: 2, // simulate stale node
},
};

// async/await function
sendSecretClientContractQuery$.mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairResponse1),
).mockReturnValueOnce(
of(batchPairConfigResponse), // will never reach final case due to retry limit
);

await expect(() => batchQuery(input)).rejects.toThrowError('Reached maximum retry attempts for stale node error.');
});

test('it can call the multi-batch query service on a single batch', async () => {
const input = {
contractAddress: 'CONTRACT_ADDRESS',
Expand Down
78 changes: 61 additions & 17 deletions src/contracts/services/batchQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
concatAll,
reduce,
catchError,
of,
throwError,
} from 'rxjs';
import { sendSecretClientContractQuery$ } from '~/client/services/clientServices';
import { getActiveQueryClient$ } from '~/client';
Expand All @@ -20,6 +22,7 @@ import {
import { BatchQueryResponse } from '~/types/contracts/batchQuery/response';
import { decodeB64ToJson } from '~/lib/utils';
import { SecretNetworkClient } from 'secretjs';
import { MinBlockHeightValidationOptions } from '~/types/contracts/batchQuery/service';

/**
* a parses the batch query response into a usable data model
Expand All @@ -33,6 +36,7 @@ function parseBatchQuery(response: BatchQueryResponse): BatchQueryParsedResponse
id: decodeB64ToJson(item.id),
response: item.response.system_err, // response is not B64 encoded
status: BatchItemResponseStatus.ERROR,
blockHeight: response.batch.block_height,
};
}

Expand All @@ -43,6 +47,7 @@ function parseBatchQuery(response: BatchQueryResponse): BatchQueryParsedResponse
// a response available.
response: decodeB64ToJson(item.response.response!),
status: BatchItemResponseStatus.SUCCESS,
blockHeight: response.batch.block_height,
};
});
}
Expand All @@ -64,27 +69,60 @@ const batchQuerySingleBatch$ = ({
codeHash,
queries,
client,
minBlockHeightValidationOptions,
}:{
contractAddress: string,
codeHash?: string,
queries: BatchQueryParams[],
client: SecretNetworkClient
}) => sendSecretClientContractQuery$({
queryMsg: msgBatchQuery(queries),
client,
contractAddress,
codeHash,
}).pipe(
map((response) => parseBatchQuery(response as BatchQueryResponse)),
first(),
catchError((err) => {
if (err.message.includes('{wasm contract}')) {
throw new Error('{wasm contract} error that typically occurs when batch size is too large and node gas query limits are exceeded. Consider reducing the batch size.');
} else {
throw new Error(err);
}
}),
);
client: SecretNetworkClient,
minBlockHeightValidationOptions?: MinBlockHeightValidationOptions,
}) => {
let retryCount = 0;
return of(1).pipe( // placeholder observable of(1) used here so that we can start a data stream
// and retry from this level when certain error conditions are reached
switchMap(() => sendSecretClientContractQuery$({
queryMsg: msgBatchQuery(queries),
client,
contractAddress,
codeHash,
}).pipe(
map((response) => response as BatchQueryResponse), // map used for typecast only
switchMap((response) => {
// create an error if stale node is detected
if (minBlockHeightValidationOptions
&& response.batch.block_height < minBlockHeightValidationOptions.minBlockHeight
) {
// callback for when stale node is detected. Useful for error logging.
// check the retryCount to ensure that this only fires one time
if (retryCount === 0
&& typeof minBlockHeightValidationOptions.onStaleNodeDetected === 'function') {
minBlockHeightValidationOptions.onStaleNodeDetected();
}
return throwError(() => new Error('Stale node detected'));
}
return of(response);
}),
map(parseBatchQuery),
)),
first(),
catchError((error, caught) => {
if (error.message === 'Stale node detected') {
retryCount += 1;
if (
minBlockHeightValidationOptions
&& retryCount <= minBlockHeightValidationOptions?.maxRetries
) {
// retry the query
return caught;
}
return throwError(() => new Error('Reached maximum retry attempts for stale node error.'));
} if (error.message.includes('{wasm contract}')) {
return throwError(() => new Error('{wasm contract} error that typically occurs when batch size is too large and node gas query limits are exceeded. Consider reducing the batch size.'));
}
return throwError(() => error);
}),
);
};

/**
* batch query of multiple contracts/message at a time
Expand All @@ -98,13 +136,15 @@ const batchQuery$ = ({
chainId,
queries,
batchSize,
minBlockHeightValidationOptions,
}:{
contractAddress: string,
codeHash?: string,
lcdEndpoint?: string,
chainId?: string,
queries: BatchQueryParams[],
batchSize?: number,
minBlockHeightValidationOptions?: MinBlockHeightValidationOptions,
}) => {
// if batch size is passed in, convert single batch into multiple batches,
// otherwise process all data in a single batch
Expand All @@ -119,6 +159,7 @@ const batchQuery$ = ({
codeHash,
queries: batch,
client,
minBlockHeightValidationOptions,
})),
).pipe(
concatAll(),
Expand All @@ -143,13 +184,15 @@ async function batchQuery({
chainId,
queries,
batchSize,
minBlockHeightValidationOptions,
}:{
contractAddress: string,
codeHash?: string,
lcdEndpoint?: string,
chainId?: string,
queries: BatchQueryParams[],
batchSize?: number,
minBlockHeightValidationOptions?: MinBlockHeightValidationOptions,
}) {
return lastValueFrom(batchQuery$({
contractAddress,
Expand All @@ -158,6 +201,7 @@ async function batchQuery({
chainId,
queries,
batchSize,
minBlockHeightValidationOptions,
}));
}

Expand Down
5 changes: 5 additions & 0 deletions src/contracts/services/derivativeScrt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ const batchQueryResponse = [
{
id: BatchRouterKeys.STAKING_INFO,
response: stakingInfoResponse,
blockHeight: 1,
},
{
id: BatchRouterKeys.FEE_INFO,
response: feeInfoResponse,
blockHeight: 1,
},
];

Expand Down Expand Up @@ -55,6 +57,7 @@ test('it can parse the batch query resonse', () => {
)).toStrictEqual({
...feeInfoResponseParsed,
...stakingInfoResponseParsed,
blockHeight: 1,
});
});

Expand All @@ -78,6 +81,7 @@ test('it can call the query all info service', async () => {
expect(output).toStrictEqual({
...feeInfoResponseParsed,
...stakingInfoResponseParsed,
blockHeight: 1,
});

// async/await function
Expand All @@ -87,5 +91,6 @@ test('it can call the query all info service', async () => {
expect(output2).toStrictEqual({
...feeInfoResponseParsed,
...stakingInfoResponseParsed,
blockHeight: 1,
});
});
Loading

0 comments on commit 555f33e

Please sign in to comment.