Skip to content

Commit

Permalink
feat: query result create and archive (#2)
Browse files Browse the repository at this point in the history
Adds tooling to create and archive query results.
  • Loading branch information
alanshaw authored Nov 7, 2024
1 parent 5771819 commit e6d5fc5
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 72 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
"./errors": {
"import": "./src/errors.js",
"types": "./dist/src/errors.d.ts"
},
"./query-result": {
"import": "./src/query-result.js",
"types": "./dist/src/query-result.d.ts"
}
},
"dependencies": {
Expand Down
17 changes: 12 additions & 5 deletions src/api.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { MultihashDigest, Link } from 'multiformats'
import { Delegation, Failure, Result, Principal, IPLDView } from '@ucanto/interface'
import { DecodeFailure, ShardedDAGIndex, UnknownFormat } from '@storacha/blob-index/types'
import { Delegation, Failure, Result, Principal, IPLDView, IPLDBlock } from '@ucanto/interface'
import { DecodeFailure, ShardedDAGIndex, ShardedDAGIndexView, UnknownFormat } from '@storacha/blob-index/types'

export { MultihashDigest, Link }
export { Delegation, Failure, Result }
export { DecodeFailure, ShardedDAGIndex, UnknownFormat }
export { Delegation, Failure, Result, Principal, IPLDView, IPLDBlock }
export { DecodeFailure, ShardedDAGIndex, ShardedDAGIndexView, UnknownFormat }

export interface IndexingServiceClient {
queryClaims (q: Query): Promise<Result<QueryOk, QueryError>>
}

/**
* Match narrows parameters for locating providers/claims for a set of multihashes.
Expand All @@ -21,9 +25,12 @@ export interface Query {
match?: Match
}

export interface QueryOk extends IPLDView {
export interface QueryOk extends QueryResult {}

export interface QueryResult extends IPLDView {
claims: Map<string, Delegation>
indexes: Map<string, ShardedDAGIndex>
archive (): Promise<Result<Uint8Array>>
}

export type QueryError =
Expand Down
73 changes: 6 additions & 67 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
import * as CBOR from '@ipld/dag-cbor'
import { CID } from 'multiformats/cid'
import { base58btc } from 'multiformats/bases/base58'
import { z } from 'zod'
import { ok, error } from '@ucanto/core'
import * as CAR from '@ucanto/core/car'
import * as Delegation from '@ucanto/core/delegation'
import * as ShardedDAGIndex from '@storacha/blob-index/sharded-dag-index'
import { InvalidQueryError, NetworkError, UnknownFormatError, DecodeError } from './errors.js'
import { error } from '@ucanto/core'
import * as QueryResult from './query-result.js'
import { InvalidQueryError, NetworkError } from './errors.js'

/** @import { Result, Link, Query, QueryOk, QueryError } from './api.js' */
/** @import { IndexingServiceClient, Result, Query, QueryOk, QueryError } from './api.js' */

const SERVICE_URL = 'https://indexing.storacha.network'
const CLAIMS_PATH = '/claims'

const QueryResult = z
.object({
'index/query/[email protected]': z.object({
claims: z
.array(
z.instanceof(CID).transform(cid => /** @type {Link} */ (cid))
),
indexes: z
.record(z.string(), z.instanceof(CID))
.transform((record) => Object.values(record)),
}),
})
.transform((object) => object['index/query/[email protected]'])

/** @implements {IndexingServiceClient} */
export class Client {
#fetch
#serviceURL
Expand All @@ -44,8 +26,6 @@ export class Client {

/**
* @param {Query} query
* @param {object} [options]
* @param {typeof fetch} options.fetch
* @returns {Promise<Result<QueryOk, QueryError>>}
*/
async queryClaims({ hashes = [], match = { subject: [] } }) {
Expand All @@ -69,47 +49,6 @@ export class Client {
return error(new NetworkError('missing response body'))
}

const { roots, blocks } = CAR.decode(new Uint8Array(await response.arrayBuffer()))
if (roots.length !== 1) {
return error(new DecodeError('expected exactly one root'))
}

let parsed
try {
parsed = QueryResult.parse(await CBOR.decode(roots[0].bytes))
} catch (/** @type {any} */ err) {
return error(new UnknownFormatError(`parsing root block: ${err.message}`))
}

const claims = new Map()
for (const root of parsed.claims) {
let claim
try {
claim = Delegation.view({ root, blocks })
} catch (/** @type {any} */ err) {
return error(new DecodeError(`decoding claim: ${root}: ${err.message}`))
}
claims.set(root.toString(), claim)
}

const indexes = new Map()
for (const link of parsed.indexes) {
const block = blocks.get(link.toString())
if (!block) {
return error(new DecodeError(`missing index: ${link}`))
}
const { ok: index, error: err } = ShardedDAGIndex.extract(block.bytes)
if (!index) {
return error(new DecodeError(`extracting index: ${link}: ${err.message}`))
}
indexes.set(link.toString(), index)
}

return ok({
root: roots[0],
iterateIPLDBlocks: () => blocks.values(),
claims,
indexes
})
return QueryResult.extract(new Uint8Array(await response.arrayBuffer()))
}
}
185 changes: 185 additions & 0 deletions src/query-result.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/** @import * as API from './api.js' */
import * as CBOR from '@ipld/dag-cbor'
import { CID } from 'multiformats/cid'
import { create as createLink } from 'multiformats/link'
import { z } from 'zod'
import { ok, error } from '@ucanto/core'
import * as CAR from '@ucanto/core/car'
import * as Delegation from '@ucanto/core/delegation'
import * as ShardedDAGIndex from '@storacha/blob-index/sharded-dag-index'
import { UnknownFormatError, DecodeError } from './errors.js'
import { sha256 } from 'multiformats/hashes/sha2'

const QueryResultSchema = z
.object({
'index/query/[email protected]': z.object({
claims: z
.array(
z.instanceof(CID).transform(cid => /** @type {API.Link} */ (cid))
),
indexes: z
.record(z.string(), z.instanceof(CID))
.transform((record) => Object.values(record)),
}),
})
.transform((object) => object['index/query/[email protected]'])

/**
* @param {{ root: API.Link, blocks: Map<string, API.IPLDBlock> }} arg
* @returns {Promise<API.Result<API.QueryResult, API.DecodeFailure|API.UnknownFormat>>}
*/
export const create = async ({ root, blocks }) => {
const rootBlock = blocks.get(root.toString())
if (!rootBlock) {
return error(new DecodeError(`missing root block: ${root}`))
}
return view({ root: rootBlock, blocks })
}

/**
* @param {{ root: API.IPLDBlock, blocks: Map<string, API.IPLDBlock> }} arg
* @returns {Promise<API.Result<API.QueryResult, API.DecodeFailure|API.UnknownFormat>>}
*/
export const view = async ({ root, blocks }) => {
let parsed
try {
parsed = QueryResultSchema.parse(CBOR.decode(root.bytes))
} catch (/** @type {any} */ err) {
return error(new UnknownFormatError(`parsing root block: ${err.message}`))
}

const claims = new Map()
for (const root of parsed.claims) {
let claim
try {
claim = Delegation.view({ root, blocks })
} catch (/** @type {any} */ err) {
return error(new DecodeError(`decoding claim: ${root}: ${err.message}`))
}
claims.set(root.toString(), claim)
}

const indexes = new Map()
for (const link of parsed.indexes) {
const block = blocks.get(link.toString())
if (!block) {
return error(new DecodeError(`missing index: ${link}`))
}
const { ok: index, error: err } = ShardedDAGIndex.extract(block.bytes)
if (!index) {
return error(new DecodeError(`extracting index: ${link}: ${err.message}`))
}
indexes.set(link.toString(), index)
}

return ok(new QueryResult({ root, blocks, data: { claims, indexes } }))
}

/**
* @typedef {string} ContextID
* @param {{ claims: API.Delegation[], indexes: Map<ContextID, API.ShardedDAGIndexView> }} param
*/
export const from = async ({ claims, indexes }) => {
const blocks = new Map()
const rootData = {
'index/query/[email protected]': {
claims: /** @type {API.Link[]} **/ ([]),
indexes: /** @type {Record<string, API.Link>} */ ({})
}
}
const data = { claims: new Map(), indexes: new Map() }

for (const claim of claims) {
rootData['index/query/[email protected]'].claims.push(claim.link())
for (const block of claim.iterateIPLDBlocks()) {
blocks.set(block.cid.toString(), block)
}
data.claims.set(claim.link().toString(), claim)
}

for (const [contextID, index] of indexes.entries()) {
const result = await index.archive()
if (!result.ok) {
return result
}
const digest = await sha256.digest(result.ok)
const link = createLink(0x0202, digest)
rootData['index/query/[email protected]'].indexes[contextID] = link
blocks.set(link.toString(), { cid: link, bytes: result.ok })
data.indexes.set(link.toString(), index)
}

const rootBytes = CBOR.encode(rootData)
const rootDigest = await sha256.digest(rootBytes)
const rootLink = createLink(CBOR.code, rootDigest)
const root = { cid: rootLink, bytes: rootBytes }
blocks.set(rootLink.toString(), root)

return ok(new QueryResult({ root, blocks, data }))
}

class QueryResult {
#root
#blocks
#data

/**
* @param {{
* root: API.IPLDBlock
* blocks: Map<string, API.IPLDBlock>
* data: {
* claims: Map<string, API.Delegation>
* indexes: Map<string, API.ShardedDAGIndex>
* }
* }} param
*/
constructor ({ root, blocks, data }) {
this.#root = root
this.#blocks = blocks
this.#data = data
}

get root () {
return this.#root
}

iterateIPLDBlocks () {
return this.#blocks.values()
}

get claims () {
return this.#data.claims
}

get indexes () {
return this.#data.indexes
}

archive () {
return archive(this)
}
}

/**
* @param {API.QueryResult} result
* @returns {Promise<API.Result<Uint8Array>>}
*/
export const archive = async (result) => {
const blocks = new Map()
for (const block of result.iterateIPLDBlocks()) {
blocks.set(block.cid.toString(), block)
}
return ok(CAR.encode({ roots: [result.root], blocks }))
}

/**
* @param {Uint8Array} bytes
* @returns {Promise<API.Result<API.QueryResult, API.DecodeFailure|API.UnknownFormat>>}
*/
export const extract = async (bytes) => {
const { roots, blocks } = CAR.decode(bytes)
if (roots.length !== 1) {
return error(new DecodeError('expected exactly one root'))
}
return view({ root: roots[0], blocks })
}
32 changes: 32 additions & 0 deletions test/query-result.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as fs from 'node:fs'
import * as path from 'node:path'
import { describe, it } from 'mocha'
import { assert } from 'chai'
import * as QueryResult from '../src/query-result.js'

describe('query result', () => {
it('round trip', async () => {
const digestString = 'zQmRm3SMS4EbiKYy7VeV3zqXqzyr76mq9b2zg3Tij3VhKUG'
const fixturePath = path.join(import.meta.dirname, 'fixtures', `${digestString}.queryresult.car`)
const carBytes = await fs.promises.readFile(fixturePath)

const extract0 = await QueryResult.extract(carBytes)
assert(extract0.ok)
assert(!extract0.error)

assert(extract0.ok.claims.size > 0)
assert(extract0.ok.indexes.size > 0)

const archive = await extract0.ok.archive()
assert(archive.ok)
assert(!archive.error)

const extract1 = await QueryResult.extract(archive.ok)
assert(extract1.ok)
assert(!extract1.error)

assert.equal(extract0.ok.root.toString(), extract1.ok.root.toString())
assert.equal(extract0.ok.claims.size, extract1.ok.claims.size)
assert.equal(extract0.ok.indexes.size, extract1.ok.indexes.size)
})
})

0 comments on commit e6d5fc5

Please sign in to comment.