feat: indexer support for cbor encoding #139

Open

wants to merge 6 commits into base: main
89 changes: 83 additions & 6 deletions indexer/lib/advertisement-walker.js
@@ -311,9 +311,16 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid,
}
}

debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5))
const entryHash = entriesChunk.Entries[0]['/'].bytes
const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString()
let payloadCid
try {
payloadCid = processEntries(entriesCid, entriesChunk)
} catch (err) {
debug('Error processing entries: %s', err)
return {
error: /** @type {const} */('ENTRIES_NOT_RETRIEVABLE'),
Review comment (Member):

Let's create a new error code for this case: we were able to retrieve the entries, but we could not extract the payload CID from the retrieved chunk. The code ENTRIES_NOT_RETRIEVABLE does not describe this situation correctly.

previousAdvertisementCid
}
}

return {
previousAdvertisementCid,
@@ -328,14 +335,39 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid,
* @param {number} [options.fetchTimeout]
* @returns {Promise<unknown>}
*/
async function fetchCid (providerBaseUrl, cid, { fetchTimeout } = {}) {
const url = new URL(cid, new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl))
export async function fetchCid (providerBaseUrl, cid, { fetchTimeout } = {}) {
let url = new URL(providerBaseUrl)

// Check if the URL already has a path
if (!(url.pathname && url.pathname !== '/')) {
// If no path, add the standard path with a placeholder
url = new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl)
} else {
// If there's already a path, append the additional path
url = new URL(`${url.pathname}/ipni/v1/ad/_cid_placeholder_`, url.origin)
}
url = new URL(cid, url)
debug('Fetching %s', url)
try {
const res = await fetch(url, { signal: AbortSignal.timeout(fetchTimeout ?? 30_000) })
debug('Response from %s → %s %o', url, res.status, res.headers)
await assertOkResponse(res)
return await res.json()

// Determine the codec based on the CID
const parsedCid = CID.parse(cid)
const codec = parsedCid.code

switch (codec) {
case 297: // DAG-JSON: https://github.com/multiformats/multicodec/blob/master/table.csv#L113
Review comment (Member):

This is not a permalink. If a new entry is added to table.csv, line L113 may no longer point to the DAG-JSON entry. You can create a permalink by pressing y; the GitHub web UI will change the URL to point to the commit SHA you are viewing.

I think it's not necessary to link to specific lines in the multicodec table. I would simply add a link to https://github.com/multiformats/multicodec/blob/master/table.csv before the switch statement.

// List of codecs:
// https://github.com/multiformats/multicodec/blob/master/table.csv
switch (codec) {
  case 297: // DAG-JSON
  case 113: // DAG-CBOR
}

I'll let you decide which approach you prefer. Please update line 364 below in sync with this line.

return await res.json()

case 113: { // DAG-CBOR: https://github.com/multiformats/multicodec/blob/master/table.csv#L46
const buffer = await res.arrayBuffer()
return cbor.decode(new Uint8Array(buffer)) }

default:
throw new Error(`Unknown codec ${codec} for CID ${cid}`)
}
} catch (err) {
if (err && typeof err === 'object') {
Object.assign(err, { url })
@@ -371,3 +403,48 @@ export function parseMetadata (meta) {
return { protocol }
}
}

/**
* Process entries from either DAG-JSON or DAG-CBOR format
* @param {string} entriesCid - The CID of the entries
* @param {{Entries: Array<unknown>}} entriesChunk - The decoded entries
* @returns {string} The payload CID
*/
export function processEntries (entriesCid, entriesChunk) {
if (!entriesChunk.Entries || !entriesChunk.Entries.length) {
throw new Error('No entries found in DAG-CBOR response')
Review comment (Member):

Are you sure that only a DAG-CBOR response can have no entries? Can't this happen with a DAG-JSON response too?

Suggested change
throw new Error('No entries found in DAG-CBOR response')
throw new Error(`No entries found in the response for ${entriesCid}`)

}
const parsedCid = CID.parse(entriesCid)
const codec = parsedCid.code
let entryBytes
switch (codec) {
case 297: {
// DAG-JSON
// For DAG-JSON format, the entry is a base64 encoded string
const entry = entriesChunk.Entries[0]
// Check that entry is an object with a '/' property
if (!entry || typeof entry !== 'object' || !('/' in entry)) {
throw new Error('DAG-JSON entry must have a "/" property')
}

// Verify the '/' property is an object with 'bytes' property
// In DAG-JSON, CIDs are represented as objects with a '/' property that contains 'bytes'
if (!entry['/'] || typeof entry['/'] !== 'object' || !('bytes' in entry['/'])) {
throw new Error('DAG-JSON entry\'s "/" property must be a CID object with a bytes property')
Review comment (Member):

As far as I understand, IPNI entries are multihashes, not CIDs. (That's why we have to convert them to CIDs by adding a Raw codec.) In that case, the error message is not correct.

Suggested change
throw new Error('DAG-JSON entry\'s "/" property must be a CID object with a bytes property')
throw new Error('DAG-JSON entry\'s "/" property must be an object with a "bytes" property')

}

const entryHash = entry['/'].bytes
entryBytes = Buffer.from(String(entryHash), 'base64')
Review comment (Member):

Why is it necessary to convert entryHash to a String? I believe it is already a string.

The original version was using this conversion:

Buffer.from(entryHash, 'base64')

What am I missing?

If you want to better handle non-string bytes values, then we should reject them.

if (typeof entryHash !== 'string') {
  throw new Error('DAG-JSON entry\'s ["/"]["bytes"] property must be a string')
}

That should satisfy the TypeScript checker to allow you to call Buffer.from(entryHash).

break }
case 113: {
// DAG-CBOR
// For DAG-CBOR format, the entry is already a Uint8Array with the multihash
entryBytes = entriesChunk.Entries[0]
assert(entryBytes instanceof Uint8Array, 'DAG-CBOR entry must be a Uint8Array')
break }
default:
throw new Error(`Unsupported codec ${codec}`)
}
assert(entryBytes, 'Entry bytes must be set')
return CID.create(1, 0x55 /* raw */, multihash.decode(entryBytes)).toString()
}
263 changes: 261 additions & 2 deletions indexer/test/advertisement-walker.test.js
@@ -1,19 +1,25 @@
import { RedisRepository } from '@filecoin-station/spark-piece-indexer-repository'
import { Redis } from 'ioredis'
import assert from 'node:assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { after, afterEach, before, beforeEach, describe, it, mock } from 'node:test'
import { setTimeout } from 'node:timers/promises'
import {
fetchAdvertisedPayload,
processNextAdvertisement,
walkOneStep,
processEntries,
fetchCid
} from '../lib/advertisement-walker.js'
import pRetry from 'p-retry'
import { givenHttpServer } from './helpers/http-server.js'
import { FRISBII_ADDRESS, FRISBII_AD_CID } from './helpers/test-data.js'
import { assertOkResponse } from '../lib/http-assertions.js'
import * as stream from 'node:stream'
import { pipeline } from 'node:stream/promises'

import * as cbor from '@ipld/dag-cbor'
import { CID } from 'multiformats/cid'
import * as crypto from 'node:crypto'
import * as multihash from 'multiformats/hashes/digest'
/** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */

// TODO(bajtos) We may need to replace this with a mock index provider
@@ -517,3 +523,256 @@ describe('data schema for REST API', () => {
})
})
})

describe('processEntries', () => {
// Use a real DAG-JSON CID that will naturally have codec 0x0129 (297)
// CIDs that start with 'bagu' are DAG-JSON encoded
const dagJsonCid = 'baguqeeraa5mjufqdwuwrrrqboctnn3vhdlq63rj3hce2igpzbmae7sazkfea'
// Use a real DAG-CBOR CID that will naturally have codec 0x71 (113)
// CIDs that start with 'bafy' are DAG-CBOR encoded
const dagCborCid = 'bafyreibpxkmu65ezxy7rynxotbghfz3ktiapjisntepd67hghfn4hde3na'
const testData = 'test data for multihash'
// Create a proper multihash from this digest
const mh = multihash.create(0x12, crypto.createHash('sha256').update(testData).digest())
// @ts-ignore
const entryBytes = Buffer.from(mh.bytes).toString('base64')
const dagJsonChunk = {
Entries: [
{
'/': {
bytes: entryBytes
}
}
]
}

const dagCborChunk = {
Entries: [mh.bytes]
}
it('processes DAG-JSON entries correctly', () => {
// Process the entries with the real CID
const result = processEntries(dagJsonCid, dagJsonChunk)

// Verify the result is a valid CID string
assert(CID.parse(result), 'Result should be a parseable CID')
})

it('processes DAG-CBOR entries correctly', () => {
// Process the entries with the real CID
const result = processEntries(dagCborCid, dagCborChunk)

// Verify the result is a valid CID string
assert(CID.parse(result), 'Result should be a parseable CID')
})

// Error handling tests
it('throws an error when entries array is empty', () => {
assert.throws(
() => processEntries(dagCborCid, { Entries: [] }),
/No entries found/
)
})

it('throws an error when Entries field is missing', () => {
assert.throws(
// @ts-ignore
() => processEntries(dagCborCid, {}),
/No entries found/
)
})

it('throws an error for unsupported codec', () => {
// Use a CID with an unsupported codec
const unsupportedCid = 'bafkreigrnnl64xuevvkhknbhrcqzbdvvmqnchp7ae2a4ulninsjoc5svoq'

assert.throws(
() => processEntries(unsupportedCid, dagJsonChunk),
/Unsupported codec/
)
})

// Data integrity test using real multihash operations
it('correctly creates a CID from entry data', () => {
// Process the entries
const result = processEntries(dagJsonCid, dagJsonChunk)

// Create the expected CID directly
const expectedCid = CID.create(1, 0x55, mh).toString()

// They should match
assert.strictEqual(result, expectedCid, 'CID should match the one created directly')
})

// Test with entries from CBOR encoding/decoding
it('correctly handles DAG-CBOR entries serialized with @ipld/dag-cbor', () => {
// Process the entries
const result = processEntries(dagCborCid, dagCborChunk)

// Create the expected CID directly
const expectedCid = CID.create(1, 0x55, mh).toString()

// They should match
assert.strictEqual(result, expectedCid, 'CID should match the one created directly')
})

// Error case tests with real data
it('handles malformed base64 in DAG-JSON gracefully', () => {
const malformedChunk = {
Entries: [
{
'/': {
bytes: 'This-is-not-valid-base64!'
}
}
]
}

// We expect an error when processing this malformed data
assert.throws(
() => processEntries(dagJsonCid, malformedChunk),
/Incorrect length/
)
})

it('handles invalid multihash in DAG-CBOR gracefully', () => {
const invalidChunk = {
Entries: [new Uint8Array([0, 1, 2, 3])] // Too short to be a valid multihash
}

// We expect an error when processing this invalid multihash
assert.throws(
() => processEntries(dagCborCid, invalidChunk),
/Incorrect length/
)
})
})

describe('fetchCid', () => {
// Store the original fetch function before each test
/**
* @type {{ (input: string | URL | globalThis.Request, init?: RequestInit): Promise<Response>; (input: string | URL | globalThis.Request, init?: RequestInit): Promise<Response>; }}
*/
let originalFetch
// Use a real DAG-JSON CID that will naturally have codec 0x0129 (297)
// CIDs that start with 'bagu' are DAG-JSON encoded
const dagJsonCid = 'baguqeerayzpbdctxk4iyps45uldgibsvy6zro33vpfbehggivhcxcq5suaia'
// Sample JSON response
const jsonResponse = { test: 'value' }
// Use a real DAG-CBOR CID that will naturally have codec 0x71 (113)
// CIDs that start with 'bafy' are DAG-CBOR encoded
const dagCborCid = 'bafyreictdikh363qfxsmjp63i6kup6aukjqfpd4r6wbhbiz2ctuji4bofm'

beforeEach(() => {
originalFetch = globalThis.fetch
})

afterEach(() => {
// Restore the original fetch function after each test
globalThis.fetch = originalFetch
Comment on lines +670 to +671
Review comment (Member):

Let's not mock the global fetch function; instead, inject it into the fetchCid function. It's a pattern we already use in other repositories.

The trick is to define the fetch parameter as optional and defaulting to the global fetch function.

export async function fetchCid (providerBaseUrl, cid, { fetchTimeout, fetch = globalThis.fetch } = {}) {
  // ...
}

// in tests
const fetch = async () => ({
  ok: true,
  status: 200,
  json: () => Promise.resolve(jsonResponse),
  arrayBuffer: () => { throw new Error('Should not call arrayBuffer for JSON') }
})
const result = await fetchCid('http://example.com', dagJsonCid, { fetch })
// ...

Also notice that the following line can be simplified:

let fn
fn = () => { return Promise.resolve(/* result */) }
// is the same as
fn = () => Promise.resolve(/* result */)
// can be also written as
fn = async () => /* result */

})

it('uses DAG-JSON codec (0x0129) to parse response as JSON', async () => {
// Mock fetch to return JSON
// @ts-ignore
globalThis.fetch = mock.fn(() => {
return Promise.resolve({
ok: true,
status: 200,
json: () => Promise.resolve(jsonResponse),
arrayBuffer: () => { throw new Error('Should not call arrayBuffer for JSON') }
})
})

const parsedCid = CID.parse(dagJsonCid)
assert.strictEqual(parsedCid.code, 297)

const result = await fetchCid('http://example.com', dagJsonCid)

// Verify we got the JSON response
assert.deepStrictEqual(result, jsonResponse)
})

it('uses DAG-CBOR codec (0x71) to parse response as CBOR', async () => {
const cborData = cbor.encode(jsonResponse)
Review comment (Member):

This is a bit unexpected - jsonResponse encoded using cbor encoding. I was expecting to see cbor.encode(cborResponse).

After reading the rest of the test, I see that the structure of the response body does not matter. Let's rename jsonResponse to something like someResponse or testResponse to make it clearer that this object is not related to whether we use JSON or CBOR encoding.


// Mock fetch to return ArrayBuffer
// @ts-ignore
globalThis.fetch = mock.fn(() => {
return Promise.resolve({
ok: true,
status: 200,
json: () => { throw new Error('Should not call json for CBOR') },
arrayBuffer: () => Promise.resolve(cborData.buffer)
})
})

const parsedCid = CID.parse(dagCborCid)
assert.strictEqual(parsedCid.code, 113)

const result = await fetchCid('http://example.com', dagCborCid)

// Verify we got the decoded CBOR data
assert.deepStrictEqual(result, jsonResponse)
})

it('throws an error for unknown codec', async () => {
// Mock fetch to return JSON

// @ts-ignore
globalThis.fetch = mock.fn(() => {
return Promise.resolve({
ok: true,
status: 200,
json: () => { throw new Error('Should not call json for CBOR') },
arrayBuffer: () => { throw new Error('Should not call arrayBuffer for fallback') }
})
})

// Use a CID with a codec that is neither DAG-JSON (0x0129) nor DAG-CBOR (0x71)
// This is a raw codec (0x55) CID
const unknownCodecCid = 'bafkreigrnnl64xuevvkhknbhrcqzbdvvmqnchp7ae2a4ulninsjoc5svoq'
const parsedCid = CID.parse(unknownCodecCid)
assert.strictEqual(parsedCid.code, 85)
const errorMessage = 'To parse non base32, base36 or base58btc encoded CID multibase decoder must be provided'
Review comment (Member):

I am confused about this error message. The CID in unknownCodecCid is encoded using base32 as you can verify here: https://cid.ipfs.tech/#bafkreigrnnl64xuevvkhknbhrcqzbdvvmqnchp7ae2a4ulninsjoc5svoq

I am expecting an error message telling us that we encountered an unsupported codec 85.

Perhaps the test is triggering a different error path than you thought?

try {
await fetchCid('http://example.com', 'testcid')
assert.fail('fetchCid should have thrown an error')
} catch (error) {
// Check the error message

// @ts-ignore
assert.ok(error.message.includes(errorMessage), `Error message should include: ${errorMessage}`)
}
})
it('correctly fetches and processes real DAG-CBOR data from Curio provider', async function () {
// Use a real Curio provider and known DAG-CBOR CID
const curioProviderUrl = 'https://f03303347-market.duckdns.org/ipni-provider/12D3KooWJ91c6xQshrNe7QAXPFAaeRrHWq2UrgXGPf8UmMZMwyZ5'
const dagCborCid = 'baguqeeracgnw2ecmhaa6qkb3irrgjjk5zt5fes7wwwpb4aymoaogzyvvbrma'
// Use the real fetchCid function with the original fetch implementation
globalThis.fetch = originalFetch
/** @type {{ Entries: { [key: string]: string } }} */
// @ts-ignore
const result = await pRetry(
() =>
(
fetchCid(curioProviderUrl, dagCborCid)
)
)
Comment on lines +754 to +760
Review comment (Member):

Why do we have to disable type-checking for this statement?

Let's find a way to tell TypeScript what we are trying to accomplish.


// Verify the result has the expected structure for DAG-CBOR entries
assert(result, 'Expected a non-null result')
assert(result.Entries, 'Result should have Entries property')
const entriesCid = result.Entries['/']
/** @type {{Entries: Array<unknown>}} */
// @ts-ignore
const entriesChunk = await pRetry(
() =>
(
fetchCid(curioProviderUrl, entriesCid)
)
)
Comment on lines +767 to +773
Review comment (Member):

Ditto.

const payloadCid = processEntries(entriesCid, entriesChunk)
console.log(payloadCid)
assert.deepStrictEqual(payloadCid, 'bafkreiefrclz7c6w57yl4u7uiq4kvht4z7pits5jpcj3cajbvowik3rvhm')
})
})