diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index bc3a799ccd..145dfb4929 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -43,11 +43,6 @@ import { AnchorRequestStore } from './store/anchor-request-store.js' import { AnchorResumingService } from './state-management/anchor-resuming-service.js' import { ProvidersCache } from './providers-cache.js' import crypto from 'crypto' -import { AnchorTimestampExtractor } from './stream-loading/anchor-timestamp-extractor.js' -import { TipFetcher } from './stream-loading/tip-fetcher.js' -import { LogSyncer } from './stream-loading/log-syncer.js' -import { StateManipulator } from './stream-loading/state-manipulator.js' -import { StreamLoader } from './stream-loading/stream-loader.js' import { networkOptionsByName, type CeramicNetworkOptions, @@ -57,12 +52,11 @@ import { makeAnchorService, makeEthereumRpcUrl, } from './initialization/anchoring.js' -import { StreamUpdater } from './stream-loading/stream-updater.js' import type { AnchorService } from './anchor/anchor-service.js' import { AnchorRequestCarBuilder } from './anchor/anchor-request-car-builder.js' import { makeStreamLoaderAndUpdater } from './initialization/stream-loading.js' -const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache +const DEFAULT_CACHE_LIMIT = 1000 // number of streams stored in the cache const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting const TESTING = process.env.NODE_ENV == 'test' diff --git a/packages/core/src/state-management/repository.ts b/packages/core/src/state-management/repository.ts index de75e04cc8..0769be4517 100644 --- a/packages/core/src/state-management/repository.ts +++ b/packages/core/src/state-management/repository.ts @@ -87,15 +87,10 @@ function commitAtTime(state: StreamState, timestamp: number): CommitID { export class Repository { /** - * Serialize loading operations per streamId. + * Serialize operations on stream state per streamId. */ readonly loadingQ: ExecutionQueue - /** - * Serialize operations on state per streamId. - */ - readonly executionQ: ExecutionQueue - /** * In-memory cache of the currently running streams. */ @@ -127,7 +122,6 @@ export class Repository { private readonly logger: DiagnosticsLogger ) { this.loadingQ = new ExecutionQueue('loading', concurrencyLimit, logger) - this.executionQ = new ExecutionQueue('execution', concurrencyLimit, logger) this.inmemory = new StateCache(cacheLimit, (state$) => { if (state$.subscriptionSet.size > 0) { logger.debug(`Stream ${state$.id} evicted from cache while having subscriptions`) @@ -411,7 +405,7 @@ export class Repository { commitId: CommitID, existingState$: RunningState ): Promise { - return this.executionQ.forStream(commitId).run(async () => { + return this.loadingQ.forStream(commitId).run(async () => { const stateAtCommit = await this.streamLoader.stateAtCommit(existingState$.state, commitId) // Since we skipped CACAO expiration checking earlier we need to make sure to do it here. @@ -455,7 +449,7 @@ export class Repository { const state$ = await this.load(streamId, opts) this.logger.verbose(`Repository loaded state for stream ${streamId.toString()}`) - return this.executionQ.forStream(streamId).run(async () => { + return this.loadingQ.forStream(streamId).run(async () => { const originalState = state$.state const updatedState = await this.streamUpdater.applyCommitFromUser(originalState, commit) if (StreamUtils.tipFromState(updatedState).equals(StreamUtils.tipFromState(originalState))) { @@ -496,13 +490,12 @@ export class Repository { /** * Applies the given tip CID as a new commit to the given running state. - * NOTE: Must be called from inside the ExecutionQueue! * @param state$ - State to apply tip to * @param cid - tip CID * @returns boolean - whether or not the tip was actually applied */ private async _handleTip(state$: RunningState, cid: CID): Promise { - return this.executionQ.forStream(state$.id).run(async () => { + return this.loadingQ.forStream(state$.id).run(async () => { this.logger.verbose(`Learned of new tip ${cid} for stream ${state$.id}`) const next = await this.streamUpdater.applyTipFromNetwork(state$.state, cid) if (next) { @@ -670,7 +663,7 @@ export class Repository { /** * Takes the CID of an anchor commit received from an anchor service and applies it. Runs the - * work of loading and applying the commit on the execution queue so it gets serialized alongside + * work of loading and applying the commit on the loading queue so it gets serialized alongside * any other updates to the same stream. Includes logic to retry up to a total of 3 attempts to * handle transient failures of loading the anchor commit from IPFS. * @@ -973,7 +966,6 @@ export class Repository { async close(): Promise { await this.loadingQ.close() - await this.executionQ.close() Array.from(this.inmemory).forEach(([id, stream]) => { this.inmemory.delete(id) stream.complete()