Skip to content

Commit

Permalink
compactor experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
jchris committed Feb 18, 2024
1 parent 9cbcd09 commit 340c102
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 52 deletions.
35 changes: 24 additions & 11 deletions packages/encrypted-blockstore/src/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class Loader implements Loadable {
keyId?: string
seenCompacted: Set<string> = new Set()
writing: Promise<TransactionMeta | void> = Promise.resolve()
compacting: Promise<unknown> = Promise.resolve()

private getBlockCache: Map<string, AnyBlock> = new Map()
private seenMeta: Set<string> = new Set()
Expand Down Expand Up @@ -114,6 +115,7 @@ export class Loader implements Loadable {
}

async mergeDbMetaIntoClock(meta: DbMeta): Promise<void> {
await this.compacting
if (this.isCompacting) {
throw new Error('cannot merge while compacting')
}
Expand Down Expand Up @@ -291,19 +293,27 @@ export class Loader implements Loadable {
// }
// }

async *entries(): AsyncIterableIterator<AnyBlock> {
async *entries(cache = true): AsyncIterableIterator<AnyBlock> {
await this.ready
for (const [, block] of this.getBlockCache) {
yield block
if (cache) {
for (const [, block] of this.getBlockCache) {
yield block
}
} else {
for (const [, block] of this.getBlockCache) {
yield block
}
for (const cid of this.carLog) {
const reader = await this.loadCar(cid)
if (!reader) throw new Error(`missing car reader ${cid.toString()}`)
for await (const block of reader.blocks()) {
const sCid = block.cid.toString()
if (!this.getBlockCache.has(sCid)) {
yield block
}
}
}
}
// assumes we cache all blocks in the carLog
// for (const cid of this.carLog) {
// const reader = await this.loadCar(cid)
// if (!reader) throw new Error(`missing car reader ${cid.toString()}`)
// for await (const block of reader.blocks()) {
// yield block
// }
// }
}

async getBlock(cid: AnyLink): Promise<AnyBlock | undefined> {
Expand Down Expand Up @@ -339,6 +349,9 @@ export class Loader implements Loadable {

if (got) {
this.getBlockCache.set(sCid, got)
} else {
console.log('missing loader block', cid.toString())
// todo try compact lookup
}
return got
}
Expand Down
78 changes: 63 additions & 15 deletions packages/encrypted-blockstore/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ import { MemoryBlockstore } from '@web3-storage/pail/block'
// todo get these from multiformats?
import { BlockFetcher as BlockFetcherAPI } from '@web3-storage/pail/api'

import { AnyAnyBlock, AnyAnyLink, AnyBlock, AnyLink, CarMakeable, DbMeta, TransactionMeta as TM } from './types'
import {
AnyAnyBlock,
AnyAnyLink,
AnyBlock,
AnyLink,
CarMakeable,
DbMeta,
TransactionMeta as TM
} from './types'

import { Loader } from './loader'
import type { CID } from 'multiformats'
Expand Down Expand Up @@ -37,6 +45,7 @@ export class EncryptedBlockstore implements BlockFetcher {
compacting = false
ebOpts: BlockstoreOpts
transactions: Set<CarTransaction> = new Set()
lastTxMeta: TransactionMeta | null = null

constructor(ebOpts: BlockstoreOpts) {
this.ebOpts = ebOpts
Expand All @@ -56,6 +65,7 @@ export class EncryptedBlockstore implements BlockFetcher {
): Promise<TransactionMeta> {
const t = new CarTransaction(this)
const done: TransactionMeta = await fn(t)
this.lastTxMeta = done
if (this.loader) {
const car = await this.loader.commit(t, done, opts)
if (this.ebOpts.autoCompact && this.loader.carLog.length > this.ebOpts.autoCompact) {
Expand Down Expand Up @@ -83,7 +93,9 @@ export class EncryptedBlockstore implements BlockFetcher {
if (v) return v
}
if (!this.loader) return
return await this.loader.getBlock(cid)
const b = await this.loader.getBlock(cid)
if (b) return b
console.log('missing t block', cid.toString())
}

async getFile(car: AnyLink, cid: AnyLink, isPublic = false) {
Expand All @@ -97,19 +109,55 @@ export class EncryptedBlockstore implements BlockFetcher {
}

async compact() {
await this.ready
if (!this.loader) throw new Error('loader required to compact')
if (this.loader.carLog.length < 2) return
const compactFn = this.ebOpts.compact // todo add default compaction function
if (!compactFn || this.compacting) return
const blockLog = new CompactionFetcher(this)
this.compacting = true
const meta = await compactFn(blockLog)
await this.loader!.commit(blockLog.loggedBlocks, meta, {
compact: true,
noLoader: true
})
this.compacting = false
let resolveCompacting: (value: unknown) => void = () => {}
let rejectCompacting: (e: Error) => void = () => {}
try {
await this.ready
if (!this.loader) throw new Error('loader required to compact')
if (this.loader.carLog.length < 2) return
const compactFn =
this.ebOpts.compact || ((blocks: CompactionFetcher) => this.defaultCompact(blocks))
if (!compactFn || this.compacting) return
const blockLog = new CompactionFetcher(this)
this.loader.isCompacting = true
const compPromise = new Promise((resolve, reject) => {
resolveCompacting = resolve
rejectCompacting = reject
})
this.loader.compacting = compPromise
this.compacting = true
const meta = await compactFn(blockLog)
await this.loader!.commit(blockLog.loggedBlocks, meta, {
compact: true,
noLoader: true
})
this.compacting = false
this.loader!.isCompacting = false
if (resolveCompacting) resolveCompacting(null)
} finally {
this.compacting = false
this.loader!.isCompacting = false
if (rejectCompacting) rejectCompacting(new Error('compacting failed'))
}
}

async defaultCompact(blocks: CompactionFetcher) {
// console.log('eb compact')
if (!this.loader) {
throw new Error('no loader')
}
if (!this.lastTxMeta) {
throw new Error('no lastTxMeta')
}
for await (const blk of this.loader.entries(false)) {
blocks.loggedBlocks.putSync(blk.cid, blk.bytes)
}
for (const t of this.transactions) {
for await (const blk of t.entries()) {
blocks.loggedBlocks.putSync(blk.cid, blk.bytes)
}
}
return this.lastTxMeta as TransactionMeta
}

async *entries(): AsyncIterableIterator<AnyBlock> {
Expand Down
61 changes: 39 additions & 22 deletions packages/fireproof/src/crdt-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async function gatherUpdates(
ops = [event.data] as PutOperation[]
}
for (let i = ops.length - 1; i >= 0; i--) {
const { key, value } = ops[i];
const { key, value } = ops[i]
if (!keys.has(key)) {
// todo option to see all updates
const docValue = await getValueFromLink(blocks, value)
Expand Down Expand Up @@ -306,12 +306,15 @@ export async function doCompact(blockLog: CompactionFetcher, head: ClockHead) {
isCompacting = true

time('compact head')
for (const cid of head) {
const bl = await blockLog.get(cid)
if (!bl) throw new Error('Missing head block: ' + cid.toString())
try {
for (const cid of head) {
const bl = await blockLog.get(cid)
if (!bl) throw new Error('Missing head block: ' + cid.toString())
}
} catch (e) {
console.log('compact head error', e)
}
timeEnd('compact head')

// for await (const blk of blocks.entries()) {
// const bl = await blockLog.get(blk.cid)
// if (!bl) throw new Error('Missing tblock: ' + blk.cid.toString())
Expand All @@ -324,12 +327,15 @@ export async function doCompact(blockLog: CompactionFetcher, head: ClockHead) {
// }

time('compact all entries')
for await (const _entry of getAllEntries(blockLog, head)) {
// result.push(entry)
void 1
try {
for await (const _entry of getAllEntries(blockLog, head)) {
// result.push(entry)
void 1
}
} catch (e) {
console.log('compact getAllEntries error', e)
}
timeEnd('compact all entries')

// time("compact crdt entries")
// for await (const [, link] of entries(blockLog, head)) {
// const bl = await blockLog.get(link)
Expand All @@ -338,26 +344,37 @@ export async function doCompact(blockLog: CompactionFetcher, head: ClockHead) {
// timeEnd("compact crdt entries")

time('compact clock vis')
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _line of vis(blockLog, head)) {
void 1
try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _line of vis(blockLog, head)) {
void 1
}
} catch (e) {
console.log('compact vis error', e)
}
timeEnd('compact clock vis')

time('compact root')
const result = await root(blockLog, head)
timeEnd('compact root')
timeEnd('compact clock vis')

time('compact root blocks')
for (const { cid, bytes } of [...result.additions, ...result.removals]) {
blockLog.loggedBlocks.putSync(cid, bytes)
try {
time('compact root')
const result = await root(blockLog, head)
timeEnd('compact root')
time('compact root blocks')
for (const { cid, bytes } of [...result.additions, ...result.removals]) {
blockLog.loggedBlocks.putSync(cid, bytes)
}
timeEnd('compact root blocks')
} catch (e) {
console.log('compact root error', e)
}
timeEnd('compact root blocks')

time('compact changes')
await clockChangesSince(blockLog, head, [], {})
try {
await clockChangesSince(blockLog, head, [], { dirty: false })
} catch (e) {
console.log('clockChangesSince error', e)
}
timeEnd('compact changes')

isCompacting = false
}

Expand Down
9 changes: 5 additions & 4 deletions packages/fireproof/src/crdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ export class CRDT {
const crdtMeta = meta as unknown as CRDTMeta
await this.clock.applyHead(crdtMeta.head, [])
},
compact: async (blocks: CompactionFetcher) => {
await doCompact(blocks, this.clock.head)
return { head: this.clock.head } as TransactionMeta
},
// compact: async (blocks: CompactionFetcher) => {
// // console.log('compacting fp')
// await doCompact(blocks, this.clock.head)
// return { head: this.clock.head } as TransactionMeta
// },
autoCompact: this.opts.autoCompact || 100,
crypto: this.opts.crypto || crypto,
store: this.opts.store || store,
Expand Down

0 comments on commit 340c102

Please sign in to comment.