Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Merge ExecutionQueue with LoadingQueue #2990

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ import { AnchorRequestStore } from './store/anchor-request-store.js'
import { AnchorResumingService } from './state-management/anchor-resuming-service.js'
import { ProvidersCache } from './providers-cache.js'
import crypto from 'crypto'
import { AnchorTimestampExtractor } from './stream-loading/anchor-timestamp-extractor.js'
import { TipFetcher } from './stream-loading/tip-fetcher.js'
import { LogSyncer } from './stream-loading/log-syncer.js'
import { StateManipulator } from './stream-loading/state-manipulator.js'
import { StreamLoader } from './stream-loading/stream-loader.js'
import {
networkOptionsByName,
type CeramicNetworkOptions,
Expand All @@ -57,12 +52,11 @@ import {
makeAnchorService,
makeEthereumRpcUrl,
} from './initialization/anchoring.js'
import { StreamUpdater } from './stream-loading/stream-updater.js'
import type { AnchorService } from './anchor/anchor-service.js'
import { AnchorRequestCarBuilder } from './anchor/anchor-request-car-builder.js'
import { makeStreamLoaderAndUpdater } from './initialization/stream-loading.js'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_CACHE_LIMIT = 1000 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
const TESTING = process.env.NODE_ENV == 'test'

Expand Down
18 changes: 5 additions & 13 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,10 @@ function commitAtTime(state: StreamState, timestamp: number): CommitID {

export class Repository {
/**
* Serialize loading operations per streamId.
* Serialize operations on stream state per streamId.
*/
readonly loadingQ: ExecutionQueue

/**
* Serialize operations on state per streamId.
*/
readonly executionQ: ExecutionQueue

/**
* In-memory cache of the currently running streams.
*/
Expand Down Expand Up @@ -127,7 +122,6 @@ export class Repository {
private readonly logger: DiagnosticsLogger
) {
this.loadingQ = new ExecutionQueue('loading', concurrencyLimit, logger)
this.executionQ = new ExecutionQueue('execution', concurrencyLimit, logger)
this.inmemory = new StateCache(cacheLimit, (state$) => {
if (state$.subscriptionSet.size > 0) {
logger.debug(`Stream ${state$.id} evicted from cache while having subscriptions`)
Expand Down Expand Up @@ -411,7 +405,7 @@ export class Repository {
commitId: CommitID,
existingState$: RunningState
): Promise<SnapshotState> {
return this.executionQ.forStream(commitId).run(async () => {
return this.loadingQ.forStream(commitId).run(async () => {
const stateAtCommit = await this.streamLoader.stateAtCommit(existingState$.state, commitId)

// Since we skipped CACAO expiration checking earlier we need to make sure to do it here.
Expand Down Expand Up @@ -455,7 +449,7 @@ export class Repository {
const state$ = await this.load(streamId, opts)
this.logger.verbose(`Repository loaded state for stream ${streamId.toString()}`)

return this.executionQ.forStream(streamId).run(async () => {
return this.loadingQ.forStream(streamId).run(async () => {
const originalState = state$.state
const updatedState = await this.streamUpdater.applyCommitFromUser(originalState, commit)
if (StreamUtils.tipFromState(updatedState).equals(StreamUtils.tipFromState(originalState))) {
Expand Down Expand Up @@ -496,13 +490,12 @@ export class Repository {

/**
* Applies the given tip CID as a new commit to the given running state.
* NOTE: Must be called from inside the ExecutionQueue!
* @param state$ - State to apply tip to
* @param cid - tip CID
* @returns boolean - whether or not the tip was actually applied
*/
private async _handleTip(state$: RunningState, cid: CID): Promise<boolean> {
return this.executionQ.forStream(state$.id).run(async () => {
return this.loadingQ.forStream(state$.id).run(async () => {
this.logger.verbose(`Learned of new tip ${cid} for stream ${state$.id}`)
const next = await this.streamUpdater.applyTipFromNetwork(state$.state, cid)
if (next) {
Expand Down Expand Up @@ -670,7 +663,7 @@ export class Repository {

/**
* Takes the CID of an anchor commit received from an anchor service and applies it. Runs the
* work of loading and applying the commit on the execution queue so it gets serialized alongside
* work of loading and applying the commit on the loading queue so it gets serialized alongside
* any other updates to the same stream. Includes logic to retry up to a total of 3 attempts to
* handle transient failures of loading the anchor commit from IPFS.
*
Expand Down Expand Up @@ -973,7 +966,6 @@ export class Repository {

async close(): Promise<void> {
await this.loadingQ.close()
await this.executionQ.close()
Array.from(this.inmemory).forEach(([id, stream]) => {
this.inmemory.delete(id)
stream.complete()
Expand Down