diff --git a/packages/core/package.json b/packages/core/package.json index 5a103ce610..b0a8875b71 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -86,6 +86,7 @@ "knex": "^2.5.1", "least-recent": "^1.0.3", "level": "^8.0.1", + "lodash-es": "^4.17.21", "lodash.clonedeep": "^4.5.0", "mapmoize": "^1.2.1", "multiformats": "^13.0.0", @@ -101,6 +102,7 @@ "@ceramicnetwork/ipfs-daemon": "^5.10.0-rc.0", "@databases/pg-test": "^3.1.2", "@didtools/cacao": "^3.0.0", + "@types/lodash-es": "^4.17.12", "@types/node": "^18.0.3", "csv-parser": "^3.0.0", "did-resolver": "^4.0.1", diff --git a/packages/core/src/anchor/anchor-processing-loop.ts b/packages/core/src/anchor/anchor-processing-loop.ts index 109501bd80..ab6933ec88 100644 --- a/packages/core/src/anchor/anchor-processing-loop.ts +++ b/packages/core/src/anchor/anchor-processing-loop.ts @@ -7,12 +7,19 @@ import type { DiagnosticsLogger } from '@ceramicnetwork/common' import type { NamedTaskQueue } from '../state-management/named-task-queue.js' import type { StreamID } from '@ceramicnetwork/streamid' import { TimeableMetric, SinceField } from '@ceramicnetwork/observability' -import { ModelMetrics, Observable, Counter } from '@ceramicnetwork/model-metrics' +import { ModelMetrics, Counter } from '@ceramicnetwork/model-metrics' +import throttle from 'lodash-es/throttle' +import { CID, Version } from 'multiformats' +import { interval } from 'rxjs' +import { startWith } from 'rxjs/operators' +import { RemoteCAS } from './ethereum/remote-cas.js' const METRICS_REPORTING_INTERVAL_MS = 10000 // 10 second reporting interval const DEFAULT_CONCURRENCY = 25 +const CAS_REQUEST_POLLING_INTERVAL_MS = 1000 / 6 // 1000 ms divided by 6 calls + /** * Get anchor request entries from AnchorRequestStore one by one. For each entry, get CAS response, * and handle the response via `eventHandler.handle`. @@ -28,6 +35,59 @@ export class AnchorProcessingLoop { readonly #anchorStoreQueue: NamedTaskQueue readonly #anchorPollingMetrics: TimeableMetric + readonly #cas: CASClient + // This function is throttled to limit its execution frequency to no more than once every CAS_REQUEST_POLLING_INTERVAL_MS. + // It attempts to get the status of an anchor request from the CAS. If the request is not found, it logs a warning, + // builds a new CAR file for the request, and submits a new request to the CAS. + // The function is configured to execute only at the leading edge of the interval, + // meaning it will execute immediately when called, but subsequent calls + // within the CAS_REQUEST_POLLING_INTERVAL_MS will be ignored, ensuring that the CAS is not overwhelmed with too many + // frequent requests and helping to manage system resources efficiently. + throttledGetStatusForRequest = throttle( + async ( + streamId: StreamID, + cid: CID, + cas: CASClient, + logger: DiagnosticsLogger, + eventHandler: AnchorLoopHandler + ) => { + return cas.getStatusForRequest(streamId, cid).catch(async (error) => { + logger.warn(`No request present on CAS for ${cid} of ${streamId}: ${error}`) + const requestCAR = await eventHandler.buildRequestCar(streamId, cid) + return cas.create(new AnchorRequestCarFileReader(requestCAR)) + }) + }, + CAS_REQUEST_POLLING_INTERVAL_MS, // Set the maximum frequency of function execution + { trailing: false } // Execute only at the leading edge of the interval + ) + intervalSubscription: any + + // This method dynamically adjusts the polling interval based on the current rate of create requests. + // It calculates a new interval using a square root function to moderate the change rate, ensuring the interval + // remains within predefined maximum and minimum bounds. The adjusted interval is then applied to throttle + // the `throttledGetStatusForRequest` function, which controls the frequency of status checks and request submissions + // to the CAS, enhancing system responsiveness and stability. + private adjustPollingInterval(): void { + const maxInterval = 200 // maximum interval in ms + const minInterval = 5 // minimum interval in ms + if (this.#cas instanceof RemoteCAS) { + const currentRate = this.#cas.getCreateRequestRate() + let newInterval = 1000 / Math.sqrt(currentRate + 1) + newInterval = Math.min(Math.max(newInterval, minInterval), maxInterval) + + this.throttledGetStatusForRequest = throttle(this.throttledGetStatusForRequest, newInterval, { + trailing: false, + }) + } else { + // Handle the case where #cas is not an instance of RemoteCAS + console.warn('CAS client does not support dynamic rate adjustment.') + // Using minimum interval + this.throttledGetStatusForRequest = throttle(this.throttledGetStatusForRequest, minInterval, { + trailing: false, + }) + } + } + constructor( batchSize: number, cas: CASClient, @@ -42,7 +102,7 @@ export class AnchorProcessingLoop { 'anchorRequestAge', METRICS_REPORTING_INTERVAL_MS ) - + this.#cas = cas const concurrency = Number(process.env.CERAMIC_ANCHOR_POLLING_CONCURRENCY) || DEFAULT_CONCURRENCY this.#loop = new ProcessingLoop( @@ -55,11 +115,13 @@ export class AnchorProcessingLoop { `Loading pending anchor metadata for Stream ${streamId} from AnchorRequestStore` ) const entry = await store.load(streamId) - const event = await cas.getStatusForRequest(streamId, entry.cid).catch(async (error) => { - logger.warn(`No request present on CAS for ${entry.cid} of ${streamId}: ${error}`) - const requestCAR = await eventHandler.buildRequestCar(streamId, entry.cid) - return cas.create(new AnchorRequestCarFileReader(requestCAR)) - }) + const event = await this.throttledGetStatusForRequest( + streamId, + entry.cid, + this.#cas, + logger, + eventHandler + ) const isTerminal = await eventHandler.handle(event) logger.verbose( `Anchor event with status ${event.status} for commit CID ${entry.cid} of Stream ${streamId} handled successfully` @@ -102,6 +164,17 @@ export class AnchorProcessingLoop { start(): void { this.#anchorPollingMetrics.startPublishingStats() void this.#loop.start() + + // Set up an interval to adjust the polling interval every 10 minutes (600000 milliseconds) + const subscription = interval(600000) + .pipe( + startWith(0) // to start immediately + ) + .subscribe(() => { + this.adjustPollingInterval() + }) + + this.intervalSubscription = subscription } /** @@ -109,6 +182,7 @@ export class AnchorProcessingLoop { */ async stop(): Promise { this.#anchorPollingMetrics.stopPublishingStats() + this.intervalSubscription?.unsubscribe() return this.#loop.stop() } } diff --git a/packages/core/src/anchor/anchor-service.ts b/packages/core/src/anchor/anchor-service.ts index 0dc1568ed3..c7d77badc9 100644 --- a/packages/core/src/anchor/anchor-service.ts +++ b/packages/core/src/anchor/anchor-service.ts @@ -139,6 +139,13 @@ export interface CASClient { * Abort any fetch requests to CAS. */ close(): Promise + + /** + * Calculates the rate of anchor request creations over the last 15 minutes. + * This rate is computed as the total number of requests created divided by 15, + * providing the average number of requests per minute within this time frame. + */ + getCreateRequestRate?(): number } export class NotSingleChainError extends Error { diff --git a/packages/core/src/anchor/ethereum/remote-cas.ts b/packages/core/src/anchor/ethereum/remote-cas.ts index e3ec969b5c..d0427083a5 100644 --- a/packages/core/src/anchor/ethereum/remote-cas.ts +++ b/packages/core/src/anchor/ethereum/remote-cas.ts @@ -69,6 +69,7 @@ export class RemoteCAS implements CASClient { // does not cause this counter to increment. #numFailedRequests: number #firstFailedRequestDate: Date | null + private createRequestTimestamps: number[] = [] constructor(logger: DiagnosticsLogger, anchorServiceUrl: string, sendRequest: FetchRequest) { this.#logger = logger @@ -140,6 +141,7 @@ export class RemoteCAS implements CASClient { */ async create(carFileReader: AnchorRequestCarFileReader): Promise { const response = await firstValueFrom(this.create$(carFileReader)) + this.incrementCreateRequestCount() return parseResponse(carFileReader.streamId, carFileReader.tip, response) } @@ -207,6 +209,23 @@ export class RemoteCAS implements CASClient { return parseResponse(streamId, tip, response) } + private incrementCreateRequestCount(): void { + this.createRequestTimestamps.push(Date.now()) + } + + public getCreateRequestRate(): number { + const currentTime = Date.now() + const oneMinuteAgo = currentTime - 600000 // 1000 * 60 * 60 * 10 + + const recentTimestamps = this.createRequestTimestamps.filter( + (timestamp) => timestamp > oneMinuteAgo + ) + this.createRequestTimestamps = recentTimestamps // Update the array to only hold recent timestamps + + // get the average of the rates in the last 15 minutes + return recentTimestamps.length / 10 + } + async close() { this.#stopSignal.next() this.#stopSignal.complete()