diff --git a/.changeset/collection-indexes.md b/.changeset/collection-indexes.md new file mode 100644 index 00000000..f38a7a97 --- /dev/null +++ b/.changeset/collection-indexes.md @@ -0,0 +1,7 @@ +--- +"@tanstack/db": patch +--- + +Add collection index system for optimized queries and subscriptions + +This release introduces a comprehensive index system for collections that enables fast lookups and query optimization. diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 9b5308a7..d4ea9b6b 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -1,5 +1,13 @@ import { withArrayChangeTracking, withChangeTracking } from "./proxy" import { SortedMap } from "./SortedMap" +import { + createSingleRowRefProxy, + toExpression, +} from "./query/builder/ref-proxy" +import { compileSingleRowExpression } from "./query/compiler/evaluators.js" +import { OrderedIndex } from "./indexes/ordered-index.js" +import { IndexProxy, LazyIndexWrapper } from "./indexes/lazy-index.js" +import { optimizeExpressionWithIndexes } from "./utils/index-optimization.js" import { createTransaction, getActiveTransaction } from "./transactions" import type { Transaction } from "./transactions" import type { StandardSchemaV1 } from "@standard-schema/spec" @@ -8,6 +16,7 @@ import type { ChangeMessage, CollectionConfig, CollectionStatus, + CurrentStateAsChangesOptions, Fn, InsertConfig, OperationConfig, @@ -16,10 +25,14 @@ import type { ResolveInsertInput, ResolveType, StandardSchema, + SubscribeChangesOptions, Transaction as TransactionType, TransactionWithMutations, UtilsRecord, } from "./types" +import type { IndexOptions } from "./indexes/index-options.js" +import type { BaseIndex, IndexResolver } from "./indexes/base-index.js" +import type { SingleRowRefProxy } from "./query/builder/ref-proxy" // Store collections in memory export const collectionsStore = new Map>() @@ -214,6 +227,12 @@ export class CollectionImpl< // Cached size for performance 
private _size = 0 + // Index storage + private lazyIndexes = new Map>() + private resolvedIndexes = new Map>() + private isIndexesResolved = false + private indexCounter = 0 + // Event system private changeListeners = new Set>() private changeKeyListeners = new Map>>() @@ -373,6 +392,14 @@ export class CollectionImpl< private setStatus(newStatus: CollectionStatus): void { this.validateStatusTransition(this._status, newStatus) this._status = newStatus + + // Resolve indexes when collection becomes ready + if (newStatus === `ready` && !this.isIndexesResolved) { + // Resolve indexes asynchronously without blocking + this.resolveAllIndexes().catch((error) => { + console.warn(`Failed to resolve indexes:`, error) + }) + } } /** @@ -772,8 +799,16 @@ export class CollectionImpl< return true }) + // Update indexes for the filtered events + if (filteredEvents.length > 0) { + this.updateIndexes(filteredEvents) + } this.emitEvents(filteredEvents) } else { + // Update indexes for all events + if (filteredEventsBySyncStatus.length > 0) { + this.updateIndexes(filteredEventsBySyncStatus) + } // Emit all events if no pending sync transactions this.emitEvents(filteredEventsBySyncStatus) } @@ -1217,6 +1252,11 @@ export class CollectionImpl< // Update cached size after synced data changes this._size = this.calculateSize() + // Update indexes for all events before emitting + if (events.length > 0) { + this.updateIndexes(events) + } + // End batching and emit all events (combines any batched events with sync events) this.emitEvents(events, true) @@ -1263,6 +1303,157 @@ export class CollectionImpl< return `KEY::${this.id}/${key}` } + /** + * Creates an index on a collection for faster queries. + * Indexes significantly improve query performance by allowing binary search + * and range queries instead of full scans. 
+ * + * @template TResolver - The type of the index resolver (constructor or async loader) + * @param indexCallback - Function that extracts the indexed value from each item + * @param config - Configuration including index type and type-specific options + * @returns An index proxy that provides access to the index when ready + * + * @example + * // Create a default ordered index + * const ageIndex = collection.createIndex((row) => row.age) + * + * // Create an ordered index with custom options + * const ageIndex = collection.createIndex((row) => row.age, { + * indexType: OrderedIndex, + * options: { compareFn: customComparator }, + * name: 'age_btree' + * }) + * + * // Create an async-loaded index + * const textIndex = collection.createIndex((row) => row.content, { + * indexType: async () => { + * const { FullTextIndex } = await import('./indexes/fulltext.js') + * return FullTextIndex + * }, + * options: { language: 'en' } + * }) + */ + public createIndex< + TResolver extends IndexResolver = typeof OrderedIndex, + >( + indexCallback: (row: SingleRowRefProxy) => any, + config: IndexOptions = {} + ): IndexProxy { + this.validateCollectionUsable(`createIndex`) + + const indexId = `${++this.indexCounter}` + const singleRowRefProxy = createSingleRowRefProxy() + const indexExpression = indexCallback(singleRowRefProxy) + const expression = toExpression(indexExpression) + + // Default to OrderedIndex if no type specified + const resolver = config.indexType ?? 
(OrderedIndex as unknown as TResolver) + + // Create lazy wrapper + const lazyIndex = new LazyIndexWrapper( + indexId, + expression, + config.name, + resolver, + config.options, + this.entries() + ) + + this.lazyIndexes.set(indexId, lazyIndex) + + // For synchronous constructors (classes), resolve immediately + // For async loaders, wait for collection to be ready + if (typeof resolver === `function` && resolver.prototype) { + // This is a constructor - resolve immediately and synchronously + try { + const resolvedIndex = lazyIndex.getResolved() // This should work since constructor resolved it + this.resolvedIndexes.set(indexId, resolvedIndex) + } catch { + // Fallback to async resolution + this.resolveSingleIndex(indexId, lazyIndex).catch((error) => { + console.warn(`Failed to resolve single index:`, error) + }) + } + } else if (this.isIndexesResolved) { + // Async loader but indexes are already resolved - resolve this one + this.resolveSingleIndex(indexId, lazyIndex).catch((error) => { + console.warn(`Failed to resolve single index:`, error) + }) + } + + return new IndexProxy(indexId, lazyIndex) + } + + /** + * Resolve all lazy indexes (called when collection first syncs) + * @private + */ + private async resolveAllIndexes(): Promise { + if (this.isIndexesResolved) return + + const resolutionPromises = Array.from(this.lazyIndexes.entries()).map( + async ([indexId, lazyIndex]) => { + const resolvedIndex = await lazyIndex.resolve() + + // Build index with current data + resolvedIndex.build(this.entries()) + + this.resolvedIndexes.set(indexId, resolvedIndex) + return { indexId, resolvedIndex } + } + ) + + await Promise.all(resolutionPromises) + this.isIndexesResolved = true + } + + /** + * Resolve a single index immediately + * @private + */ + private async resolveSingleIndex( + indexId: string, + lazyIndex: LazyIndexWrapper + ): Promise> { + const resolvedIndex = await lazyIndex.resolve() + resolvedIndex.build(this.entries()) + this.resolvedIndexes.set(indexId, 
resolvedIndex) + return resolvedIndex + } + + /** + * Get resolved indexes for query optimization + */ + get indexes(): Map> { + return this.resolvedIndexes + } + + /** + * Updates all indexes when the collection changes + * @private + */ + private updateIndexes(changes: Array>): void { + for (const index of this.resolvedIndexes.values()) { + for (const change of changes) { + switch (change.type) { + case `insert`: + index.add(change.key, change.value) + break + case `update`: + if (change.previousValue) { + index.update(change.key, change.previousValue, change.value) + } else { + index.add(change.key, change.value) + } + break + case `delete`: + index.remove(change.key, change.value) + break + } + } + } + } + private deepEqual(a: any, b: any): boolean { if (a === b) return true if (a == null || b == null) return false @@ -1903,20 +2094,142 @@ export class CollectionImpl< /** * Returns the current state of the collection as an array of changes + * @param options - Options including optional where filter * @returns An array of changes + * @example + * // Get all items as changes + * const allChanges = collection.currentStateAsChanges() + * + * // Get only items matching a condition + * const activeChanges = collection.currentStateAsChanges({ + * where: (row) => row.status === 'active' + * }) */ - public currentStateAsChanges(): Array> { - return Array.from(this.entries()).map(([key, value]) => ({ - type: `insert`, - key, - value, - })) + public currentStateAsChanges( + options: CurrentStateAsChangesOptions = {} + ): Array> { + if (!options.where) { + // No filtering, return all items + const result: Array> = [] + for (const [key, value] of this.entries()) { + result.push({ + type: `insert`, + key, + value, + }) + } + return result + } + + // There's a where clause, let's see if we can use an index + const result: Array> = [] + + try { + // Create the single-row refProxy for the callback + const singleRowRefProxy = createSingleRowRefProxy() + + // Execute the 
callback to get the expression + const whereExpression = options.where(singleRowRefProxy) + + // Convert the result to a BasicExpression + const expression = toExpression(whereExpression) + + // Try to optimize the query using indexes + const optimizationResult = optimizeExpressionWithIndexes( + expression, + this.indexes + ) + + if (optimizationResult.canOptimize) { + // Use index optimization + for (const key of optimizationResult.matchingKeys) { + const value = this.get(key) + if (value !== undefined) { + result.push({ + type: `insert`, + key, + value, + }) + } + } + } else { + // No index found or complex expression, fall back to full scan with filter + const filterFn = this.createFilterFunction(options.where) + + for (const [key, value] of this.entries()) { + if (filterFn(value)) { + result.push({ + type: `insert`, + key, + value, + }) + } + } + } + } catch (error) { + // If anything goes wrong with the where clause, fall back to full scan + console.warn( + `Error processing where clause, falling back to full scan:`, + error + ) + + const filterFn = this.createFilterFunction(options.where) + + for (const [key, value] of this.entries()) { + if (filterFn(value)) { + result.push({ + type: `insert`, + key, + value, + }) + } + } + } + + return result + } + + /** + * Creates a filter function from a where callback + * @private + */ + private createFilterFunction( + whereCallback: (row: SingleRowRefProxy) => any + ): (item: T) => boolean { + return (item: T): boolean => { + try { + // First try the RefProxy approach for query builder functions + const singleRowRefProxy = createSingleRowRefProxy() + const whereExpression = whereCallback(singleRowRefProxy) + const expression = toExpression(whereExpression) + const evaluator = compileSingleRowExpression(expression) + const result = evaluator(item as Record) + // WHERE clauses should always evaluate to boolean predicates (Kevin's feedback) + return result + } catch { + // If RefProxy approach fails (e.g., arithmetic 
operations), fall back to direct evaluation + try { + // Create a simple proxy that returns actual values for arithmetic operations + const simpleProxy = new Proxy(item as any, { + get(target, prop) { + return target[prop] + }, + }) as SingleRowRefProxy + + const result = whereCallback(simpleProxy) + return result + } catch { + // If both approaches fail, exclude the item + return false + } + } + } } /** * Subscribe to changes in the collection * @param callback - Function called when items change - * @param options.includeInitialState - If true, immediately calls callback with current data + * @param options - Subscription options including includeInitialState and where filter * @returns Unsubscribe function - Call this to stop listening for changes * @example * // Basic subscription @@ -1933,28 +2246,104 @@ export class CollectionImpl< * const unsubscribe = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { includeInitialState: true }) + * + * @example + * // Subscribe only to changes matching a condition + * const unsubscribe = collection.subscribeChanges((changes) => { + * updateUI(changes) + * }, { + * includeInitialState: true, + * where: (row) => row.status === 'active' + * }) */ public subscribeChanges( callback: (changes: Array>) => void, - { includeInitialState = false }: { includeInitialState?: boolean } = {} + options: SubscribeChangesOptions = {} ): () => void { // Start sync and track subscriber this.addSubscriber() - if (includeInitialState) { - // First send the current state as changes - callback(this.currentStateAsChanges()) + // Create a filtered callback if where clause is provided + const filteredCallback = options.where + ? 
this.createFilteredCallback(callback, options.where) + : callback + + if (options.includeInitialState) { + // First send the current state as changes (filtered if needed) + const initialChanges = this.currentStateAsChanges({ + where: options.where, + }) + filteredCallback(initialChanges) } // Add to batched listeners - this.changeListeners.add(callback) + this.changeListeners.add(filteredCallback) return () => { - this.changeListeners.delete(callback) + this.changeListeners.delete(filteredCallback) this.removeSubscriber() } } + /** + * Creates a filtered callback that only calls the original callback with changes that match the where clause + * @private + */ + private createFilteredCallback( + originalCallback: (changes: Array>) => void, + whereCallback: (row: SingleRowRefProxy) => any + ): (changes: Array>) => void { + const filterFn = this.createFilterFunction(whereCallback) + + return (changes: Array>) => { + const filteredChanges: Array> = [] + + for (const change of changes) { + if (change.type === `insert`) { + // For inserts, check if the new value matches the filter + if (filterFn(change.value)) { + filteredChanges.push(change) + } + } else if (change.type === `update`) { + // For updates, we need to check both old and new values + const newValueMatches = filterFn(change.value) + const oldValueMatches = change.previousValue + ? 
filterFn(change.previousValue) + : false + + if (newValueMatches && oldValueMatches) { + // Both old and new match: emit update + filteredChanges.push(change) + } else if (newValueMatches && !oldValueMatches) { + // New matches but old didn't: emit insert + filteredChanges.push({ + ...change, + type: `insert`, + }) + } else if (!newValueMatches && oldValueMatches) { + // Old matched but new doesn't: emit delete + filteredChanges.push({ + ...change, + type: `delete`, + value: change.previousValue!, // Use the previous value for the delete + }) + } + // If neither matches, don't emit anything + } else { + // For deletes, include if the previous value would have matched + // (so subscribers know something they were tracking was deleted) + if (filterFn(change.value)) { + filteredChanges.push(change) + } + } + } + + if (filteredChanges.length > 0) { + originalCallback(filteredChanges) + } + } + } + /** * Subscribe to changes for a specific key */ diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index 3e74ba5d..145bd87b 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -10,5 +10,11 @@ export * from "./optimistic-action" export * from "./local-only" export * from "./local-storage" +// Index system exports +export * from "./indexes/base-index.js" +export * from "./indexes/ordered-index.js" +export * from "./indexes/lazy-index.js" +export { type IndexOptions } from "./indexes/index-options.js" + // Re-export some stuff explicitly to ensure the type & value is exported export type { Collection } from "./collection" diff --git a/packages/db/src/indexes/base-index.ts b/packages/db/src/indexes/base-index.ts new file mode 100644 index 00000000..ddb351bb --- /dev/null +++ b/packages/db/src/indexes/base-index.ts @@ -0,0 +1,119 @@ +import { compileSingleRowExpression } from "../query/compiler/evaluators.js" +import { comparisonFunctions } from "../query/builder/functions.js" +import type { BasicExpression } from "../query/ir.js" + +/** + * 
Operations that indexes can support, imported from available comparison functions + */ +export const IndexOperation = comparisonFunctions + +/** + * Type for index operation values + */ +export type IndexOperation = (typeof comparisonFunctions)[number] + +/** + * Statistics about index usage and performance + */ +export interface IndexStats { + readonly entryCount: number + readonly lookupCount: number + readonly averageLookupTime: number + readonly lastUpdated: Date +} + +/** + * Base abstract class that all index types extend + */ +export abstract class BaseIndex< + TKey extends string | number = string | number, +> { + public readonly id: string + public readonly name?: string + public readonly expression: BasicExpression + public abstract readonly supportedOperations: Set + + protected lookupCount = 0 + protected totalLookupTime = 0 + protected lastUpdated = new Date() + + constructor( + id: string, + expression: BasicExpression, + name?: string, + options?: any + ) { + this.id = id + this.expression = expression + this.name = name + this.initialize(options) + } + + // Abstract methods that each index type must implement + abstract add(key: TKey, item: any): void + abstract remove(key: TKey, item: any): void + abstract update(key: TKey, oldItem: any, newItem: any): void + abstract build(entries: Iterable<[TKey, any]>): void + abstract clear(): void + abstract lookup(operation: IndexOperation, value: any): Set + abstract get keyCount(): number + + // Common methods + supports(operation: IndexOperation): boolean { + return this.supportedOperations.has(operation) + } + + matchesField(fieldPath: Array): boolean { + return ( + this.expression.type === `ref` && + this.expression.path.length === fieldPath.length && + this.expression.path.every((part, i) => part === fieldPath[i]) + ) + } + + getStats(): IndexStats { + return { + entryCount: this.keyCount, + lookupCount: this.lookupCount, + averageLookupTime: + this.lookupCount > 0 ? 
this.totalLookupTime / this.lookupCount : 0, + lastUpdated: this.lastUpdated, + } + } + + // Protected methods for subclasses + protected abstract initialize(options?: any): void + + protected evaluateIndexExpression(item: any): any { + const evaluator = compileSingleRowExpression(this.expression) + return evaluator(item as Record) + } + + protected trackLookup(startTime: number): void { + const duration = performance.now() - startTime + this.lookupCount++ + this.totalLookupTime += duration + } + + protected updateTimestamp(): void { + this.lastUpdated = new Date() + } +} + +/** + * Type for index constructor + */ +export type IndexConstructor = + new ( + id: string, + expression: BasicExpression, + name?: string, + options?: any + ) => BaseIndex + +/** + * Index resolver can be either a class constructor or async loader + */ +export type IndexResolver = + | IndexConstructor + | (() => Promise>) diff --git a/packages/db/src/indexes/index-options.ts b/packages/db/src/indexes/index-options.ts new file mode 100644 index 00000000..e60b41dd --- /dev/null +++ b/packages/db/src/indexes/index-options.ts @@ -0,0 +1,42 @@ +import type { IndexConstructor, IndexResolver } from "./base-index.js" + +/** + * Enhanced index options that support both sync and async resolvers + */ +export interface IndexOptions { + name?: string + indexType?: TResolver + options?: TResolver extends IndexConstructor + ? TResolver extends new ( + id: string, + expr: any, + name?: string, + options?: infer O + ) => any + ? O + : never + : TResolver extends () => Promise + ? TCtor extends new ( + id: string, + expr: any, + name?: string, + options?: infer O + ) => any + ? O + : never + : never +} + +/** + * Utility type to extract the constructed index type from a resolver + */ +export type ResolvedIndexType = + TResolver extends IndexConstructor + ? InstanceType + : TResolver extends () => Promise> + ? TResolver extends () => Promise + ? TCtor extends IndexConstructor + ? 
InstanceType + : never + : never + : never diff --git a/packages/db/src/indexes/lazy-index.ts b/packages/db/src/indexes/lazy-index.ts new file mode 100644 index 00000000..f92d6685 --- /dev/null +++ b/packages/db/src/indexes/lazy-index.ts @@ -0,0 +1,251 @@ +import type { + BaseIndex, + IndexConstructor, + IndexResolver, +} from "./base-index.js" +import type { BasicExpression } from "../query/ir.js" + +/** + * Utility to determine if a resolver is a constructor or async loader + */ +function isConstructor( + resolver: IndexResolver +): resolver is IndexConstructor { + // Check if it's a function with a prototype (constructor) + return ( + typeof resolver === `function` && + resolver.prototype !== undefined && + resolver.prototype.constructor === resolver + ) +} + +/** + * Resolve index constructor from resolver + */ +async function resolveIndexConstructor( + resolver: IndexResolver +): Promise> { + if (isConstructor(resolver)) { + return resolver + } else { + // It's an async loader function + return await resolver() + } +} + +/** + * Wrapper that defers index creation until first sync + */ +export class LazyIndexWrapper { + private indexPromise: Promise> | null = null + private resolvedIndex: BaseIndex | null = null + + constructor( + private id: string, + private expression: BasicExpression, + private name: string | undefined, + private resolver: IndexResolver, + private options: any, + private collectionEntries?: Iterable<[TKey, any]> + ) { + // For synchronous constructors, resolve immediately + if (isConstructor(this.resolver)) { + this.resolvedIndex = new this.resolver( + this.id, + this.expression, + this.name, + this.options + ) + // Build with initial data if provided + if (this.collectionEntries) { + this.resolvedIndex.build(this.collectionEntries) + } + } + } + + /** + * Resolve the actual index + */ + async resolve(): Promise> { + if (this.resolvedIndex) { + return this.resolvedIndex + } + + if (!this.indexPromise) { + this.indexPromise = 
this.createIndex() + } + + this.resolvedIndex = await this.indexPromise + return this.resolvedIndex + } + + /** + * Check if already resolved + */ + isResolved(): boolean { + return this.resolvedIndex !== null + } + + /** + * Get resolved index (throws if not ready) + */ + getResolved(): BaseIndex { + if (!this.resolvedIndex) { + throw new Error( + `Index ${this.id} has not been resolved yet. Ensure collection is synced.` + ) + } + return this.resolvedIndex + } + + /** + * Get the index ID + */ + getId(): string { + return this.id + } + + /** + * Get the index name + */ + getName(): string | undefined { + return this.name + } + + /** + * Get the index expression + */ + getExpression(): BasicExpression { + return this.expression + } + + private async createIndex(): Promise> { + const IndexClass = await resolveIndexConstructor(this.resolver) + return new IndexClass(this.id, this.expression, this.name, this.options) + } +} + +/** + * Proxy that provides synchronous interface while index loads asynchronously + */ +export class IndexProxy { + constructor( + private indexId: string, + private lazyIndex: LazyIndexWrapper + ) {} + + /** + * Get the resolved index (throws if not ready) + */ + get index(): BaseIndex { + return this.lazyIndex.getResolved() + } + + /** + * Check if index is ready + */ + get isReady(): boolean { + return this.lazyIndex.isResolved() + } + + /** + * Wait for index to be ready + */ + async whenReady(): Promise> { + return await this.lazyIndex.resolve() + } + + /** + * Get the index ID + */ + get id(): string { + return this.indexId + } + + /** + * Get the index name (available immediately) + */ + get name(): string | undefined { + if (this.isReady) { + return this.index.name + } + return this.lazyIndex.getName() + } + + /** + * Get the index expression (available immediately) + */ + get expression(): BasicExpression { + return this.lazyIndex.getExpression() + } + + /** + * Check if index supports an operation (throws if not ready) + */ + 
supports(operation: any): boolean { + return this.index.supports(operation) + } + + /** + * Get index statistics (throws if not ready) + */ + getStats() { + return this.index.getStats() + } + + /** + * Check if index matches a field path (available immediately) + */ + matchesField(fieldPath: Array): boolean { + const expr = this.expression + return ( + expr.type === `ref` && + expr.path.length === fieldPath.length && + expr.path.every((part, i) => part === fieldPath[i]) + ) + } + + /** + * Get the key count (throws if not ready) + */ + get keyCount(): number { + return this.index.keyCount + } + + // Test compatibility properties - delegate to resolved index + get indexedKeysSet(): Set { + const resolved = this.index as any + return resolved.indexedKeysSet + } + + get orderedEntriesArray(): Array<[any, Set]> { + const resolved = this.index as any + return resolved.orderedEntriesArray + } + + get valueMapData(): Map> { + const resolved = this.index as any + return resolved.valueMapData + } + + // BTreeIndex compatibility methods + equalityLookup(value: any): Set { + const resolved = this.index as any + return resolved.equalityLookup?.(value) ?? new Set() + } + + rangeQuery(operation: string, value: any): Set { + const resolved = this.index as any + return resolved.rangeQuery?.(operation, value) ?? new Set() + } + + inArrayLookup(values: Array): Set { + const resolved = this.index as any + return resolved.inArrayLookup?.(values) ?? 
new Set() + } + + // Internal method for the collection to get the lazy wrapper + _getLazyWrapper(): LazyIndexWrapper { + return this.lazyIndex + } +} diff --git a/packages/db/src/indexes/ordered-index.ts b/packages/db/src/indexes/ordered-index.ts new file mode 100644 index 00000000..c9f760ba --- /dev/null +++ b/packages/db/src/indexes/ordered-index.ts @@ -0,0 +1,268 @@ +import { ascComparator } from "../utils/comparison.js" +import { findInsertPosition } from "../utils/array-utils.js" +import { BaseIndex } from "./base-index.js" +import type { IndexOperation } from "./base-index.js" + +/** + * Options for Ordered index + */ +export interface OrderedIndexOptions { + compareFn?: (a: any, b: any) => number +} + +/** + * Ordered index for sorted data with range queries + * This maintains items in sorted order and provides efficient range operations + */ +export class OrderedIndex< + TKey extends string | number = string | number, +> extends BaseIndex { + public readonly supportedOperations = new Set([ + `eq`, + `gt`, + `gte`, + `lt`, + `lte`, + `in`, + ]) + + // Internal data structures - private to hide implementation details + private orderedEntries: Array<[any, Set]> = [] + private valueMap = new Map>() + private indexedKeys = new Set() + private compareFn: (a: any, b: any) => number = ascComparator + + protected initialize(options?: OrderedIndexOptions): void { + this.compareFn = options?.compareFn ?? 
ascComparator + } + + /** + * Adds a value to the index + */ + add(key: TKey, item: any): void { + let indexedValue: any + try { + indexedValue = this.evaluateIndexExpression(item) + } catch (error) { + console.warn(`Failed to evaluate index expression for key ${key}:`, error) + return + } + + // Check if this value already exists + if (this.valueMap.has(indexedValue)) { + // Add to existing set + this.valueMap.get(indexedValue)!.add(key) + } else { + // Create new set for this value + const keySet = new Set([key]) + this.valueMap.set(indexedValue, keySet) + + // Find correct position in ordered entries using binary search + const insertIndex = findInsertPosition( + this.orderedEntries, + indexedValue, + this.compareFn + ) + this.orderedEntries.splice(insertIndex, 0, [indexedValue, keySet]) + } + + this.indexedKeys.add(key) + this.updateTimestamp() + } + + /** + * Removes a value from the index + */ + remove(key: TKey, item: any): void { + let indexedValue: any + try { + indexedValue = this.evaluateIndexExpression(item) + } catch (error) { + console.warn( + `Failed to evaluate index expression for key ${key} during removal:`, + error + ) + return + } + + if (this.valueMap.has(indexedValue)) { + const keySet = this.valueMap.get(indexedValue)! 
+ keySet.delete(key) + + // If set is now empty, remove the entry entirely + if (keySet.size === 0) { + this.valueMap.delete(indexedValue) + + // Find and remove from ordered entries + const index = this.orderedEntries.findIndex( + ([value]) => this.compareFn(value, indexedValue) === 0 + ) + if (index !== -1) { + this.orderedEntries.splice(index, 1) + } + } + } + + this.indexedKeys.delete(key) + this.updateTimestamp() + } + + /** + * Updates a value in the index + */ + update(key: TKey, oldItem: any, newItem: any): void { + this.remove(key, oldItem) + this.add(key, newItem) + } + + /** + * Builds the index from a collection of entries + */ + build(entries: Iterable<[TKey, any]>): void { + this.clear() + + for (const [key, item] of entries) { + this.add(key, item) + } + } + + /** + * Clears all data from the index + */ + clear(): void { + this.orderedEntries = [] + this.valueMap.clear() + this.indexedKeys.clear() + this.updateTimestamp() + } + + /** + * Performs a lookup operation + */ + lookup(operation: IndexOperation, value: any): Set { + const startTime = performance.now() + + let result: Set + + switch (operation) { + case `eq`: + result = this.equalityLookup(value) + break + case `gt`: + case `gte`: + case `lt`: + case `lte`: + result = this.rangeQuery(operation, value) + break + case `in`: + result = this.inArrayLookup(value) + break + default: + throw new Error(`Operation ${operation} not supported by OrderedIndex`) + } + + this.trackLookup(startTime) + return result + } + + /** + * Gets the number of indexed keys + */ + get keyCount(): number { + return this.indexedKeys.size + } + + // Public methods for backward compatibility (used by tests) + + /** + * Performs an equality lookup + */ + equalityLookup(value: any): Set { + return new Set(this.valueMap.get(value) ?? 
[]) + } + + /** + * Performs a range query + */ + rangeQuery(operation: `gt` | `gte` | `lt` | `lte`, value: any): Set { + const result = new Set() + + // Use binary search to find the starting position + const insertIndex = findInsertPosition( + this.orderedEntries, + value, + this.compareFn + ) + + let startIndex = 0 + let endIndex = this.orderedEntries.length + + switch (operation) { + case `lt`: + endIndex = insertIndex + break + case `lte`: + endIndex = insertIndex + // Include the value if it exists + if ( + insertIndex < this.orderedEntries.length && + this.compareFn(this.orderedEntries[insertIndex]![0], value) === 0 + ) { + endIndex = insertIndex + 1 + } + break + case `gt`: + startIndex = insertIndex + // Skip the value if it exists + if ( + insertIndex < this.orderedEntries.length && + this.compareFn(this.orderedEntries[insertIndex]![0], value) === 0 + ) { + startIndex = insertIndex + 1 + } + endIndex = this.orderedEntries.length + break + case `gte`: + startIndex = insertIndex + endIndex = this.orderedEntries.length + break + } + + // Collect keys from the range + for (let i = startIndex; i < endIndex; i++) { + const keys = this.orderedEntries[i]![1] + keys.forEach((key) => result.add(key)) + } + + return result + } + + /** + * Performs an IN array lookup + */ + inArrayLookup(values: Array): Set { + const result = new Set() + + for (const value of values) { + const keys = this.valueMap.get(value) + if (keys) { + keys.forEach((key) => result.add(key)) + } + } + + return result + } + + // Getter methods for testing compatibility + get indexedKeysSet(): Set { + return this.indexedKeys + } + + get orderedEntriesArray(): Array<[any, Set]> { + return this.orderedEntries + } + + get valueMapData(): Map> { + return this.valueMap + } +} diff --git a/packages/db/src/query/builder/functions.ts b/packages/db/src/query/builder/functions.ts index 9a16318e..b5902bd6 100644 --- a/packages/db/src/query/builder/functions.ts +++ b/packages/db/src/query/builder/functions.ts 
@@ -265,3 +265,17 @@ export function max( ): Aggregate { return new Aggregate(`max`, [toExpression(arg)]) } + +/** + * List of comparison function names that can be used with indexes + */ +export const comparisonFunctions = [ + `eq`, + `gt`, + `gte`, + `lt`, + `lte`, + `in`, + `like`, + `ilike`, +] as const diff --git a/packages/db/src/query/builder/ref-proxy.ts b/packages/db/src/query/builder/ref-proxy.ts index 87300d23..1fc195f3 100644 --- a/packages/db/src/query/builder/ref-proxy.ts +++ b/packages/db/src/query/builder/ref-proxy.ts @@ -10,6 +10,71 @@ export interface RefProxy { readonly __type: T } +/** + * Type for creating a RefProxy for a single row/type without namespacing + * Used in collection indexes and where clauses + */ +export type SingleRowRefProxy = + T extends Record + ? { + [K in keyof T]: T[K] extends Record + ? SingleRowRefProxy & RefProxy + : RefProxy + } & RefProxy + : RefProxy + +/** + * Creates a proxy object that records property access paths for a single row + * Used in collection indexes and where clauses + */ +export function createSingleRowRefProxy< + T extends Record, +>(): SingleRowRefProxy { + const cache = new Map() + + function createProxy(path: Array): any { + const pathKey = path.join(`.`) + if (cache.has(pathKey)) { + return cache.get(pathKey) + } + + const proxy = new Proxy({} as any, { + get(target, prop, receiver) { + if (prop === `__refProxy`) return true + if (prop === `__path`) return path + if (prop === `__type`) return undefined // Type is only for TypeScript inference + if (typeof prop === `symbol`) return Reflect.get(target, prop, receiver) + + const newPath = [...path, String(prop)] + return createProxy(newPath) + }, + + has(target, prop) { + if (prop === `__refProxy` || prop === `__path` || prop === `__type`) + return true + return Reflect.has(target, prop) + }, + + ownKeys(target) { + return Reflect.ownKeys(target) + }, + + getOwnPropertyDescriptor(target, prop) { + if (prop === `__refProxy` || prop === `__path` || 
prop === `__type`) { + return { enumerable: false, configurable: true } + } + return Reflect.getOwnPropertyDescriptor(target, prop) + }, + }) + + cache.set(pathKey, proxy) + return proxy + } + + // Return the root proxy that starts with an empty path + return createProxy([]) as SingleRowRefProxy +} + /** * Creates a proxy object that records property access paths * Used in callbacks like where, select, etc. to create type-safe references diff --git a/packages/db/src/query/compiler/evaluators.ts b/packages/db/src/query/compiler/evaluators.ts index d2fcf2ab..86bef532 100644 --- a/packages/db/src/query/compiler/evaluators.ts +++ b/packages/db/src/query/compiler/evaluators.ts @@ -6,11 +6,37 @@ import type { NamespacedRow } from "../../types.js" */ export type CompiledExpression = (namespacedRow: NamespacedRow) => any +/** + * Compiled single-row expression evaluator function type + */ +export type CompiledSingleRowExpression = (item: Record) => any + /** * Compiles an expression into an optimized evaluator function. * This eliminates branching during evaluation by pre-compiling the expression structure. */ export function compileExpression(expr: BasicExpression): CompiledExpression { + const compiledFn = compileExpressionInternal(expr, false) + return compiledFn as CompiledExpression +} + +/** + * Compiles a single-row expression into an optimized evaluator function. 
+ */ +export function compileSingleRowExpression( + expr: BasicExpression +): CompiledSingleRowExpression { + const compiledFn = compileExpressionInternal(expr, true) + return compiledFn as CompiledSingleRowExpression +} + +/** + * Internal unified expression compiler that handles both namespaced and single-row evaluation + */ +function compileExpressionInternal( + expr: BasicExpression, + isSingleRow: boolean +): (data: any) => any { switch (expr.type) { case `val`: { // For constant values, return a function that just returns the value @@ -19,13 +45,13 @@ export function compileExpression(expr: BasicExpression): CompiledExpression { } case `ref`: { - // For references, pre-compile the property path navigation - return compileRef(expr) + // For references, compile based on evaluation mode + return isSingleRow ? compileSingleRowRef(expr) : compileRef(expr) } case `func`: { - // For functions, pre-compile the function and its arguments - return compileFunction(expr) + // For functions, use the unified compiler + return compileFunction(expr, isSingleRow) } default: @@ -75,74 +101,95 @@ function compileRef(ref: PropRef): CompiledExpression { } /** - * Compiles a function expression into an optimized evaluator + * Compiles a reference expression for single-row evaluation + */ +function compileSingleRowRef(ref: PropRef): CompiledSingleRowExpression { + const propertyPath = ref.path + + // This function works for all path lengths including empty path + return (item) => { + let value: any = item + for (const prop of propertyPath) { + if (value == null) { + return value + } + value = value[prop] + } + return value + } +} + +/** + * Compiles a function expression for both namespaced and single-row evaluation */ -function compileFunction(func: Func): CompiledExpression { - // Pre-compile all arguments - const compiledArgs = func.args.map(compileExpression) +function compileFunction(func: Func, isSingleRow: boolean): (data: any) => any { + // Pre-compile all arguments using 
the appropriate compiler + const compiledArgs = func.args.map((arg) => + compileExpressionInternal(arg, isSingleRow) + ) switch (func.name) { // Comparison operators case `eq`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return a === b } } case `gt`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return a > b } } case `gte`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return a >= b } } case `lt`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return a < b } } case `lte`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return a <= b } } // Boolean operators case `and`: - return (namespacedRow) => { + return (data) => { for (const compiledArg of compiledArgs) { - if (!compiledArg(namespacedRow)) { + if (!compiledArg(data)) { return false } } return true } case `or`: - return (namespacedRow) => { + return (data) => { for (const compiledArg of compiledArgs) { - if (compiledArg(namespacedRow)) { + if (compiledArg(data)) { return true } } @@ -150,16 +197,16 @@ function compileFunction(func: Func): CompiledExpression { } case `not`: { const arg = compiledArgs[0]! 
- return (namespacedRow) => !arg(namespacedRow) + return (data) => !arg(data) } // Array operators case `in`: { const valueEvaluator = compiledArgs[0]! const arrayEvaluator = compiledArgs[1]! - return (namespacedRow) => { - const value = valueEvaluator(namespacedRow) - const array = arrayEvaluator(namespacedRow) + return (data) => { + const value = valueEvaluator(data) + const array = arrayEvaluator(data) if (!Array.isArray(array)) { return false } @@ -171,18 +218,18 @@ function compileFunction(func: Func): CompiledExpression { case `like`: { const valueEvaluator = compiledArgs[0]! const patternEvaluator = compiledArgs[1]! - return (namespacedRow) => { - const value = valueEvaluator(namespacedRow) - const pattern = patternEvaluator(namespacedRow) + return (data) => { + const value = valueEvaluator(data) + const pattern = patternEvaluator(data) return evaluateLike(value, pattern, false) } } case `ilike`: { const valueEvaluator = compiledArgs[0]! const patternEvaluator = compiledArgs[1]! - return (namespacedRow) => { - const value = valueEvaluator(namespacedRow) - const pattern = patternEvaluator(namespacedRow) + return (data) => { + const value = valueEvaluator(data) + const pattern = patternEvaluator(data) return evaluateLike(value, pattern, true) } } @@ -190,22 +237,22 @@ function compileFunction(func: Func): CompiledExpression { // String functions case `upper`: { const arg = compiledArgs[0]! - return (namespacedRow) => { - const value = arg(namespacedRow) + return (data) => { + const value = arg(data) return typeof value === `string` ? value.toUpperCase() : value } } case `lower`: { const arg = compiledArgs[0]! - return (namespacedRow) => { - const value = arg(namespacedRow) + return (data) => { + const value = arg(data) return typeof value === `string` ? value.toLowerCase() : value } } case `length`: { const arg = compiledArgs[0]! 
- return (namespacedRow) => { - const value = arg(namespacedRow) + return (data) => { + const value = arg(data) if (typeof value === `string`) { return value.length } @@ -216,10 +263,10 @@ function compileFunction(func: Func): CompiledExpression { } } case `concat`: - return (namespacedRow) => { + return (data) => { return compiledArgs .map((evaluator) => { - const arg = evaluator(namespacedRow) + const arg = evaluator(data) try { return String(arg ?? ``) } catch { @@ -233,9 +280,9 @@ function compileFunction(func: Func): CompiledExpression { .join(``) } case `coalesce`: - return (namespacedRow) => { + return (data) => { for (const evaluator of compiledArgs) { - const value = evaluator(namespacedRow) + const value = evaluator(data) if (value !== null && value !== undefined) { return value } @@ -247,36 +294,36 @@ function compileFunction(func: Func): CompiledExpression { case `add`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return (a ?? 0) + (b ?? 0) } } case `subtract`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return (a ?? 0) - (b ?? 0) } } case `multiply`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) return (a ?? 0) * (b ?? 0) } } case `divide`: { const argA = compiledArgs[0]! const argB = compiledArgs[1]! - return (namespacedRow) => { - const a = argA(namespacedRow) - const b = argB(namespacedRow) + return (data) => { + const a = argA(data) + const b = argB(data) const divisor = b ?? 0 return divisor !== 0 ? (a ?? 
0) / divisor : null } diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 9fb7f9b5..f702bc6f 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -1,4 +1,5 @@ import { orderByWithFractionalIndex } from "@electric-sql/d2mini" +import { ascComparator, descComparator } from "../../utils/comparison.js" import { compileExpression } from "./evaluators.js" import type { OrderByClause } from "../ir.js" import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js" @@ -51,51 +52,6 @@ export function processOrderBy( return null } - // Create comparator functions - const ascComparator = (a: any, b: any): number => { - // Handle null/undefined - if (a == null && b == null) return 0 - if (a == null) return -1 - if (b == null) return 1 - - // if a and b are both strings, compare them based on locale - if (typeof a === `string` && typeof b === `string`) { - return a.localeCompare(b) - } - - // if a and b are both arrays, compare them element by element - if (Array.isArray(a) && Array.isArray(b)) { - for (let i = 0; i < Math.min(a.length, b.length); i++) { - const result = ascComparator(a[i], b[i]) - if (result !== 0) { - return result - } - } - // All elements are equal up to the minimum length - return a.length - b.length - } - - // If both are dates, compare them - if (a instanceof Date && b instanceof Date) { - return a.getTime() - b.getTime() - } - - // If at least one of the values is an object, convert to strings - const bothObjects = typeof a === `object` && typeof b === `object` - const notNull = a !== null && b !== null - if (bothObjects && notNull) { - return a.toString().localeCompare(b.toString()) - } - - if (a < b) return -1 - if (a > b) return 1 - return 0 - } - - const descComparator = (a: unknown, b: unknown): number => { - return ascComparator(b, a) - } - // Create a multi-property comparator that respects the order and direction of each 
property const makeComparator = () => { return (a: unknown, b: unknown) => { diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index c122f0b1..5c18b5bc 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -3,6 +3,8 @@ import type { Collection } from "./collection" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { Transaction } from "./transactions" +import type { SingleRowRefProxy } from "./query/builder/ref-proxy" + /** * Helper type to extract the output type from a standard schema * @@ -555,6 +557,28 @@ export type KeyedNamespacedRow = [unknown, NamespacedRow] */ export type NamespacedAndKeyedStream = IStreamBuilder +/** + * Options for subscribing to collection changes + */ +export interface SubscribeChangesOptions< + T extends object = Record, +> { + /** Whether to include the current state as initial changes */ + includeInitialState?: boolean + /** Filter changes using a where expression */ + where?: (row: SingleRowRefProxy) => any +} + +/** + * Options for getting current state as changes + */ +export interface CurrentStateAsChangesOptions< + T extends object = Record, +> { + /** Filter the current state using a where expression */ + where?: (row: SingleRowRefProxy) => any +} + /** * Function type for listening to collection changes * @param changes - Array of change messages describing what happened diff --git a/packages/db/src/utils/array-utils.ts b/packages/db/src/utils/array-utils.ts new file mode 100644 index 00000000..55847d91 --- /dev/null +++ b/packages/db/src/utils/array-utils.ts @@ -0,0 +1,28 @@ +/** + * Finds the correct insert position for a value in a sorted array using binary search + * @param sortedArray The sorted array to search in + * @param value The value to find the position for + * @param compareFn Comparison function to use for ordering + * @returns The index where the value should be inserted to maintain order + */ +export function findInsertPosition( + sortedArray: Array<[T, 
any]>, + value: T, + compareFn: (a: T, b: T) => number +): number { + let left = 0 + let right = sortedArray.length + + while (left < right) { + const mid = Math.floor((left + right) / 2) + const comparison = compareFn(sortedArray[mid]![0], value) + + if (comparison < 0) { + left = mid + 1 + } else { + right = mid + } + } + + return left +} diff --git a/packages/db/src/utils/comparison.ts b/packages/db/src/utils/comparison.ts new file mode 100644 index 00000000..5a90e524 --- /dev/null +++ b/packages/db/src/utils/comparison.ts @@ -0,0 +1,52 @@ +/** + * Universal comparison function for all data types + * Handles null/undefined, strings, arrays, dates, objects, and primitives + * Always sorts null/undefined values first + */ +export const ascComparator = (a: any, b: any): number => { + // Handle null/undefined + if (a == null && b == null) return 0 + if (a == null) return -1 + if (b == null) return 1 + + // if a and b are both strings, compare them based on locale + if (typeof a === `string` && typeof b === `string`) { + return a.localeCompare(b) + } + + // if a and b are both arrays, compare them element by element + if (Array.isArray(a) && Array.isArray(b)) { + for (let i = 0; i < Math.min(a.length, b.length); i++) { + const result = ascComparator(a[i], b[i]) + if (result !== 0) { + return result + } + } + // All elements are equal up to the minimum length + return a.length - b.length + } + + // If both are dates, compare them + if (a instanceof Date && b instanceof Date) { + return a.getTime() - b.getTime() + } + + // If at least one of the values is an object, convert to strings + const bothObjects = typeof a === `object` && typeof b === `object` + const notNull = a !== null && b !== null + if (bothObjects && notNull) { + return a.toString().localeCompare(b.toString()) + } + + if (a < b) return -1 + if (a > b) return 1 + return 0 +} + +/** + * Descending comparator function for ordering values + * Handles null/undefined as largest values (opposite of ascending) + 
*/ +export const descComparator = (a: unknown, b: unknown): number => { + return ascComparator(b, a) +} diff --git a/packages/db/src/utils/index-optimization.ts b/packages/db/src/utils/index-optimization.ts new file mode 100644 index 00000000..d160eef9 --- /dev/null +++ b/packages/db/src/utils/index-optimization.ts @@ -0,0 +1,409 @@ +/** + * # Index-Based Query Optimization + * + * This module provides utilities for optimizing query expressions by leveraging + * available indexes to quickly find matching keys instead of scanning all data. + * + * This is different from the query structure optimizer in `query/optimizer.ts` + * which rewrites query IR structure. This module focuses on using indexes during + * query execution to speed up data filtering. + * + * ## Key Features: + * - Uses indexes to find matching keys for WHERE conditions + * - Supports AND/OR logic with set operations + * - Handles range queries (eq, gt, gte, lt, lte) + * - Optimizes IN array expressions + */ + +import type { BaseIndex, IndexOperation } from "../indexes/base-index.js" +import type { BasicExpression } from "../query/ir.js" + +/** + * Result of index-based query optimization + */ +export interface OptimizationResult { + canOptimize: boolean + matchingKeys: Set +} + +/** + * Finds an index that matches a given field path + */ +export function findIndexForField( + indexes: Map>, + fieldPath: Array +): BaseIndex | undefined { + for (const index of indexes.values()) { + if (index.matchesField(fieldPath)) { + return index + } + } + return undefined +} + +/** + * Intersects multiple sets (AND logic) + */ +export function intersectSets(sets: Array>): Set { + if (sets.length === 0) return new Set() + if (sets.length === 1) return new Set(sets[0]) + + let result = new Set(sets[0]) + for (let i = 1; i < sets.length; i++) { + const newResult = new Set() + for (const item of result) { + if (sets[i]!.has(item)) { + newResult.add(item) + } + } + result = newResult + } + return result +} + +/** + * 
Unions multiple sets (OR logic) + */ +export function unionSets(sets: Array>): Set { + const result = new Set() + for (const set of sets) { + for (const item of set) { + result.add(item) + } + } + return result +} + +/** + * Optimizes a query expression using available indexes to find matching keys + */ +export function optimizeExpressionWithIndexes( + expression: BasicExpression, + indexes: Map> +): OptimizationResult { + return optimizeQueryRecursive(expression, indexes) +} + +/** + * Recursively optimizes query expressions + */ +function optimizeQueryRecursive( + expression: BasicExpression, + indexes: Map> +): OptimizationResult { + if (expression.type === `func`) { + switch (expression.name) { + case `eq`: + case `gt`: + case `gte`: + case `lt`: + case `lte`: + return optimizeSimpleComparison(expression, indexes) + + case `and`: + return optimizeAndExpression(expression, indexes) + + case `or`: + return optimizeOrExpression(expression, indexes) + + case `in`: + return optimizeInArrayExpression(expression, indexes) + } + } + + return { canOptimize: false, matchingKeys: new Set() } +} + +/** + * Checks if an expression can be optimized + */ +export function canOptimizeExpression( + expression: BasicExpression, + indexes: Map> +): boolean { + if (expression.type === `func`) { + switch (expression.name) { + case `eq`: + case `gt`: + case `gte`: + case `lt`: + case `lte`: + return canOptimizeSimpleComparison(expression, indexes) + + case `and`: + return canOptimizeAndExpression(expression, indexes) + + case `or`: + return canOptimizeOrExpression(expression, indexes) + + case `in`: + return canOptimizeInArrayExpression(expression, indexes) + } + } + + return false +} + +/** + * Optimizes simple comparison expressions (eq, gt, gte, lt, lte) + */ +function optimizeSimpleComparison( + expression: BasicExpression, + indexes: Map> +): OptimizationResult { + if (expression.type !== `func` || expression.args.length !== 2) { + return { canOptimize: false, matchingKeys: new 
Set() } + } + + const leftArg = expression.args[0]! + const rightArg = expression.args[1]! + + // Check both directions: field op value AND value op field + let fieldArg: BasicExpression | null = null + let valueArg: BasicExpression | null = null + let operation = expression.name as `eq` | `gt` | `gte` | `lt` | `lte` + + if (leftArg.type === `ref` && rightArg.type === `val`) { + // field op value + fieldArg = leftArg + valueArg = rightArg + } else if (leftArg.type === `val` && rightArg.type === `ref`) { + // value op field - need to flip the operation + fieldArg = rightArg + valueArg = leftArg + + // Flip the operation for reverse comparison + switch (operation) { + case `gt`: + operation = `lt` + break + case `gte`: + operation = `lte` + break + case `lt`: + operation = `gt` + break + case `lte`: + operation = `gte` + break + // eq stays the same + } + } + + if (fieldArg && valueArg) { + const fieldPath = (fieldArg as any).path + const index = findIndexForField(indexes, fieldPath) + + if (index) { + const queryValue = (valueArg as any).value + + // Map operation to IndexOperation enum + const indexOperation = operation as IndexOperation + + // Check if the index supports this operation + if (!index.supports(indexOperation)) { + return { canOptimize: false, matchingKeys: new Set() } + } + + const matchingKeys = index.lookup(indexOperation, queryValue) + return { canOptimize: true, matchingKeys } + } + } + + return { canOptimize: false, matchingKeys: new Set() } +} + +/** + * Checks if a simple comparison can be optimized + */ +function canOptimizeSimpleComparison( + expression: BasicExpression, + indexes: Map> +): boolean { + if (expression.type !== `func` || expression.args.length !== 2) { + return false + } + + const leftArg = expression.args[0]! + const rightArg = expression.args[1]! 
+ + // Check both directions: field op value AND value op field + let fieldPath: Array | null = null + + if (leftArg.type === `ref` && rightArg.type === `val`) { + fieldPath = (leftArg as any).path + } else if (leftArg.type === `val` && rightArg.type === `ref`) { + fieldPath = (rightArg as any).path + } + + if (fieldPath) { + const index = findIndexForField(indexes, fieldPath) + return index !== undefined + } + + return false +} + +/** + * Optimizes AND expressions + */ +function optimizeAndExpression( + expression: BasicExpression, + indexes: Map> +): OptimizationResult { + if (expression.type !== `func` || expression.args.length < 2) { + return { canOptimize: false, matchingKeys: new Set() } + } + + const results: Array> = [] + + // Try to optimize each part, keep the optimizable ones + for (const arg of expression.args) { + const result = optimizeQueryRecursive(arg, indexes) + if (result.canOptimize) { + results.push(result) + } + } + + if (results.length > 0) { + // Use intersectSets utility for AND logic + const allMatchingSets = results.map((r) => r.matchingKeys) + const intersectedKeys = intersectSets(allMatchingSets) + return { canOptimize: true, matchingKeys: intersectedKeys } + } + + return { canOptimize: false, matchingKeys: new Set() } +} + +/** + * Checks if an AND expression can be optimized + */ +function canOptimizeAndExpression( + expression: BasicExpression, + indexes: Map> +): boolean { + if (expression.type !== `func` || expression.args.length < 2) { + return false + } + + // If any argument can be optimized, we can gain some speedup + return expression.args.some((arg) => canOptimizeExpression(arg, indexes)) +} + +/** + * Optimizes OR expressions + */ +function optimizeOrExpression( + expression: BasicExpression, + indexes: Map> +): OptimizationResult { + if (expression.type !== `func` || expression.args.length < 2) { + return { canOptimize: false, matchingKeys: new Set() } + } + + const results: Array> = [] + + // Try to optimize each part, 
keep the optimizable ones + for (const arg of expression.args) { + const result = optimizeQueryRecursive(arg, indexes) + if (result.canOptimize) { + results.push(result) + } + } + + if (results.length > 0) { + // Use unionSets utility for OR logic + const allMatchingSets = results.map((r) => r.matchingKeys) + const unionedKeys = unionSets(allMatchingSets) + return { canOptimize: true, matchingKeys: unionedKeys } + } + + return { canOptimize: false, matchingKeys: new Set() } +} + +/** + * Checks if an OR expression can be optimized + */ +function canOptimizeOrExpression( + expression: BasicExpression, + indexes: Map> +): boolean { + if (expression.type !== `func` || expression.args.length < 2) { + return false + } + + // If any argument can be optimized, we can gain some speedup + return expression.args.some((arg) => canOptimizeExpression(arg, indexes)) +} + +/** + * Optimizes IN array expressions + */ +function optimizeInArrayExpression( + expression: BasicExpression, + indexes: Map> +): OptimizationResult { + if (expression.type !== `func` || expression.args.length !== 2) { + return { canOptimize: false, matchingKeys: new Set() } + } + + const fieldArg = expression.args[0]! + const arrayArg = expression.args[1]! 
+ + if ( + fieldArg.type === `ref` && + arrayArg.type === `val` && + Array.isArray((arrayArg as any).value) + ) { + const fieldPath = (fieldArg as any).path + const values = (arrayArg as any).value + const index = findIndexForField(indexes, fieldPath) + + if (index) { + // Check if the index supports IN operation + if (index.supports(`in`)) { + const matchingKeys = index.lookup(`in`, values) + return { canOptimize: true, matchingKeys } + } else if (index.supports(`eq`)) { + // Fallback to multiple equality lookups + const matchingKeys = new Set() + for (const value of values) { + const keysForValue = index.lookup(`eq`, value) + for (const key of keysForValue) { + matchingKeys.add(key) + } + } + return { canOptimize: true, matchingKeys } + } + } + } + + return { canOptimize: false, matchingKeys: new Set() } +} + +/** + * Checks if an IN array expression can be optimized + */ +function canOptimizeInArrayExpression( + expression: BasicExpression, + indexes: Map> +): boolean { + if (expression.type !== `func` || expression.args.length !== 2) { + return false + } + + const fieldArg = expression.args[0]! + const arrayArg = expression.args[1]! 
+ + if ( + fieldArg.type === `ref` && + arrayArg.type === `val` && + Array.isArray((arrayArg as any).value) + ) { + const fieldPath = (fieldArg as any).path + const index = findIndexForField(indexes, fieldPath) + return index !== undefined + } + + return false +} diff --git a/packages/db/tests/collection-indexes.test.ts b/packages/db/tests/collection-indexes.test.ts new file mode 100644 index 00000000..5327ecc1 --- /dev/null +++ b/packages/db/tests/collection-indexes.test.ts @@ -0,0 +1,1476 @@ +import { beforeEach, describe, expect, it } from "vitest" +import mitt from "mitt" +import { createCollection } from "../src/collection" +import { createTransaction } from "../src/transactions" +import { + and, + eq, + gt, + gte, + inArray, + length, + lt, + lte, + or, +} from "../src/query/builder/functions" +import type { MutationFn, PendingMutation } from "../src/types" + +interface TestItem { + id: string + name: string + age: number + status: `active` | `inactive` | `pending` + score?: number + createdAt: Date +} + +// Index usage tracking utilities +interface IndexUsageStats { + rangeQueryCalls: number + fullScanCalls: number + indexesUsed: Array + queriesExecuted: Array<{ + type: `index` | `fullScan` + operation?: string + field?: string + value?: any + }> +} + +function createIndexUsageTracker(collection: any): { + stats: IndexUsageStats + restore: () => void +} { + const stats: IndexUsageStats = { + rangeQueryCalls: 0, + fullScanCalls: 0, + indexesUsed: [], + queriesExecuted: [], + } + + // Track index method calls by patching all existing indexes + const originalMethods = new Map() + + for (const [indexId, index] of collection.indexes) { + // Track lookup calls (new unified method) + const originalLookup = index.lookup.bind(index) + index.lookup = function (operation: any, value: any) { + stats.rangeQueryCalls++ + stats.indexesUsed.push(indexId) + stats.queriesExecuted.push({ + type: `index`, + operation, + field: index.expression?.path?.join(`.`), + value, + }) + 
return originalLookup(operation, value) + } + + originalMethods.set(indexId, { + lookup: originalLookup, + }) + } + + // Track full scan calls (entries() iteration) + const originalEntries = collection.entries + collection.entries = function* () { + // Only count as full scan if we're in a filtering context + // Check the call stack to see if we're inside createFilterFunction + const stack = new Error().stack || `` + if ( + stack.includes(`createFilterFunction`) || + stack.includes(`currentStateAsChanges`) + ) { + stats.fullScanCalls++ + stats.queriesExecuted.push({ + type: `fullScan`, + }) + } + yield* originalEntries.call(this) + } + + const restore = () => { + // Restore original index methods + for (const [indexId, index] of collection.indexes) { + const original = originalMethods.get(indexId) + if (original) { + index.lookup = original.lookup + } + } + collection.entries = originalEntries + } + + return { stats, restore } +} + +// Helper to assert index usage +function expectIndexUsage( + stats: IndexUsageStats, + expectations: { + shouldUseIndex: boolean + shouldUseFullScan?: boolean + indexCallCount?: number + fullScanCallCount?: number + } +) { + if (expectations.shouldUseIndex) { + expect(stats.rangeQueryCalls).toBeGreaterThan(0) + expect(stats.indexesUsed.length).toBeGreaterThan(0) + + if (expectations.indexCallCount !== undefined) { + expect(stats.rangeQueryCalls).toBe(expectations.indexCallCount) + } + } else { + expect(stats.rangeQueryCalls).toBe(0) + expect(stats.indexesUsed.length).toBe(0) + } + + if (expectations.shouldUseFullScan !== undefined) { + if (expectations.shouldUseFullScan) { + expect(stats.fullScanCalls).toBeGreaterThan(0) + + if (expectations.fullScanCallCount !== undefined) { + expect(stats.fullScanCalls).toBe(expectations.fullScanCallCount) + } + } else { + expect(stats.fullScanCalls).toBe(0) + } + } +} + +// Helper to run a test with index usage tracking (automatically handles setup/cleanup) +function withIndexTracking( + collection: 
any, + testFn: (tracker: { stats: IndexUsageStats }) => void | Promise +): void | Promise { + const tracker = createIndexUsageTracker(collection) + + try { + const result = testFn(tracker) + if (result instanceof Promise) { + return result.finally(() => tracker.restore()) + } + tracker.restore() + } catch (error) { + tracker.restore() + throw error + } +} + +describe(`Collection Indexes`, () => { + let collection: ReturnType> + let testData: Array + let mutationFn: MutationFn + let emitter: any + + beforeEach(async () => { + testData = [ + { + id: `1`, + name: `Alice`, + age: 25, + status: `active`, + score: 95, + createdAt: new Date(`2023-01-01`), + }, + { + id: `2`, + name: `Bob`, + age: 30, + status: `inactive`, + score: 80, + createdAt: new Date(`2023-01-02`), + }, + { + id: `3`, + name: `Charlie`, + age: 35, + status: `active`, + score: 90, + createdAt: new Date(`2023-01-03`), + }, + { + id: `4`, + name: `Diana`, + age: 28, + status: `pending`, + score: 85, + createdAt: new Date(`2023-01-04`), + }, + { + id: `5`, + name: `Eve`, + age: 22, + status: `active`, + score: undefined, + createdAt: new Date(`2023-01-05`), + }, + ] + + emitter = mitt() + + // Create mutation handler that syncs changes back via emitter + mutationFn = ({ transaction }) => { + emitter.emit(`sync`, transaction.mutations) + return Promise.resolve() + } + + collection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + // Provide initial data through sync + begin() + for (const item of testData) { + write({ + type: `insert`, + value: item, + }) + } + commit() + + // Listen for mutations and sync them back (only register once) + if (!emitter.all.has(`sync`)) { + emitter.on(`sync`, (changes: Array) => { + begin() + changes.forEach((change) => { + write({ + type: change.type, + value: change.modified as unknown as TestItem, + }) + }) + commit() + }) + } + }, + }, + }) + + // Wait for sync to complete + await 
collection.stateWhenReady() + + // Verify data was loaded + expect(collection.size).toBe(5) + }) + + describe(`Index Creation`, () => { + it(`should create an index on a simple field`, () => { + const index = collection.createIndex((row) => row.status) + + expect(index.id).toMatch(/^\d+$/) + expect(index.name).toBeUndefined() + expect(index.expression.type).toBe(`ref`) + expect(index.indexedKeysSet.size).toBe(5) + }) + + it(`should create a named index`, () => { + const index = collection.createIndex((row) => row.age, { + name: `ageIndex`, + }) + + expect(index.name).toBe(`ageIndex`) + expect(index.indexedKeysSet.size).toBe(5) + }) + + it(`should create multiple indexes`, () => { + const statusIndex = collection.createIndex((row) => row.status) + const ageIndex = collection.createIndex((row) => row.age) + + expect(statusIndex.id).not.toBe(ageIndex.id) + expect(statusIndex.indexedKeysSet.size).toBe(5) + expect(ageIndex.indexedKeysSet.size).toBe(5) + }) + + it(`should maintain ordered entries`, () => { + const ageIndex = collection.createIndex((row) => row.age) + + // Ages should be ordered: 22, 25, 28, 30, 35 + const orderedAges = ageIndex.orderedEntriesArray.map(([age]) => age) + expect(orderedAges).toEqual([22, 25, 28, 30, 35]) + }) + + it(`should handle duplicate values in index`, () => { + const statusIndex = collection.createIndex((row) => row.status) + + // Should have 3 unique status values + expect(statusIndex.orderedEntriesArray.length).toBe(3) + + // "active" status should have 3 items + const activeKeys = statusIndex.valueMapData.get(`active`) + expect(activeKeys?.size).toBe(3) + }) + + it(`should handle undefined/null values`, () => { + const scoreIndex = collection.createIndex((row) => row.score) + + // Should include the item with undefined score + expect(scoreIndex.indexedKeysSet.size).toBe(5) + + // undefined should be first in ordered entries + const firstValue = scoreIndex.orderedEntriesArray[0]?.[0] + expect(firstValue).toBeUndefined() + }) + }) + 
+ describe(`Index Maintenance`, () => { + beforeEach(() => { + collection.createIndex((row) => row.status) + collection.createIndex((row) => row.age) + }) + + it(`should reflect mutations in collection state and subscriptions`, async () => { + const changes: Array = [] + + // Subscribe to all changes + const unsubscribe = collection.subscribeChanges((items) => { + changes.push(...items) + }) + + const newItem: TestItem = { + id: `6`, + name: `Frank`, + age: 40, + status: `active`, + createdAt: new Date(`2023-01-06`), + } + + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.insert(newItem)) + await tx.isPersisted.promise + + // Item should be in collection state + expect(collection.size).toBe(6) + expect(collection.get(`6`)).toEqual(newItem) + + // Should trigger subscription + expect(changes).toHaveLength(1) + expect(changes[0]?.type).toBe(`insert`) + expect(changes[0]?.value.name).toBe(`Frank`) + + unsubscribe() + }) + + it(`should reflect updates in collection state and subscriptions`, async () => { + const changes: Array = [] + + const unsubscribe = collection.subscribeChanges((items) => { + changes.push(...items) + }) + + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(`1`, (draft) => { + draft.status = `inactive` + draft.age = 26 + }) + ) + await tx.isPersisted.promise + + // Updated item should be in collection state + const updatedItem = collection.get(`1`) + expect(updatedItem?.status).toBe(`inactive`) + expect(updatedItem?.age).toBe(26) + + // Should trigger subscription + expect(changes).toHaveLength(1) + expect(changes[0]?.type).toBe(`update`) + expect(changes[0]?.value.status).toBe(`inactive`) + + unsubscribe() + }) + + it(`should reflect deletions in collection state and subscriptions`, async () => { + const changes: Array = [] + + const unsubscribe = collection.subscribeChanges((items) => { + changes.push(...items) + }) + + const tx = createTransaction({ mutationFn }) + tx.mutate(() => 
collection.delete(`1`)) + await tx.isPersisted.promise + + // Item should be removed from collection state + expect(collection.size).toBe(4) + expect(collection.get(`1`)).toBeUndefined() + + // Should trigger subscription (may be called multiple times in test environment) + expect(changes.length).toBeGreaterThanOrEqual(1) + expect(changes[0]?.type).toBe(`delete`) + expect(changes[0]?.key).toBe(`1`) + + // Ensure all events are the same delete event + const deleteEvents = changes.filter( + (c) => c.type === `delete` && c.key === `1` + ) + expect(deleteEvents.length).toBe(changes.length) // All events should be the same delete + + unsubscribe() + }) + + it(`should handle filtered subscriptions correctly with mutations`, async () => { + const activeChanges: Array = [] + + const unsubscribe = collection.subscribeChanges( + (items) => { + activeChanges.push(...items) + }, + { + where: (row) => eq(row.status, `active`), + } + ) + + // Change inactive item to active (should trigger) + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.update(`2`, (draft) => { + draft.status = `active` + }) + ) + await tx1.isPersisted.promise + + expect(activeChanges).toHaveLength(1) + expect(activeChanges[0]?.value.name).toBe(`Bob`) + + // Change active item to inactive (should trigger delete event for item leaving filter) + activeChanges.length = 0 + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.update(`1`, (draft) => { + draft.status = `inactive` + }) + ) + await tx2.isPersisted.promise + + // Should trigger delete event for item that no longer matches filter + expect(activeChanges).toHaveLength(1) + expect(activeChanges[0]?.type).toBe(`delete`) + expect(activeChanges[0]?.key).toBe(`1`) + expect(activeChanges[0]?.value.status).toBe(`active`) // Should be the previous value + + unsubscribe() + }) + }) + + describe(`Range Queries`, () => { + beforeEach(() => { + collection.createIndex((row) => row.age) + }) + + it(`should perform 
equality queries`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => eq(row.age, 25), + }) + + expect(result).toHaveLength(1) + expect(result[0]?.value.name).toBe(`Alice`) + + // Verify 100% index usage + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + + it(`should perform greater than queries`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => gt(row.age, 28), + }) + + expect(result).toHaveLength(2) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Bob`, `Charlie`]) + + // Verify 100% index usage + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + + it(`should perform greater than or equal queries`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => gte(row.age, 28), + }) + + expect(result).toHaveLength(3) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Bob`, `Charlie`, `Diana`]) + + // Verify 100% index usage + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + + it(`should perform less than queries`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => lt(row.age, 28), + }) + + expect(result).toHaveLength(2) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Eve`]) + + // Verify 100% index usage + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + + it(`should perform less than or equal 
queries`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => lte(row.age, 28), + }) + + expect(result).toHaveLength(3) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Diana`, `Eve`]) + + // Verify 100% index usage + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + + it(`should fall back to full scan for complex expressions`, () => { + withIndexTracking(collection, (tracker) => { + // This should work but use full scan since it's not a simple comparison + // Using a complex expression that can't be optimized with indexes + const result = collection.currentStateAsChanges({ + where: (row) => gt(length(row.name), 3), + }) + + expect(result).toHaveLength(3) // Alice, Charlie, Diana (names longer than 3 chars) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`, `Diana`]) + + // Verify full scan is used, no index + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + + it(`should verify index optimization is being used for simple queries`, () => { + withIndexTracking(collection, (tracker) => { + // This should use index optimization + const result = collection.currentStateAsChanges({ + where: (row) => eq(row.age, 25), + }) + + expect(result).toHaveLength(1) + expect(result[0]?.value.name).toBe(`Alice`) + + // Verify 100% index usage, no full scan + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + + // Verify the specific index was used + expect(tracker.stats.indexesUsed[0]).toMatch(/^\d+$/) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `eq`, + field: `age`, + value: 25, + }) + }) + }) + + 
it(`should verify different range operations use indexes`, () => { + withIndexTracking(collection, (tracker) => { + // Test multiple range operations + const eqResult = collection.currentStateAsChanges({ + where: (row) => eq(row.age, 25), + }) + const gtResult = collection.currentStateAsChanges({ + where: (row) => gt(row.age, 30), + }) + const lteResult = collection.currentStateAsChanges({ + where: (row) => lte(row.age, 28), + }) + + expect(eqResult).toHaveLength(1) + expect(gtResult).toHaveLength(1) // Charlie (35) + expect(lteResult).toHaveLength(3) // Alice (25), Diana (28), Eve (22) + + // Should have used index 3 times, no full scans + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 3, + fullScanCallCount: 0, + }) + + // Verify all operations used indexes + expect(tracker.stats.queriesExecuted).toHaveLength(3) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `eq`, + }) + expect(tracker.stats.queriesExecuted[1]).toMatchObject({ + type: `index`, + operation: `gt`, + }) + expect(tracker.stats.queriesExecuted[2]).toMatchObject({ + type: `index`, + operation: `lte`, + }) + }) + }) + + it(`should verify complex expressions fall back to full scan`, () => { + withIndexTracking(collection, (tracker) => { + // This should fall back to full scan + const result = collection.currentStateAsChanges({ + where: (row) => gt(length(row.name), 3), + }) + + expect(result).toHaveLength(3) // Alice, Charlie, Diana + + // Should use full scan, no index + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `fullScan`, + }) + }) + }) + + it(`should verify queries without matching indexes use full scan`, () => { + withIndexTracking(collection, (tracker) => { + // Query on a field without an index (status) + const result = 
collection.currentStateAsChanges({ + where: (row) => eq(row.status, `active`), + }) + + expect(result).toHaveLength(3) // Alice, Charlie, Eve + + // Should use full scan since no status index exists + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + }) + + describe(`Complex Query Optimization`, () => { + beforeEach(() => { + collection.createIndex((row) => row.age) + collection.createIndex((row) => row.status) + }) + + it(`should optimize AND queries with range conditions using indexes`, () => { + withIndexTracking(collection, (tracker) => { + // Test the key case: range query with AND + const result = collection.currentStateAsChanges({ + where: (row) => and(gt(row.age, 25), lt(row.age, 35)), + }) + + expect(result).toHaveLength(2) // Bob (30), Diana (28) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Bob`, `Diana`]) + + // Verify 100% index usage - should use age index twice + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 2, // gt and lt operations + fullScanCallCount: 0, + }) + + // Verify both operations used the age index + expect(tracker.stats.queriesExecuted).toHaveLength(2) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `gt`, + field: `age`, + value: 25, + }) + expect(tracker.stats.queriesExecuted[1]).toMatchObject({ + type: `index`, + operation: `lt`, + field: `age`, + value: 35, + }) + }) + }) + + it(`should optimize AND queries with multiple field conditions`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => and(eq(row.status, `active`), gte(row.age, 25)), + }) + + expect(result).toHaveLength(2) // Alice (25, active), Charlie (35, active) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`]) + + // Verify 
100% index usage - should use both status and age indexes + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 2, // eq and gte operations + fullScanCallCount: 0, + }) + + // Verify different indexes were used + expect(tracker.stats.queriesExecuted).toHaveLength(2) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }) + expect(tracker.stats.queriesExecuted[1]).toMatchObject({ + type: `index`, + operation: `gte`, + field: `age`, + value: 25, + }) + }) + }) + + it(`should optimize OR queries using indexes`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => or(eq(row.age, 25), eq(row.age, 35)), + }) + + expect(result).toHaveLength(2) // Alice (25), Charlie (35) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`]) + + // Verify 100% index usage - should use age index twice + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 2, // Two eq operations + fullScanCallCount: 0, + }) + + // Verify both operations used the age index + expect(tracker.stats.queriesExecuted).toHaveLength(2) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `eq`, + field: `age`, + value: 25, + }) + expect(tracker.stats.queriesExecuted[1]).toMatchObject({ + type: `index`, + operation: `eq`, + field: `age`, + value: 35, + }) + }) + }) + + it(`should optimize inArray queries using indexes`, () => { + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => inArray(row.status, [`active`, `pending`]), + }) + + expect(result).toHaveLength(4) // Alice, Charlie, Eve (active), Diana (pending) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`, `Diana`, 
`Eve`]) + + // Verify 100% index usage - should use status index once with IN operation + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, // One IN operation for the array values + fullScanCallCount: 0, + }) + + // Verify the IN operation was used + expect(tracker.stats.queriesExecuted).toHaveLength(1) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `in`, + field: `status`, + value: [`active`, `pending`], + }) + }) + }) + + it(`should optimize complex nested AND/OR expressions`, () => { + withIndexTracking(collection, (tracker) => { + // (age >= 25 AND age <= 30) OR status = 'pending' + const result = collection.currentStateAsChanges({ + where: (row) => + or( + and(gte(row.age, 25), lte(row.age, 30)), + eq(row.status, `pending`) + ), + }) + + expect(result).toHaveLength(3) // Alice (25), Bob (30), Diana (28, pending) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Bob`, `Diana`]) + + // Verify 100% index usage - should use age index twice + status index once + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 3, + fullScanCallCount: 0, + }) + + // Verify the operations + expect(tracker.stats.queriesExecuted).toHaveLength(3) + expect(tracker.stats.queriesExecuted[0]).toMatchObject({ + type: `index`, + operation: `gte`, + field: `age`, + value: 25, + }) + expect(tracker.stats.queriesExecuted[1]).toMatchObject({ + type: `index`, + operation: `lte`, + field: `age`, + value: 30, + }) + expect(tracker.stats.queriesExecuted[2]).toMatchObject({ + type: `index`, + operation: `eq`, + field: `status`, + value: `pending`, + }) + }) + }) + + it(`should partially optimize when some conditions can be optimized`, () => { + withIndexTracking(collection, (tracker) => { + // Two conditions on different indexed fields (status and age), both optimizable and AND-combined + const result = collection.currentStateAsChanges({ + where: (row) 
=> + and( + eq(row.status, `active`), // Can optimize with index + gt(row.age, 24) // Can also optimize - will be AND combined + ), + }) + + expect(result).toHaveLength(2) // Alice (25), Charlie (35) - both active and age > 24 + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`]) + + // Should use optimization: both conditions can use indexes + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 2, + fullScanCallCount: 0, + }) + }) + }) + + it(`should optimize queries with missing indexes by using partial optimization`, () => { + withIndexTracking(collection, (tracker) => { + // Query on a field without an index (name) + const result = collection.currentStateAsChanges({ + where: (row) => + and( + eq(row.age, 25), // Has index + eq(row.name, `Alice`) // No index on name + ), + }) + + expect(result).toHaveLength(1) // Alice (25, name Alice) + expect(result[0]?.value.name).toBe(`Alice`) + + // Should use partial optimization: age index, then filter by name + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + + it(`should fall back to full scan when no conditions can be optimized`, () => { + withIndexTracking(collection, (tracker) => { + // Only complex expressions that can't be optimized + const result = collection.currentStateAsChanges({ + where: (row) => gt(length(row.name), 3), + }) + + expect(result).toHaveLength(3) // Alice, Charlie, Diana (names > 3 chars) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Alice`, `Charlie`, `Diana`]) + + // Should fall back to full scan since no conditions can be optimized + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + + it(`should fall back to full scan for complex nested expressions`, () => { + 
withIndexTracking(collection, (tracker) => { + // Complex expression involving function calls - no simple field comparisons + const result = collection.currentStateAsChanges({ + where: (row) => + and( + gt(length(row.name), 4), // Complex - can't optimize (Alice=5, Charlie=7, Diana=5) + gt(length(row.status), 6) // Complex - can't optimize ("inactive" = 8 and "pending" = 7 are > 6) + ), + }) + + expect(result).toHaveLength(1) // Only Diana has name>4 AND status>6 (Diana name=5, status="pending"=7) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Diana`]) + + // Should fall back to full scan for complex expressions + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + + it(`should fall back to full scan when OR conditions can't be optimized`, () => { + withIndexTracking(collection, (tracker) => { + // OR with complex conditions that can't be optimized + const result = collection.currentStateAsChanges({ + where: (row) => + or( + gt(length(row.name), 6), // Complex - can't optimize (only Charlie has name length 7 > 6) + gt(length(row.status), 7) // Complex - can't optimize (only Bob has status "inactive" = 8 > 7) + ), + }) + + expect(result).toHaveLength(2) // Charlie (name length 7 > 6), Bob (status length 8 > 7) + const names = result.map((r) => r.value.name).sort() + expect(names).toEqual([`Bob`, `Charlie`]) + + // Should fall back to full scan when no OR branches can be optimized + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + + it(`should fall back to full scan when querying non-indexed fields only`, () => { + withIndexTracking(collection, (tracker) => { + // Query only on fields without indexes (name and score fields don't have indexes) + const result = collection.currentStateAsChanges({ + where: (row) => and(eq(row.name, `Alice`), eq(row.score!, 95)), 
+ }) + + expect(result).toHaveLength(1) // Alice + expect(result[0]?.value.name).toBe(`Alice`) + + // Should fall back to full scan since no indexed fields are used + expectIndexUsage(tracker.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + + it(`should handle mixed optimization scenarios within same query`, () => { + // Test two separate queries to show different optimization strategies + + // First: partial optimization (age index + name filter) + withIndexTracking(collection, (tracker1) => { + const result1 = collection.currentStateAsChanges({ + where: (row) => + and( + eq(row.age, 25), // Can optimize - has index + eq(row.name, `Alice`) // Can't optimize - no index + ), + }) + + expect(result1).toHaveLength(1) // Alice via partial optimization + expectIndexUsage(tracker1.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + + // Second: full scan (no optimizable conditions) + withIndexTracking(collection, (tracker2) => { + const result2 = collection.currentStateAsChanges({ + where: (row) => + and( + eq(row.name, `Alice`), // Can't optimize - no index + gt(length(row.name), 3) // Can't optimize - complex expression + ), + }) + + expect(result2).toHaveLength(1) // Alice via full scan + expectIndexUsage(tracker2.stats, { + shouldUseIndex: false, + shouldUseFullScan: true, + indexCallCount: 0, + fullScanCallCount: 1, + }) + }) + }) + }) + + describe(`Index Usage Verification`, () => { + it(`should track multiple indexes and their usage patterns`, () => { + // Create multiple indexes + collection.createIndex((row) => row.age, { + name: `ageIndex`, + }) + collection.createIndex((row) => row.status, { + name: `statusIndex`, + }) + collection.createIndex((row) => row.name, { + name: `nameIndex`, + }) + + withIndexTracking(collection, (tracker) => { + // Query using age index + const ageQuery = collection.currentStateAsChanges({ + 
where: (row) => gte(row.age, 30), + }) + + // Query using status index + const statusQuery = collection.currentStateAsChanges({ + where: (row) => eq(row.status, `active`), + }) + + // Query using name index + const nameQuery = collection.currentStateAsChanges({ + where: (row) => eq(row.name, `Alice`), + }) + + expect(ageQuery).toHaveLength(2) // Bob (30), Charlie (35) + expect(statusQuery).toHaveLength(3) // Alice, Charlie, Eve + expect(nameQuery).toHaveLength(1) // Alice + + // Verify all queries used indexes + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 3, + fullScanCallCount: 0, + }) + + // Verify specific indexes were used + expect(tracker.stats.indexesUsed).toHaveLength(3) + expect(tracker.stats.queriesExecuted).toEqual([ + { type: `index`, operation: `gte`, field: `age`, value: 30 }, + { type: `index`, operation: `eq`, field: `status`, value: `active` }, + { type: `index`, operation: `eq`, field: `name`, value: `Alice` }, + ]) + + // Test that we can identify which specific index was used + const usedIndexes = new Set(tracker.stats.indexesUsed) + expect(usedIndexes.size).toBe(3) // Three different indexes used + }) + }) + + it(`should verify 100% index usage for subscriptions`, () => { + collection.createIndex((row) => row.status) + + withIndexTracking(collection, (tracker) => { + const changes: Array = [] + + // Subscribe with a where clause that should use index + const unsubscribe = collection.subscribeChanges( + (items) => changes.push(...items), + { + includeInitialState: true, + where: (row) => eq(row.status, `active`), + } + ) + + expect(changes).toHaveLength(3) // Initial active items + + // Verify initial state query used index + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + + unsubscribe() + }) + }) + }) + + describe(`Filtered Subscriptions`, () => { + beforeEach(() => { + 
collection.createIndex((row) => row.age) + collection.createIndex((row) => row.status) + }) + + it(`should subscribe to filtered changes with index optimization`, async () => { + await withIndexTracking(collection, async (tracker) => { + const changes: Array = [] + + const unsubscribe = collection.subscribeChanges( + (items) => { + changes.push(...items) + }, + { + includeInitialState: true, + where: (row) => eq(row.status, `active`), + } + ) + + expect(changes).toHaveLength(3) // Initial active items + expect(changes.map((c) => c.value.name).sort()).toEqual([ + `Alice`, + `Charlie`, + `Eve`, + ]) + + // Verify initial state query used index + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + + // Add a new active item + changes.length = 0 + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.insert({ + id: `6`, + name: `Frank`, + age: 40, + status: `active`, + createdAt: new Date(), + }) + ) + await tx1.isPersisted.promise + + expect(changes).toHaveLength(1) + expect(changes[0]?.value.name).toBe(`Frank`) + + // Add an inactive item (should not trigger) + changes.length = 0 + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.insert({ + id: `7`, + name: `Grace`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }) + ) + await tx2.isPersisted.promise + + expect(changes).toHaveLength(0) + + // Change an active item to inactive (should trigger delete event for item leaving filter) + changes.length = 0 + const tx3 = createTransaction({ mutationFn }) + tx3.mutate(() => + collection.update(`1`, (draft) => { + draft.status = `inactive` + }) + ) + await tx3.isPersisted.promise + + expect(changes).toHaveLength(1) // Should emit delete event for item leaving filter + expect(changes[0]?.type).toBe(`delete`) + expect(changes[0]?.key).toBe(`1`) + expect(changes[0]?.value.status).toBe(`active`) // Should be the previous value + + 
unsubscribe() + }) + }) + + it(`should handle range queries in subscriptions`, async () => { + await withIndexTracking(collection, async (tracker) => { + const changes: Array = [] + + const unsubscribe = collection.subscribeChanges( + (items) => { + changes.push(...items) + }, + { + includeInitialState: true, + where: (row) => gte(row.age, 30), + } + ) + + expect(changes).toHaveLength(2) // Bob (30) and Charlie (35) + expect(changes.map((c) => c.value.name).sort()).toEqual([ + `Bob`, + `Charlie`, + ]) + + // Verify initial state query used index + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + + // Update someone to be over 30 + changes.length = 0 + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(`4`, (draft) => { + draft.age = 32 + }) + ) + await tx.isPersisted.promise + + expect(changes).toHaveLength(1) + expect(changes[0]?.value.name).toBe(`Diana`) + + unsubscribe() + }) + }) + + it(`should use indexes for filtered subscription initial state`, async () => { + collection.createIndex((row) => row.status) + + await withIndexTracking(collection, (tracker) => { + const changes: Array = [] + + const unsubscribe = collection.subscribeChanges( + (items) => { + changes.push(...items) + }, + { + includeInitialState: true, + where: (row) => eq(row.status, `active`), + } + ) + + expect(changes).toHaveLength(3) // Initial active items + + // Verify initial state query used index + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + + unsubscribe() + }) + }) + }) + + describe(`Performance and Edge Cases`, () => { + it(`should handle special values correctly in indexes and queries`, async () => { + // Create a new collection with special values in the initial sync data + const specialData: Array = [ + ...testData, + { + id: `null_age`, + name: `Null Age`, + age: null 
as any, + status: `active`, + createdAt: new Date(), + }, + { + id: `zero_age`, + name: `Zero Age`, + age: 0, + status: `active`, + createdAt: new Date(), + }, + { + id: `negative_age`, + name: `Negative Age`, + age: -5, + status: `active`, + createdAt: new Date(), + }, + ] + + const specialCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + for (const item of specialData) { + write({ + type: `insert`, + value: item, + }) + } + commit() + }, + }, + }) + + await specialCollection.stateWhenReady() + + const ageIndex = specialCollection.createIndex((row) => row.age) + + // Verify index contains all items including special values + expect(ageIndex.indexedKeysSet.size).toBe(8) // Original 5 + 3 special + expect(ageIndex.orderedEntriesArray).toHaveLength(8) // 8 unique age values (including null) + + // Null/undefined should be ordered first + const firstValue = ageIndex.orderedEntriesArray[0]?.[0] + expect(firstValue == null).toBe(true) + + // Test that queries with special values use indexes correctly + withIndexTracking(specialCollection, (tracker) => { + // Query for zero age + const zeroAgeResult = specialCollection.currentStateAsChanges({ + where: (row) => eq(row.age, 0), + }) + expect(zeroAgeResult).toHaveLength(1) + expect(zeroAgeResult[0]?.value.name).toBe(`Zero Age`) + + // Query for negative age + const negativeAgeResult = specialCollection.currentStateAsChanges({ + where: (row) => eq(row.age, -5), + }) + expect(negativeAgeResult).toHaveLength(1) + expect(negativeAgeResult[0]?.value.name).toBe(`Negative Age`) + + // Query for ages greater than negative + const gtNegativeResult = specialCollection.currentStateAsChanges({ + where: (row) => gt(row.age, -1), + }) + expect(gtNegativeResult.length).toBeGreaterThan(0) // Should find positive ages + + // Verify all queries used indexes + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: false, + 
indexCallCount: 3, + fullScanCallCount: 0, + }) + }) + }) + + it(`should handle index creation on empty collection`, () => { + const emptyCollection = createCollection({ + getKey: (item) => item.id, + sync: { sync: () => {} }, + }) + + const index = emptyCollection.createIndex((row) => row.age) + + expect(index.indexedKeysSet.size).toBe(0) + expect(index.orderedEntriesArray).toHaveLength(0) + expect(index.valueMapData.size).toBe(0) + }) + + it(`should handle index updates when data changes through sync`, async () => { + const ageIndex = collection.createIndex((row) => row.age) + + // Original index should have 5 items + expect(ageIndex.indexedKeysSet.size).toBe(5) + expect(ageIndex.orderedEntriesArray).toHaveLength(5) + + // Perform mutations that will sync back and update indexes + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.insert({ + id: `new1`, + name: `NewItem1`, + age: 50, + status: `active`, + createdAt: new Date(), + }) + ) + + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.update(`1`, (draft) => { + draft.age = 99 + }) + ) + + const tx3 = createTransaction({ mutationFn }) + tx3.mutate(() => collection.delete(`2`)) + + await Promise.all([ + tx1.isPersisted.promise, + tx2.isPersisted.promise, + tx3.isPersisted.promise, + ]) + + // Wait a bit for sync to complete and indexes to update + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Verify that indexes are updated after sync + expect(ageIndex.indexedKeysSet.size).toBe(5) // 5 original - 1 deleted + 1 inserted + + // Test that index-optimized queries work with the updated data + withIndexTracking(collection, (tracker) => { + const result = collection.currentStateAsChanges({ + where: (row) => gte(row.age, 50), + }) + + // Should find items with age >= 50 using index + expect(result.length).toBeGreaterThanOrEqual(1) + + // Verify it used the index + expectIndexUsage(tracker.stats, { + shouldUseIndex: true, + shouldUseFullScan: 
false, + indexCallCount: 1, + fullScanCallCount: 0, + }) + }) + }) + }) +}) diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index aef60677..23ae8ecd 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from "vitest" import mitt from "mitt" import { createCollection } from "../src/collection" import { createTransaction } from "../src/transactions" +import { eq } from "../src/query/builder/functions" import type { ChangeMessage, ChangesPayload, @@ -642,4 +643,171 @@ describe(`Collection.subscribeChanges`, () => { // Callback shouldn't be called after unsubscribe expect(callback).not.toHaveBeenCalled() }) + + it(`should correctly handle filtered updates that transition between filter states`, () => { + const callback = vi.fn() + + // Create collection with items that have a status field + const collection = createCollection<{ + id: number + value: string + status: `active` | `inactive` + }>({ + id: `filtered-updates-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit }) => { + // Start with some initial data + begin() + write({ + type: `insert`, + value: { id: 1, value: `item1`, status: `inactive` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `item2`, status: `active` }, + }) + commit() + }, + }, + }) + + const mutationFn: MutationFn = async () => { + // Simulate sync by writing the mutations back + const syncCollection = collection as any + syncCollection.config.sync.sync({ + collection: syncCollection, + begin: () => { + syncCollection.pendingSyncedTransactions.push({ + committed: false, + operations: [], + }) + }, + write: (messageWithoutKey: any) => { + const pendingTransaction = + syncCollection.pendingSyncedTransactions[ + syncCollection.pendingSyncedTransactions.length - 1 + ] + const key = 
syncCollection.getKeyFromItem(messageWithoutKey.value) + const message = { ...messageWithoutKey, key } + pendingTransaction.operations.push(message) + }, + commit: () => { + const pendingTransaction = + syncCollection.pendingSyncedTransactions[ + syncCollection.pendingSyncedTransactions.length - 1 + ] + pendingTransaction.committed = true + syncCollection.commitPendingTransactions() + }, + markReady: () => { + syncCollection.markReady() + }, + }) + return Promise.resolve() + } + + // Subscribe to changes with a filter for active items only + const unsubscribe = collection.subscribeChanges(callback, { + includeInitialState: true, + where: (row) => eq(row.status, `active`), + }) + + // Should only receive the active item in initial state + expect(callback).toHaveBeenCalledTimes(1) + const initialChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(initialChanges).toHaveLength(1) + expect(initialChanges[0]!.key).toBe(2) + expect(initialChanges[0]!.type).toBe(`insert`) + + // Reset mock + callback.mockReset() + + // Test 1: Update an inactive item to active (should emit insert) + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.update(1, (draft) => { + draft.status = `active` + }) + ) + + // Should emit an insert event for the newly active item + expect(callback).toHaveBeenCalledTimes(1) + const insertChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(insertChanges).toHaveLength(1) + expect(insertChanges[0]!.type).toBe(`insert`) + expect(insertChanges[0]!.key).toBe(1) + expect(insertChanges[0]!.value.status).toBe(`active`) + + // Reset mock + callback.mockReset() + + // Test 2: Update an active item to inactive (should emit delete) + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.update(2, (draft) => { + draft.status = `inactive` + }) + ) + + // Should 
emit a delete event for the newly inactive item + expect(callback).toHaveBeenCalledTimes(1) + const deleteChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(deleteChanges).toHaveLength(1) + expect(deleteChanges[0]!.type).toBe(`delete`) + expect(deleteChanges[0]!.key).toBe(2) + expect(deleteChanges[0]!.value.status).toBe(`active`) // Should be the previous value (active) + + // Reset mock + callback.mockReset() + + // Test 3: Update an active item while keeping it active (should emit update) + const tx3 = createTransaction({ mutationFn }) + tx3.mutate(() => + collection.update(1, (draft) => { + draft.value = `updated item1` + }) + ) + + // Should emit an update event for the active item + expect(callback).toHaveBeenCalledTimes(1) + const updateChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(updateChanges).toHaveLength(1) + expect(updateChanges[0]!.type).toBe(`update`) + expect(updateChanges[0]!.key).toBe(1) + expect(updateChanges[0]!.value.value).toBe(`updated item1`) + + // Reset mock + callback.mockReset() + + // Test 4: Update an inactive item while keeping it inactive (should emit nothing) + const tx4 = createTransaction({ mutationFn }) + tx4.mutate(() => + collection.update(2, (draft) => { + draft.value = `updated inactive item` + }) + ) + + // Should not emit any events for inactive items + expect(callback).not.toHaveBeenCalled() + + // Clean up + unsubscribe() + }) })