Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions e2e/testTimestampForSlot.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import winston from "winston";
import { ClusterUrl } from "@solana/kit";
import { getNearestSlotTime } from "../src/arch/svm/utils";
import { QuorumFallbackSolanaRpcFactory, CachedSolanaRpcFactory } from "../src/providers";
import { MemoryCacheClient } from "../src/caching/Memory";

/**
* USAGE EXAMPLES:
Expand Down Expand Up @@ -62,7 +63,7 @@ async function testNearestSlotTime(
const startTime = Date.now();

try {
const { slot, timestamp } = await getNearestSlotTime(rpcClient, logger);
const { slot, timestamp } = await getNearestSlotTime(rpcClient, { commitment: "confirmed" }, logger);
const elapsedTime = Date.now() - startTime;

console.log(`✅ Slot ${slot} -> ${timestamp} (${new Date(timestamp * 1000).toISOString()}) (${elapsedTime}ms)`);
Expand Down Expand Up @@ -100,11 +101,12 @@ async function runTest(options: TestOptions) {

// Create the RPC factory
const allEndpoints = [options.endpoint, ...options.fallbackEndpoints];
const memoryCache = new MemoryCacheClient();
const factoryParams = allEndpoints.map(
(endpoint) =>
[
"test-timestamp-for-slot",
undefined, // redisClient
memoryCache, // redisClient
options.retries,
options.retryDelay,
10, // maxConcurrency
Expand Down
138 changes: 103 additions & 35 deletions src/providers/solana/cachedRpcFactory.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
import { RpcTransport, GetTransactionApi, RpcFromTransport, SolanaRpcApiFromTransport } from "@solana/kit";
import {
RpcTransport,
GetTransactionApi,
RpcFromTransport,
SolanaRpcApiFromTransport,
GetBlockTimeApi,
} from "@solana/kit";
import { getThrowSolanaErrorResponseTransformer } from "@solana/rpc-transformers";
import { is, object, optional, string, tuple } from "superstruct";
import { is, number, object, optional, string, tuple } from "superstruct";
import { CachingMechanismInterface } from "../../interfaces";
import { SolanaClusterRpcFactory } from "./baseRpcFactories";
import { CacheType } from "../utils";
import { jsonReplacerWithBigInts, jsonReviverWithBigInts } from "../../utils";
import { RetrySolanaRpcFactory } from "./retryRpcFactory";
import { random } from "lodash";
import { BLOCK_NUMBER_TTL, PROVIDER_CACHE_TTL, PROVIDER_CACHE_TTL_MODIFIER as ttl_modifier } from "../constants";
import { assert } from "chai";

// A CachedFactory contains a RetryFactory and provides a caching layer that caches
// the results of the RetryFactory's RPC calls.
export class CachedSolanaRpcFactory extends SolanaClusterRpcFactory {
public readonly getTransactionCachePrefix: string;
public readonly getBlockTimeCachePrefix: string;
baseTTL = PROVIDER_CACHE_TTL;

// Holds the underlying transport that the cached transport wraps.
protected retryTransport: RpcTransport;

// RPC client based on the retry transport, used internally to check confirmation status.
protected retryRpcClient: RpcFromTransport<SolanaRpcApiFromTransport<RpcTransport>, RpcTransport>;

// Cached latest confirmed slot and its publish timestamp.
latestConfirmedSlot = Number.MAX_SAFE_INTEGER;
publishTimestampLatestConfirmedSlot = 0;
maxAgeLatestConfirmedSlot = 1000 * BLOCK_NUMBER_TTL;

constructor(
providerCacheNamespace: string,
readonly redisClient?: CachingMechanismInterface,
Expand All @@ -38,12 +54,22 @@ export class CachedSolanaRpcFactory extends SolanaClusterRpcFactory {
// Pre-compute as much of the redis key as possible.
const cachePrefix = `${providerCacheNamespace},${new URL(this.clusterUrl).hostname},${this.chainId}`;
this.getTransactionCachePrefix = `${cachePrefix}:getTransaction,`;
this.getBlockTimeCachePrefix = `${cachePrefix}:getBlockTime,`;
}

public createTransport(): RpcTransport {
return async <TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> => {
const { method, params } = args[0].payload as { method: string; params?: unknown[] };
const cacheType = this.redisClient ? this.cacheType(method) : CacheType.NONE;
if (!this.redisClient) {
return this.retryTransport<TResponse>(...args);
}

let latestConfirmedSlot = 0;
if (method === "getBlockTime") {
latestConfirmedSlot = await this.getLatestConfirmedSlot();
}

const cacheType = this.cacheType(method, params ?? [], latestConfirmedSlot);

if (cacheType === CacheType.NONE) {
return this.retryTransport<TResponse>(...args);
Expand All @@ -60,67 +86,105 @@ export class CachedSolanaRpcFactory extends SolanaClusterRpcFactory {
}

// Cache does not have the result. Query it directly and cache it if finalized.
return this.requestAndCacheFinalized<TResponse>(...args);
return this.requestAndCacheFinalized<TResponse>(cacheType, ...args);
};
}

private async requestAndCacheFinalized<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
private async getLatestConfirmedSlot(): Promise<number> {
const fetchLatestConfirmedSlot = async () => {
return await this.retryRpcClient.getSlot({ commitment: "confirmed" }).send();
};
// If first time fetching, always get and set the latest confirmed slot.
if (this.latestConfirmedSlot === Number.MAX_SAFE_INTEGER) {
this.latestConfirmedSlot = Number(await fetchLatestConfirmedSlot());
this.publishTimestampLatestConfirmedSlot = Date.now();
return this.latestConfirmedSlot;
}
// If the last time we set the latest confirmed slot was more than maxAgeLatestConfirmedSlot ago,
// reset the latest confirmed slot.
if (Date.now() - this.publishTimestampLatestConfirmedSlot > this.maxAgeLatestConfirmedSlot) {
this.latestConfirmedSlot = Number(await fetchLatestConfirmedSlot());
this.publishTimestampLatestConfirmedSlot = Date.now();
}
return this.latestConfirmedSlot;
}

private async requestAndCacheFinalized<TResponse>(
cacheType: CacheType,
...args: Parameters<RpcTransport>
): Promise<TResponse> {
assert(
cacheType === CacheType.NO_TTL || cacheType === CacheType.WITH_TTL,
"requestAndCacheFinalized: Cache type must be NO_TTL or WITH_TTL"
);
const { method, params } = args[0].payload as { method: string; params?: unknown[] };

// Only handles getTransaction right now.
if (method !== "getTransaction") return this.retryTransport<TResponse>(...args);
if (method !== "getTransaction" && method !== "getBlockTime") return this.retryTransport<TResponse>(...args);

// Do not throw if params are not valid, just skip caching and pass through to the underlying transport.
if (!this.isGetTransactionParams(params)) return this.retryTransport<TResponse>(...args);

// Check the confirmation status first to avoid caching non-finalized transactions. In case of null or errors just
// skip caching and pass through to the underlying transport.
try {
const getSignatureStatusesResponse = await this.retryRpcClient
.getSignatureStatuses([params[0]], {
searchTransactionHistory: true,
})
.send();
if (getSignatureStatusesResponse.value[0]?.confirmationStatus !== "finalized") {
return this.retryTransport<TResponse>(...args);
}
} catch (error) {
return this.retryTransport<TResponse>(...args);
switch (method) {
case "getTransaction":
if (!this.isGetTransactionParams(params)) return this.retryTransport<TResponse>(...args);
// Check the confirmation status first to avoid caching non-finalized transactions. In case of null or errors just
// skip caching and pass through to the underlying transport.
try {
const getSignatureStatusesResponse = await this.retryRpcClient
.getSignatureStatuses([params[0]], {
searchTransactionHistory: true,
})
.send();
if (getSignatureStatusesResponse.value[0]?.confirmationStatus !== "finalized") {
return this.retryTransport<TResponse>(...args);
}
} catch (error) {
return this.retryTransport<TResponse>(...args);
}
break;
case "getBlockTime":
if (!this.isGetBlockTimeParams(params)) return this.retryTransport<TResponse>(...args);
break;
}

const getTransactionResponse = await this.retryTransport<TResponse>(...args);
const response = await this.retryTransport<TResponse>(...args);

// Do not cache JSON-RPC error responses, let them pass through for the RPC client to handle.
try {
getThrowSolanaErrorResponseTransformer()(getTransactionResponse, { methodName: method, params });
getThrowSolanaErrorResponseTransformer()(response, { methodName: method, params });
} catch {
return getTransactionResponse;
return response;
}

// Cache the transaction JSON-RPC response as we checked the transaction is finalized and not an error.
// Cache the transaction JSON-RPC response as we checked the response data is mature enough and not an error.
const redisKey = this.buildRedisKey(method, params);
await this.redisClient?.set(
redisKey,
JSON.stringify(getTransactionResponse, jsonReplacerWithBigInts),
Number.POSITIVE_INFINITY
);
// Apply a random margin to the standard TTL time to spread expiry over a larger time window.
const standardTtl = this.baseTTL + Math.ceil(random(-ttl_modifier, ttl_modifier, true) * this.baseTTL);
const ttl = cacheType === CacheType.WITH_TTL ? standardTtl : Number.POSITIVE_INFINITY;
await this.redisClient?.set(redisKey, JSON.stringify(response, jsonReplacerWithBigInts), ttl);

return getTransactionResponse;
return response;
}

private buildRedisKey(method: string, params?: unknown[]) {
// Only handles getTransaction right now.
switch (method) {
case "getTransaction":
return this.getTransactionCachePrefix + JSON.stringify(params, jsonReplacerWithBigInts);
case "getBlockTime":
return this.getBlockTimeCachePrefix + JSON.stringify(params, jsonReplacerWithBigInts);
default:
throw new Error(`CachedSolanaRpcFactory::buildRedisKey: invalid JSON-RPC method ${method}`);
}
}

private cacheType(method: string): CacheType {
// Today, we only cache getTransaction.
if (method === "getTransaction") {
private cacheType(method: string, params: unknown[] = [], latestConfirmedSlot = 0): CacheType {
if (method === "getBlockTime") {
const targetSlot = (params as Parameters<GetBlockTimeApi["getBlockTime"]>)[0];
if (targetSlot <= latestConfirmedSlot) {
return CacheType.WITH_TTL;
} else {
return CacheType.NONE;
}
} else if (method === "getTransaction") {
// We only store finalized transactions in the cache, hence TTL is not required.
return CacheType.NO_TTL;
} else {
Expand All @@ -137,4 +201,8 @@ export class CachedSolanaRpcFactory extends SolanaClusterRpcFactory {
])
);
}

private isGetBlockTimeParams(params: unknown): params is Parameters<GetBlockTimeApi["getBlockTime"]> {
return is(params, tuple([number()]));
}
}
Loading