Skip to content

Commit

Permalink
feat: resolve the retrieval provider using IPNI (#51)
Browse files Browse the repository at this point in the history
Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos authored Feb 1, 2024
1 parent 74e69e0 commit 038d4b7
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 112 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: curl -L https://github.com/filecoin-station/zinnia/releases/download/v0.14.0/zinnia-linux-x64.tar.gz | tar -xz
- run: curl -L https://github.com/filecoin-station/zinnia/releases/download/v0.16.0/zinnia-linux-x64.tar.gz | tar -xz
- uses: actions/setup-node@v3
- run: npx standard
- run: ./zinnia run test.js
9 changes: 9 additions & 0 deletions deps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// 3rd-party dependencies from Denoland
//
// Run the following script after making changes to this file:
// deno bundle deps.ts vendor/deno-deps.js
//

export { encodeHex } from 'https://deno.land/[email protected]/encoding/hex.ts'
export { decodeBase64 } from 'https://deno.land/[email protected]/encoding/base64.ts'
export { decode as decodeVarint } from 'https://deno.land/x/[email protected]/varint.ts'
93 changes: 0 additions & 93 deletions lib/deno-encoding-hex.js

This file was deleted.

75 changes: 75 additions & 0 deletions lib/ipni-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { decodeBase64, decodeVarint } from '../vendor/deno-deps.js'

/**
 * Look up the given CID in the IPNI instance at cid.contact and pick a
 * retrieval provider from the returned advertisements.
 *
 * The first advertisement using the HTTP protocol wins. When no HTTP
 * advertisement is found, the first Graphsync advertisement is returned
 * as a fallback. Advertisements that cannot be parsed (malformed metadata,
 * missing provider info or no address) are skipped.
 *
 * Possible `indexerResult` values:
 *  - `OK` - an HTTP provider was found (`provider` is set)
 *  - `HTTP_NOT_ADVERTISED` - only a Graphsync provider was found (`provider` is set)
 *  - `NO_VALID_ADVERTISEMENT` - no usable advertisement was found
 *  - `ERROR_<status>` - the indexer responded with a non-2xx HTTP status
 *  - `ERROR_FETCH` - the request failed or the response could not be processed
 *
 * @param {string} cid
 * @returns {Promise<{
 *  indexerResult: string;
 *  provider?: { address: string; protocol: string };
 * }>}
 */
export async function queryTheIndex (cid) {
  const url = `https://cid.contact/cid/${encodeURIComponent(cid)}`

  let providerResults
  try {
    const res = await fetch(url)
    if (!res.ok) {
      console.error('IPNI query failed, HTTP response: %s %s', res.status, await res.text())
      return { indexerResult: `ERROR_${res.status}` }
    }

    const result = await res.json()
    providerResults = result.MultihashResults.flatMap(r => r.ProviderResults)
    console.log('IPNI returned %s provider results', providerResults.length)
  } catch (err) {
    console.error('IPNI query failed.', err)
    return { indexerResult: 'ERROR_FETCH' }
  }

  let graphsyncProvider
  for (const p of providerResults) {
    // TODO: find only the contact advertised by the SP handling this deal
    // See https://filecoinproject.slack.com/archives/C048DLT4LAF/p1699958601915269?thread_ts=1699956597.137929&cid=C048DLT4LAF
    // bytes of CID of dag-cbor encoded DealProposal
    // https://github.com/filecoin-project/boost/blob/main/indexprovider/wrapper.go#L168-L172
    // https://github.com/filecoin-project/boost/blob/main/indexprovider/wrapper.go#L195

    // A single malformed advertisement must not abort the whole lookup:
    // decoding happens outside of the try/catch block above, so we guard it
    // here and move on to the next provider result.
    let protocolCode
    try {
      protocolCode = decodeVarint(decodeBase64(p.Metadata))[0]
    } catch (err) {
      console.error('Skipping IPNI provider result with malformed metadata.', err)
      continue
    }

    // `protocolCode` is used as a property key, it is converted to
    // its decimal string form for the lookup.
    const protocol = {
      0x900: 'bitswap',
      0x910: 'graphsync',
      0x0920: 'http',
      4128768: 'graphsync'
    }[protocolCode]

    // Provider info or the list of addresses may be missing
    const address = p.Provider?.Addrs?.[0]
    if (!address) continue

    switch (protocol) {
      case 'http':
        return {
          indexerResult: 'OK',
          provider: { address, protocol }
        }

      case 'graphsync':
        // Remember only the first Graphsync provider, keep looking for HTTP
        if (!graphsyncProvider) {
          graphsyncProvider = {
            address: `${address}/p2p/${p.Provider.ID}`,
            protocol
          }
        }
    }
  }
  if (graphsyncProvider) {
    console.log('HTTP protocol is not advertised, falling back to Graphsync.')
    return {
      indexerResult: 'HTTP_NOT_ADVERTISED',
      provider: graphsyncProvider
    }
  }

  console.log('All advertisements are for unsupported protocols.')
  return { indexerResult: 'NO_VALID_ADVERTISEMENT' }
}
52 changes: 35 additions & 17 deletions lib/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import { ActivityState } from './activity-state.js'
import { SPARK_VERSION, MAX_CAR_SIZE, APPROX_ROUND_LENGTH_IN_MS } from './constants.js'
import { encodeHex } from './deno-encoding-hex.js'
import { queryTheIndex } from './ipni-client.js'
import {
encodeHex
} from '../vendor/deno-deps.js'

const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))

Expand Down Expand Up @@ -35,8 +38,34 @@ export default class Spark {
return retrieval
}

async executeRetrievalCheck (retrieval, stats) {
console.log(`Querying IPNI to find retrieval providers for ${retrieval.cid}`)
const { indexerResult, provider } = await queryTheIndex(retrieval.cid)
stats.indexerResult = indexerResult

if (indexerResult !== 'OK') return

stats.protocol = provider.protocol
stats.providerAddress = provider.address

const searchParams = new URLSearchParams({
// See https://github.com/filecoin-project/lassie/blob/main/docs/HTTP_SPEC.md#dag-scope-request-query-parameter
// Only the root block at the end of the path is returned after blocks required to verify the specified path segments.
'dag-scope': 'block',
protocols: provider.protocol,
providers: provider.address
})
const url = `ipfs://${retrieval.cid}?${searchParams.toString()}`
try {
await this.fetchCAR(url, stats)
} catch (err) {
console.error(`Failed to fetch ${url}`)
console.error(err)
}
}

async fetchCAR (url, stats) {
console.log(`Fetching ${url}...`)
console.log(`Fetching: ${url}`)

// Abort if no progress was made for 60 seconds
const controller = new AbortController()
Expand Down Expand Up @@ -140,23 +169,12 @@ export default class Spark {
carTooLarge: false,
byteLength: 0,
carChecksum: null,
statusCode: null
}
const searchParams = new URLSearchParams({
// See https://github.com/filecoin-project/lassie/blob/main/docs/HTTP_SPEC.md#dag-scope-request-query-parameter
// Only the root block at the end of the path is returned after blocks required to verify the specified path segments.
'dag-scope': 'block',
protocols: retrieval.protocol,
providers: retrieval.providerAddress
})
const url = `ipfs://${retrieval.cid}?${searchParams.toString()}`
try {
await this.fetchCAR(url, stats)
} catch (err) {
console.error(`Failed to fetch ${url}`)
console.error(err)
statusCode: null,
indexerResult: null
}

await this.executeRetrievalCheck(retrieval, stats)

const measurementId = await this.submitMeasurement(retrieval, { ...stats })
Zinnia.jobCompleted()
return measurementId
Expand Down
1 change: 1 addition & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
import './test/ipni-client.test.js'
import './test/integration.js'
import './test/spark.js'
26 changes: 25 additions & 1 deletion test/integration.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import Spark from '../lib/spark.js'
import { test } from 'zinnia:test'
import { assert } from 'zinnia:assert'
import { assert, assertEquals } from 'zinnia:assert'

const KNOWN_CID = 'bafkreih25dih6ug3xtj73vswccw423b56ilrwmnos4cbwhrceudopdp5sq'

test('integration', async () => {
const spark = new Spark()
Expand All @@ -11,3 +13,25 @@ test('integration', async () => {
assert(retrieval.startAt)
assert(retrieval.finishedAt)
})

// End-to-end check: run one retrieval for a CID we control, then read the
// submitted measurement back from the API and verify the recorded fields.
test('retrieval check for our CID', async () => {
  const spark = new Spark()
  spark.getRetrieval = async () => ({ cid: KNOWN_CID })

  const measurementId = await spark.nextRetrieval()
  const response = await fetch(`https://api.filspark.com/measurements/${measurementId}`)
  assert(response.ok)
  const measurement = await response.json()

  // Expected [field, value] pairs, verified in the listed order
  const expectedFields = [
    ['cid', KNOWN_CID],
    // TODO - spark-api does not record this field yet
    // ['indexerResult', 'OK'],
    ['providerAddress', '/dns/frisbii.fly.dev/tcp/443/https'],
    ['protocol', 'http'],
    ['timeout', false],
    ['statusCode', 200],
    ['byteLength', 200],
    ['carTooLarge', false]
    // TODO - spark-api does not record this field yet
    // ['carChecksum', '122069f03061f7ad4c14a5691b7e96d3ddd109023a6539a0b4230ea3dc92050e7136']
  ]

  for (const [field, expected] of expectedFields) {
    // The field name doubles as the assertion message
    assertEquals(measurement[field], expected, field)
  }
})
16 changes: 16 additions & 0 deletions test/ipni-client.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { test } from 'zinnia:test'
import { assertEquals } from 'zinnia:assert'
import { queryTheIndex } from '../lib/ipni-client.js'

const KNOWN_CID = 'bafkreih25dih6ug3xtj73vswccw423b56ilrwmnos4cbwhrceudopdp5sq'

// Queries the live indexer for a CID we control and expects the HTTP
// provider to be selected.
test('query advertised CID', async () => {
  const expected = {
    indexerResult: 'OK',
    provider: {
      address: '/dns/frisbii.fly.dev/tcp/443/https',
      protocol: 'http'
    }
  }

  const actual = await queryTheIndex(KNOWN_CID)
  assertEquals(actual, expected)
})
67 changes: 67 additions & 0 deletions vendor/deno-deps.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// This code was bundled using `deno bundle` and it's not recommended to edit it manually

const encoder = new TextEncoder();
/**
 * Describe the type of a value for use in error messages.
 *
 * Returns "null" for `null`, the `typeof` result for non-objects, and for
 * objects the constructor name (or "object" when no name is available).
 */
function getTypeName(value) {
  if (value === null) return "null";

  const kind = typeof value;
  if (kind !== "object") return kind;

  return value?.constructor?.name ?? "object";
}
/**
 * Normalize the input to a Uint8Array.
 *
 * A Uint8Array is returned as-is, an ArrayBuffer is wrapped in a
 * Uint8Array view and a string is encoded using the shared TextEncoder.
 * Any other input is rejected with a TypeError.
 */
function validateBinaryLike(source) {
  if (source instanceof Uint8Array) return source;
  if (source instanceof ArrayBuffer) return new Uint8Array(source);
  if (typeof source === "string") return encoder.encode(source);

  throw new TypeError(`The input must be a Uint8Array, a string, or an ArrayBuffer. Received a value of the type ${getTypeName(source)}.`);
}
// ASCII codes of the hexadecimal digits, indexed by nibble value
const hexTable = new TextEncoder().encode("0123456789abcdef");
const textDecoder = new TextDecoder();
/**
 * Encode the input as a lowercase hexadecimal string.
 *
 * The input is first normalized by `validateBinaryLike()`; every input
 * byte produces two hexadecimal digits in the output.
 */
function encodeHex(src) {
  const u8 = validateBinaryLike(src);
  const dst = new Uint8Array(u8.length * 2);
  // Iterate over the input bytes. Bounding the loop by `dst.length` would
  // run twice as many iterations, reading past the end of `u8` and relying
  // on out-of-bounds typed-array writes being silently ignored.
  for(let i = 0; i < u8.length; i++){
    const v = u8[i];
    dst[i * 2] = hexTable[v >> 4];
    dst[i * 2 + 1] = hexTable[v & 0x0f];
  }
  return textDecoder.decode(dst);
}
/**
 * Decode a base64 string into bytes.
 *
 * `atob()` yields a binary string in which every character carries one byte;
 * each character code is copied into the resulting Uint8Array.
 */
function decodeBase64(b64) {
  return Uint8Array.from(atob(b64), (ch) => ch.charCodeAt(0));
}
const MaxUInt64 = 18446744073709551615n;
// Mask selecting the 7 payload bits of a varint byte
const REST = 0x7f;
// Number of payload bits contributed by each varint byte
const SHIFT = 7;
/**
 * Decode an unsigned varint starting at `offset`.
 *
 * At most 10 bytes are read. Each byte contributes its low 7 bits, least
 * significant group first; a byte with the high bit cleared ends the value.
 *
 * Returns a `[value, nextOffset]` pair where `value` is a BigInt and
 * `nextOffset` is the index of the first byte after the varint.
 * Throws a RangeError when the value exceeds the unsigned 64-bit range or
 * when no terminating byte is found within the bytes that were read.
 */
function decode(buf, offset = 0) {
  const end = Math.min(buf.length, offset + 10);
  let value = 0n;
  let shift = 0;

  for (let pos = offset; pos < end; pos += 1) {
    const current = buf[pos];
    value += BigInt((current & REST) * Math.pow(2, shift));
    shift += SHIFT;

    // High bit set: more bytes follow
    if (current & 0x80) continue;

    if (value > MaxUInt64) {
      throw new RangeError("overflow varint");
    }
    return [value, pos + 1];
  }

  throw new RangeError("malformed or overflow varint");
}
export { encodeHex as encodeHex };
export { decodeBase64 as decodeBase64 };
export { decode as decodeVarint };

0 comments on commit 038d4b7

Please sign in to comment.