diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js index e752621..6f52ebc 100644 --- a/indexer/lib/advertisement-walker.js +++ b/indexer/lib/advertisement-walker.js @@ -311,9 +311,16 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, } } - debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5)) - const entryHash = entriesChunk.Entries[0]['/'].bytes - const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString() + let payloadCid + try { + payloadCid = processEntries(entriesCid, entriesChunk) + } catch (err) { + debug('Error processing entries: %s', err) + return { + error: /** @type {const} */('PAYLOAD_CID_NOT_EXTRACTABLE'), + previousAdvertisementCid + } + } return { previousAdvertisementCid, @@ -326,16 +333,44 @@ export async function fetchAdvertisedPayload (providerAddress, advertisementCid, * @param {string} cid * @param {object} [options] * @param {number} [options.fetchTimeout] + * @param {typeof fetch} [options.fetchMethod] * @returns {Promise} */ -async function fetchCid (providerBaseUrl, cid, { fetchTimeout } = {}) { - const url = new URL(cid, new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl)) +export async function fetchCid (providerBaseUrl, cid, { fetchTimeout, fetchMethod } = {}) { + let url = new URL(providerBaseUrl) + + // Check if the URL already has a path + if (!(url.pathname && url.pathname !== '/')) { + // If no path, add the standard path with a placeholder + url = new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl) + } else { + // If there's already a path, append the additional path + url = new URL(`${url.pathname}/ipni/v1/ad/_cid_placeholder_`, url.origin) + } + url = new URL(cid, url) debug('Fetching %s', url) try { - const res = await fetch(url, { signal: AbortSignal.timeout(fetchTimeout ?? 30_000) }) + const res = await (fetchMethod ?? fetch)(url, { signal: AbortSignal.timeout(fetchTimeout ?? 30_000) }) debug('Response from %s → %s %o', url, res.status, res.headers) await assertOkResponse(res) - return await res.json() + + // Determine the codec based on the CID + const parsedCid = CID.parse(cid) + const codec = parsedCid.code + + // List of codecs: + // https://github.com/multiformats/multicodec/blob/f18c7ba5f4d3cc74afb51ee79978939abc0e6556/table.csv + switch (codec) { + case 297: // DAG-JSON + return await res.json() + + case 113: { // DAG-CBOR + const buffer = await res.arrayBuffer() + return cbor.decode(new Uint8Array(buffer)) } + + default: + throw new Error(`Unknown codec ${codec} for CID ${cid}`) + } } catch (err) { if (err && typeof err === 'object') { Object.assign(err, { url }) @@ -371,3 +406,51 @@ export function parseMetadata (meta) { return { protocol } } } + +/** + * Process entries from either DAG-JSON or DAG-CBOR format + * @param {string} entriesCid - The CID of the entries + * @param {{Entries: Array}} entriesChunk - The decoded entries + * @returns {string} The payload CID + */ +export function processEntries (entriesCid, entriesChunk) { + if (!entriesChunk.Entries || !entriesChunk.Entries.length) { + throw new Error(`No entries found in the response for ${entriesCid}`) + } + const parsedCid = CID.parse(entriesCid) + const codec = parsedCid.code + let entryBytes + switch (codec) { + case 297: { + // DAG-JSON + // For DAG-JSON format, the entry is a base64 encoded string + const entry = entriesChunk.Entries[0] + // Check that entry is an object with a '/' property + if (!entry || typeof entry !== 'object' || !('/' in entry)) { + throw new Error('DAG-JSON entry must have a "/" property') + } + + // Verify the '/' property is an object with 'bytes' property + // In DAG-JSON, CIDs are represented as objects with a '/' property that contains 'bytes' + if (!entry['/'] || typeof entry['/'] !== 'object' || !('bytes' in entry['/'])) { + throw new Error('DAG-JSON entry\'s "/" property must be an object with a "bytes" property') + } + + const entryHash = entry['/'].bytes + if (typeof entryHash !== 'string') { + throw new Error('DAG-JSON entry\'s ["/"]["bytes"] property must be a string') + } + entryBytes = Buffer.from(entryHash, 'base64') + break } + case 113: { + // DAG-CBOR + // For DAG-CBOR format, the entry is already a Uint8Array with the multihash + entryBytes = entriesChunk.Entries[0] + assert(entryBytes instanceof Uint8Array, 'DAG-CBOR entry must be a Uint8Array') + break } + default: + throw new Error(`Unsupported codec ${codec}`) + } + assert(entryBytes, 'Entry bytes must be set') + return CID.create(1, 0x55 /* raw */, multihash.decode(entryBytes)).toString() +} diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js index 3056b8a..ce4c8d8 100644 --- a/indexer/test/advertisement-walker.test.js +++ b/indexer/test/advertisement-walker.test.js @@ -1,19 +1,25 @@ import { RedisRepository } from '@filecoin-station/spark-piece-indexer-repository' import { Redis } from 'ioredis' import assert from 'node:assert' -import { after, before, beforeEach, describe, it } from 'node:test' +import { after, before, beforeEach, describe, it, mock } from 'node:test' import { setTimeout } from 'node:timers/promises' import { fetchAdvertisedPayload, processNextAdvertisement, walkOneStep + , processEntries, + fetchCid } from '../lib/advertisement-walker.js' +import pRetry from 'p-retry' import { givenHttpServer } from './helpers/http-server.js' import { FRISBII_ADDRESS, FRISBII_AD_CID } from './helpers/test-data.js' import { assertOkResponse } from '../lib/http-assertions.js' import * as stream from 'node:stream' import { pipeline } from 'node:stream/promises' - +import * as cbor from '@ipld/dag-cbor' +import { CID } from 'multiformats/cid' +import * as crypto from 'node:crypto' +import * as multihash from 'multiformats/hashes/digest' /** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */ // TODO(bajtos) We may need to replace this with a mock index provider @@ -517,3 +523,249 @@ describe('data schema for REST API', () => { }) }) }) + +describe('processEntries', () => { + // Use a real DAG-JSON CID that will naturally have codec 0x0129 (297) + // CIDs that start with 'bagu' are DAG-JSON encoded + const dagJsonCid = 'baguqeeraa5mjufqdwuwrrrqboctnn3vhdlq63rj3hce2igpzbmae7sazkfea' + // Use a real DAG-CBOR CID that will naturally have codec 0x71 (113) + // CIDs that start with 'bafy' are DAG-CBOR encoded + const dagCborCid = 'bafyreibpxkmu65ezxy7rynxotbghfz3ktiapjisntepd67hghfn4hde3na' + const testData = 'test data for multihash' + // Create a proper multihash from this digest + const mh = multihash.create(0x12, crypto.createHash('sha256').update(testData).digest()) + // @ts-ignore + const entryBytes = Buffer.from(mh.bytes).toString('base64') + const dagJsonChunk = { + Entries: [ + { + '/': { + bytes: entryBytes + } + } + ] + } + + const dagCborChunk = { + Entries: [mh.bytes] + } + it('processes DAG-JSON entries correctly', () => { + // Process the entries with the real CID + const result = processEntries(dagJsonCid, dagJsonChunk) + + // Verify the result is a valid CID string + assert(CID.parse(result), 'Result should be a parseable CID') + }) + + it('processes DAG-CBOR entries correctly', () => { + // Process the entries with the real CID + const result = processEntries(dagCborCid, dagCborChunk) + + // Verify the result is a valid CID string + assert(CID.parse(result), 'Result should be a parseable CID') + }) + + // Error handling tests + it('throws an error when entries array is empty', () => { + assert.throws( + () => processEntries(dagCborCid, { Entries: [] }), + /No entries found/ + ) + }) + + it('throws an error when Entries field is missing', () => { + assert.throws( + // @ts-ignore + () => processEntries(dagCborCid, {}), + /No entries found/ + ) + }) + + it('throws an error for unsupported codec', () => { + // Use a CID with an unsupported codec + const unsupportedCid = 'bafkreigrnnl64xuevvkhknbhrcqzbdvvmqnchp7ae2a4ulninsjoc5svoq' + + assert.throws( + () => processEntries(unsupportedCid, dagJsonChunk), + /Unsupported codec/ + ) + }) + + // Data integrity test using real multihash operations + it('correctly creates a CID from entry data', () => { + // Process the entries + const result = processEntries(dagJsonCid, dagJsonChunk) + + // Create the expected CID directly + const expectedCid = CID.create(1, 0x55, mh).toString() + + // They should match + assert.strictEqual(result, expectedCid, 'CID should match the one created directly') + }) + + // Test with entries from CBOR encoding/decoding + it('correctly handles DAG-CBOR entries serialized with @ipld/dag-cbor', () => { + // Process the entries + const result = processEntries(dagCborCid, dagCborChunk) + + // Create the expected CID directly + const expectedCid = CID.create(1, 0x55, mh).toString() + + // They should match + assert.strictEqual(result, expectedCid, 'CID should match the one created directly') + }) + + // Error case tests with real data + it('handles malformed base64 in DAG-JSON gracefully', () => { + const malformedChunk = { + Entries: [ + { + '/': { + bytes: 'This-is-not-valid-base64!' + } + } + ] + } + + // We expect an error when processing this malformed data + assert.throws( + () => processEntries(dagJsonCid, malformedChunk), + /Incorrect length/ + ) + }) + + it('handles invalid multihash in DAG-CBOR gracefully', () => { + const invalidChunk = { + Entries: [new Uint8Array([0, 1, 2, 3])] // Too short to be a valid multihash + } + + // We expect an error when processing this invalid multihash + assert.throws( + () => processEntries(dagCborCid, invalidChunk), + /Incorrect length/ + ) + }) +}) + +describe('fetchCid', () => { + // Use a real DAG-JSON CID that will naturally have codec 0x0129 (297) + // CIDs that start with 'bagu' are DAG-JSON encoded + const dagJsonCid = 'baguqeerayzpbdctxk4iyps45uldgibsvy6zro33vpfbehggivhcxcq5suaia' + // Sample JSON response + const testResponse = { test: 'value' } + // Use a real DAG-CBOR CID that will naturally have codec 0x71 (113) + // CIDs that start with 'bafy' are DAG-CBOR encoded + const dagCborCid = 'bafyreictdikh363qfxsmjp63i6kup6aukjqfpd4r6wbhbiz2ctuji4bofm' + + it('uses DAG-JSON codec (0x0129) to parse response as JSON', async () => { + // Mock fetch to return JSON + // @ts-ignore + const mockFetch = mock.fn(() => Promise.resolve({ + ok: true, + status: 200, + json: () => Promise.resolve(testResponse), + arrayBuffer: () => { throw new Error('Should not call arrayBuffer for JSON') } + })) + + const parsedCid = CID.parse(dagJsonCid) + assert.strictEqual(parsedCid.code, 297) + + // @ts-ignore + const result = await fetchCid('http://example.com', dagJsonCid, { fetchMethod: mockFetch }) + + // Verify we got the JSON response + assert.deepStrictEqual(result, testResponse) + }) + + it('uses DAG-CBOR codec (0x71) to parse response as CBOR', async () => { + const cborData = cbor.encode(testResponse) + + // Mock fetch to return ArrayBuffer + // @ts-ignore + const mockFetch = mock.fn(() => Promise.resolve({ + ok: true, + status: 200, + json: () => { throw new Error('Should not call json for CBOR') }, + arrayBuffer: () => Promise.resolve(cborData.buffer) + }) + ) + + const parsedCid = CID.parse(dagCborCid) + assert.strictEqual(parsedCid.code, 113) + + // @ts-ignore + const result = await fetchCid('http://example.com', dagCborCid, { fetchMethod: mockFetch }) + + // Verify we got the decoded CBOR data + assert.deepStrictEqual(result, testResponse) + }) + + it('throws an error for unknown codec', async () => { + // Mock fetch to return JSON + + // @ts-ignore + const mockFetch = mock.fn(() => Promise.resolve({ + ok: true, + status: 200, + json: () => { throw new Error('Should not call json for CBOR') }, + arrayBuffer: () => { throw new Error('Should not call arrayBuffer for fallback') } + }) + ) + + // Use a CID with a codec that is neither DAG-JSON (0x0129) nor DAG-CBOR (0x71) + // This is a raw codec (0x55) CID + const unknownCodecCid = 'bafkreigrnnl64xuevvkhknbhrcqzbdvvmqnchp7ae2a4ulninsjoc5svoq' + const parsedCid = CID.parse(unknownCodecCid) + assert.strictEqual(parsedCid.code, 85) + const errorMessage = 'Unknown codec 85' + try { + // @ts-ignore + await fetchCid('http://example.com', unknownCodecCid, { fetchMethod: mockFetch }) + assert.fail('fetchCid should have thrown an error') + } catch (error) { + // @ts-ignore + assert.ok(error.message.includes(errorMessage), `Error message should include: ${errorMessage}`) + } + }) + it('correctly fetches and processes real DAG-CBOR data from Curio provider', async function () { + // Use a real Curio provider and known DAG-CBOR CID + const curioProviderUrl = 'https://f03303347-market.duckdns.org/ipni-provider/12D3KooWJ91c6xQshrNe7QAXPFAaeRrHWq2UrgXGPf8UmMZMwyZ5' + const dagCborCid = 'baguqeeracgnw2ecmhaa6qkb3irrgjjk5zt5fes7wwwpb4aymoaogzyvvbrma' + /** @type {unknown} */ + let result = await pRetry( + () => + ( + fetchCid(curioProviderUrl, dagCborCid) + ) + ) + + // Verify the result has the expected structure for DAG-CBOR entries + assert(result, 'Expected a non-null result') + assert(typeof result === 'object' && result !== null, 'Result should be an object') + + /** @type {Record} */ + const resultObj = /** @type {Record} */ (result) + assert('Entries' in resultObj, 'Result should have Entries property') + assert(typeof resultObj.Entries === 'object' && resultObj.Entries !== null, 'Entries should be an object') + + /** @type {Record} */ + const entries = /** @type {Record} */ (resultObj.Entries) + assert('/' in entries, 'Entries should have a "/" property') + + const entriesCid = entries['/'] + assert(typeof entriesCid === 'string', 'Entries CID should be a string') + + /** @type {unknown} */ + result = await pRetry( + () => + ( + fetchCid(curioProviderUrl, entriesCid) + ) + ) + /** @type {{ Entries: unknown[]; }} */ + const entriesChunk = /** @type {{ Entries: unknown[]; }} */ (result) + const payloadCid = processEntries(entriesCid, entriesChunk) + console.log(payloadCid) + assert.deepStrictEqual(payloadCid, 'bafkreiefrclz7c6w57yl4u7uiq4kvht4z7pits5jpcj3cajbvowik3rvhm') + }) +})