Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/kind-squids-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/db-ivm": patch
"@tanstack/db": patch
---

Add `groupByKey` and `groupKeyFn` options to `orderByWithFractionalIndex` and `topKWithFractionalIndex`. This is groundwork for hierarchical “includes” projections in TanStack DB, where child collections need to enforce limits within each parent’s slice of the stream rather than across the entire dataset. ([Issue #288](https://github.com/TanStack/db/issues/288))
28 changes: 26 additions & 2 deletions packages/db-ivm/src/operators/orderBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ export interface OrderByOptions<Ve> {
offset?: number
}

type OrderByWithFractionalIndexOptions<Ve> = OrderByOptions<Ve> & {
type OrderByWithFractionalIndexOptions<
Ve,
KeyType = unknown,
ValueType = unknown,
> = OrderByOptions<Ve> & {
setSizeCallback?: (getSize: () => number) => void
setWindowFn?: (
windowFn: (options: { offset?: number; limit?: number }) => void
) => void
groupByKey?: boolean
groupKeyFn?: (key: KeyType, value: ValueType) => unknown
}

/**
Expand Down Expand Up @@ -142,7 +148,11 @@ export function orderByWithFractionalIndexBase<
valueExtractor: (
value: T extends KeyValue<unknown, infer V> ? V : never
) => Ve,
options?: OrderByWithFractionalIndexOptions<Ve>
options?: OrderByWithFractionalIndexOptions<
Ve,
T extends KeyValue<infer K, unknown> ? K : never,
T extends KeyValue<unknown, infer V> ? V : never
>
) {
type KeyType = T extends KeyValue<infer K, unknown> ? K : never
type ValueType = T extends KeyValue<unknown, infer V> ? V : never
Expand All @@ -160,6 +170,19 @@ export function orderByWithFractionalIndexBase<
return 1
})

type GroupKeyFn = (key: KeyType, value: ValueType) => unknown
const shouldGroupByKey =
options?.groupKeyFn !== undefined ? true : (options?.groupByKey ?? true)

const resolvedGroupKeyFn: GroupKeyFn | undefined =
options?.groupKeyFn ??
(shouldGroupByKey
? (((key: KeyType) =>
Array.isArray(key)
? (key as unknown as Array<unknown>)[0]
: key) as GroupKeyFn)
: undefined)

return (
stream: IStreamBuilder<T>
): IStreamBuilder<[KeyType, [ValueType, string]]> => {
Expand All @@ -172,6 +195,7 @@ export function orderByWithFractionalIndexBase<
offset,
setSizeCallback,
setWindowFn,
groupKeyFn: resolvedGroupKeyFn,
}
),
consolidate()
Expand Down
177 changes: 114 additions & 63 deletions packages/db-ivm/src/operators/topKWithFractionalIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import type { HRange } from "../utils.js"
import type { DifferenceStreamReader } from "../graph.js"
import type { IStreamBuilder, PipedOperator } from "../types.js"

export interface TopKWithFractionalIndexOptions {
export interface TopKWithFractionalIndexOptions<K = unknown, V = unknown> {
limit?: number
offset?: number
setSizeCallback?: (getSize: () => number) => void
setWindowFn?: (
windowFn: (options: { offset?: number; limit?: number }) => void
) => void
groupKeyFn?: (key: K, value: V) => unknown
}

export type TopKChanges<V> = {
Expand All @@ -36,6 +37,13 @@ export type TopKMoveChanges<V> = {
moveOuts: Array<IndexedValue<V>>
}

const DEFAULT_GROUP = Symbol(`topk-default-group`)

type GroupState<K, T> = {
multiplicities: Map<K, number>
topK: TopK<TaggedValue<K, T>>
}

/**
* A topK data structure that supports insertions and deletions
* and returns changes to the topK.
Expand Down Expand Up @@ -243,40 +251,38 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
[K, T],
[K, IndexedValue<T>]
> {
#index: Map<K, number> = new Map() // maps keys to their multiplicity

/**
* topK data structure that supports insertions and deletions
* and returns changes to the topK.
*/
#topK: TopK<TaggedValue<K, T>>
#groupStates: Map<unknown, GroupState<K, T>> = new Map()
#groupKeyFn?: (key: K, value: T) => unknown
#compareTaggedValues: (a: TaggedValue<K, T>, b: TaggedValue<K, T>) => number
#offset: number
#limit: number

constructor(
id: number,
inputA: DifferenceStreamReader<[K, T]>,
output: DifferenceStreamWriter<[K, IndexedValue<T>]>,
comparator: (a: T, b: T) => number,
options: TopKWithFractionalIndexOptions
options: TopKWithFractionalIndexOptions<K, T>
) {
super(id, inputA, output)
const limit = options.limit ?? Infinity
const offset = options.offset ?? 0
const compareTaggedValues = (
this.#groupKeyFn = options.groupKeyFn
this.#limit = options.limit ?? Infinity
this.#offset = options.offset ?? 0

this.#compareTaggedValues = (
a: TaggedValue<K, T>,
b: TaggedValue<K, T>
) => {
// First compare on the value
const valueComparison = comparator(getVal(a), getVal(b))
if (valueComparison !== 0) {
return valueComparison
}
// If the values are equal, compare on the tag (object identity)
const tieBreakerA = getTag(a)
const tieBreakerB = getTag(b)
return tieBreakerA - tieBreakerB
}
this.#topK = this.createTopK(offset, limit, compareTaggedValues)
options.setSizeCallback?.(() => this.#topK.size)

options.setSizeCallback?.(() => this.getTotalSize())
options.setWindowFn?.(this.moveTopK.bind(this))
}

Expand All @@ -288,28 +294,92 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
return new TopKArray(offset, limit, comparator)
}

/**
* Moves the topK window based on the provided offset and limit.
* Any changes to the topK are sent to the output.
*/
private getTotalSize(): number {
let size = 0
for (const state of this.#groupStates.values()) {
size += state.topK.size
}
return size
}

private resolveGroupKey(key: K, value: T): unknown {
return this.#groupKeyFn ? this.#groupKeyFn(key, value) : DEFAULT_GROUP
}

private getOrCreateGroupState(groupKey: unknown): GroupState<K, T> {
let state = this.#groupStates.get(groupKey)
if (!state) {
state = {
multiplicities: new Map(),
topK: this.createTopK(
this.#offset,
this.#limit,
this.#compareTaggedValues
),
}
this.#groupStates.set(groupKey, state)
}
return state
}

private updateMultiplicity(
state: GroupState<K, T>,
key: K,
multiplicity: number
): { oldMultiplicity: number; newMultiplicity: number } {
if (multiplicity === 0) {
const current = state.multiplicities.get(key) ?? 0
return { oldMultiplicity: current, newMultiplicity: current }
}

const oldMultiplicity = state.multiplicities.get(key) ?? 0
const newMultiplicity = oldMultiplicity + multiplicity
if (newMultiplicity === 0) {
state.multiplicities.delete(key)
} else {
state.multiplicities.set(key, newMultiplicity)
}
return { oldMultiplicity, newMultiplicity }
}

private cleanupGroupIfEmpty(groupKey: unknown, state: GroupState<K, T>) {
if (state.multiplicities.size === 0 && state.topK.size === 0) {
this.#groupStates.delete(groupKey)
}
}

moveTopK({ offset, limit }: { offset?: number; limit?: number }) {
if (!(this.#topK instanceof TopKArray)) {
throw new Error(
`Cannot move B+-tree implementation of TopK with fractional index`
)
if (offset !== undefined) {
this.#offset = offset
}
if (limit !== undefined) {
this.#limit = limit
}

const result: Array<[[K, IndexedValue<T>], number]> = []
let hasChanges = false

const diff = this.#topK.move({ offset, limit })
for (const state of this.#groupStates.values()) {
if (!(state.topK instanceof TopKArray)) {
throw new Error(
`Cannot move B+-tree implementation of TopK with fractional index`
)
}

const diff = state.topK.move({
offset: this.#offset,
limit: this.#limit,
})

diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result))
diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result))
diff.moveIns.forEach((moveIn) => this.handleMoveIn(moveIn, result))
diff.moveOuts.forEach((moveOut) => this.handleMoveOut(moveOut, result))

if (diff.changes) {
// There are changes to the topK
// it could be that moveIns and moveOuts are empty
// because the collection is lazy, so we will run the graph again to load the data
if (diff.changes) {
hasChanges = true
}
}

if (hasChanges) {
this.output.sendData(new MultiSet(result))
}
}
Expand All @@ -334,32 +404,31 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
multiplicity: number,
result: Array<[[K, IndexedValue<T>], number]>
): void {
const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity)
const groupKey = this.resolveGroupKey(key, value)
const state = this.getOrCreateGroupState(groupKey)

const { oldMultiplicity, newMultiplicity } = this.updateMultiplicity(
state,
key,
multiplicity
)

let res: TopKChanges<TaggedValue<K, T>> = {
moveIn: null,
moveOut: null,
}
if (oldMultiplicity <= 0 && newMultiplicity > 0) {
// The value was invisible but should now be visible
// Need to insert it into the array of sorted values
const taggedValue = tagValue(key, value)
res = this.#topK.insert(taggedValue)
res = state.topK.insert(taggedValue)
} else if (oldMultiplicity > 0 && newMultiplicity <= 0) {
// The value was visible but should now be invisible
// Need to remove it from the array of sorted values
const taggedValue = tagValue(key, value)
res = this.#topK.delete(taggedValue)
} else {
// The value was invisible and it remains invisible
// or it was visible and remains visible
// so it doesn't affect the topK
res = state.topK.delete(taggedValue)
}

this.handleMoveIn(res.moveIn, result)
this.handleMoveOut(res.moveOut, result)

return
this.cleanupGroupIfEmpty(groupKey, state)
}

private handleMoveIn(
Expand Down Expand Up @@ -387,24 +456,6 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
result.push([[k, [val, index]], -1])
}
}

private getMultiplicity(key: K): number {
return this.#index.get(key) ?? 0
}

private addKey(
key: K,
multiplicity: number
): { oldMultiplicity: number; newMultiplicity: number } {
const oldMultiplicity = this.getMultiplicity(key)
const newMultiplicity = oldMultiplicity + multiplicity
if (newMultiplicity === 0) {
this.#index.delete(key)
} else {
this.#index.set(key, newMultiplicity)
}
return { oldMultiplicity, newMultiplicity }
}
}

/**
Expand All @@ -419,9 +470,9 @@ export class TopKWithFractionalIndexOperator<K, T> extends UnaryOperator<
*/
export function topKWithFractionalIndex<KType, T>(
comparator: (a: T, b: T) => number,
options?: TopKWithFractionalIndexOptions
options?: TopKWithFractionalIndexOptions<KType, T>
): PipedOperator<[KType, T], [KType, IndexedValue<T>]> {
const opts = options || {}
const opts: TopKWithFractionalIndexOptions<KType, T> = options ?? {}

return (
stream: IStreamBuilder<[KType, T]>
Expand Down
4 changes: 2 additions & 2 deletions packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ export class TopKWithFractionalIndexBTreeOperator<
*/
export function topKWithFractionalIndexBTree<KType, T>(
comparator: (a: T, b: T) => number,
options?: TopKWithFractionalIndexOptions
options?: TopKWithFractionalIndexOptions<KType, T>
): PipedOperator<[KType, T], [KType, IndexedValue<T>]> {
const opts = options || {}
const opts: TopKWithFractionalIndexOptions<KType, T> = options ?? {}

if (BTree === undefined) {
throw new Error(
Expand Down
Loading
Loading