Skip to content

Commit

Permalink
feat: advertise http endpoint as extended provider
Browse files Browse the repository at this point in the history
This is a first pass at adding http as provider to our advertisments for the IPNI indexers. The goal is to advertise our content as being available over bitswap and http.

**NOTE** This module needs refactoring and updating to the (dag) haus style. I have not tackled that yet.
This PR is currently makes only the minimal changes required to add Extended Providers.

The logic of how to serialise and sign a Provider was derivied from the exiting signing code in this module and the implementation in go-libipni

see: https://github.com/ipni/go-libipni/blob/afe2d8ea45b86c2a22f756ee521741c8f99675e5/ingest/schema/envelope.go#L125-L167

License: MIT
Signed-off-by: Oli Evans <[email protected]>
  • Loading branch information
olizilla committed May 5, 2023
1 parent 8a71991 commit 64b2e35
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 53 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ _Variables in bold are required._
| Name | Default | Description |
| ---------------------------- | --------------------- | -------------------------------------------------------------------------------------- |
| **BITSWAP_PEER_MULTIADDR** | | The multiaddr of the BitSwap peer to download the data from. Omit the `/p2p/...` part. |
| **HTTP_PEER_MULTIADDR** | | The multiaddr of the HTTP peer to download the data from. Omit the `/p2p/...` part. |
| ENV_FILE_PATH | `$PWD/.env` | The environment file to load. |
| **HANDLER** | | The operation to execute. Can be `content` or `advertisement`. |
| **INDEXER_NODE_URL** | | The root URL (schema, host and port) of the indexer node to announce data to. |
| NODE_DEBUG | | If it contains `aws-ipfs`, debug mode is enabled. |
| NODE_ENV | | Set to `production` to disable pretty logging. |
| PEER_ID_DIRECTORY | `/tmp` | The directory of the file containing the BitSwap PeerID in JSON format. |
| PEER_ID_FILE | `peerId.json` | The filename of the file containing the BitSwap PeerID in JSON format. |
| PEER_ID_S3_BUCKET | | The S3 bucket to download the BitSwap PeerID in JSON format. |
| S3_BUCKET | `advertisements` | The S3 bucket where to upload advertisement and head information to. |
| SQS_ADVERTISEMENTS_QUEUE_URL | `advertisementsQueue` | The SQS topic URL to upload advertisement to for announcement. |
Expand Down
10 changes: 7 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"multiformats": "^9.6.1",
"peer-id": "^0.16.0",
"pino": "^7.2.0",
"undici": "^4.13.0"
"undici": "^4.13.0",
"varint": "^6.0.0"
},
"devDependencies": {
"aws-sdk-client-mock": "^0.5.6",
Expand Down
43 changes: 20 additions & 23 deletions src/config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const { readFile, writeFile } = require('fs/promises')
const { join, resolve } = require('path')
const { resolve } = require('path')
const PeerId = require('peer-id')

/* c8 ignore next */
Expand All @@ -13,41 +12,39 @@ const { fetchFromS3 } = require('./storage')
const {
AWS_REGION: awsRegion,
BITSWAP_PEER_MULTIADDR: bitswapPeerMultiaddr,
HTTP_PEER_MULTIADDR: httpPeerMultiaddr,
INDEXER_NODE_URL: indexerNodeUrl,
PEER_ID_DIRECTORY: peerIdJsonDirectory,
PEER_ID_FILE: peerIdJsonFile,
PEER_ID_S3_BUCKET: peerIdBucket,
S3_BUCKET: s3Bucket,
SQS_ADVERTISEMENTS_QUEUE_URL: advertisementsQueue
} = process.env

async function downloadPeerIdFile() {
const file = peerIdJsonFile ?? 'peerId.json'
logger.info(`Downloading PeerId from s3://${process.env.PEER_ID_S3_BUCKET}/${file}`)

const contents = await fetchFromS3(process.env.PEER_ID_S3_BUCKET, file)
return writeFile(module.exports.peerIdJsonPath, contents)
async function fetchPeerId (file) {
if (!peerIdBucket) {
throw new Error('PEER_ID_S3_BUCKET must be set in ENV')
}
logger.info(`Downloading PeerId from s3://${peerIdBucket}/${file}`)
const contents = await fetchFromS3(peerIdBucket, file)
const json = JSON.parse(contents)
return await PeerId.createFromJSON(json)
}

async function getPeerId() {
if (process.env.PEER_ID_S3_BUCKET) {
await downloadPeerIdFile()
}
async function getHttpPeerId () {
return fetchPeerId('peerId-http.json')
}

try {
const peerIdJson = JSON.parse(await readFile(module.exports.peerIdJsonPath, 'utf-8'))
return await PeerId.createFromJSON(peerIdJson)
} catch (e) {
return PeerId.create()
}
async function getBitswapPeerId () {
return fetchPeerId('peerId.json')
}

module.exports = {
advertisementsQueue: advertisementsQueue ?? 'advertisementsQueue',
awsRegion,
bitswapPeerMultiaddr,
getPeerId,
httpPeerMultiaddr,
getBitswapPeerId,
getHttpPeerId,
indexerNodeUrl,
metadata: Buffer.from('gBI=', 'base64'), // To regenerate: Buffer.from(require('varint').encode(0x900)).toString('base64')
peerIdJsonPath: join(peerIdJsonDirectory ?? '/tmp', peerIdJsonFile ?? 'peerId.json'),
peerIdBucket,
s3Bucket: s3Bucket ?? 'advertisements'
}
120 changes: 109 additions & 11 deletions src/handlers/advertisement.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,24 @@ const { CID } = require('multiformats/cid')
const { Multiaddr } = require('multiaddr')
const { request } = require('undici')

const { awsRegion, getPeerId, s3Bucket, bitswapPeerMultiaddr, indexerNodeUrl, metadata } = require('../config')
const { awsRegion, getBitswapPeerId, getHttpPeerId, s3Bucket, bitswapPeerMultiaddr, httpPeerMultiaddr, indexerNodeUrl } = require('../config')
const { logger, serializeError } = require('../logging')
const { uploadToS3 } = require('../storage')
const telemetry = require('../telemetry')
const varint = require('varint')

// see: https://github.com/ipni/specs/blob/main/IPNI.md#metadata
const BITSWAP_METADATA = Buffer.from(varint.encode(0x900))
const HTTP_METADATA = Buffer.from(varint.encode(0x3D0000))

/**
* see: https://github.com/ipni/specs/blob/main/IPNI.md#extendedprovider
* @typedef {object} Provider
* @prop {string} ID - peerID as string
* @prop {string[]} Addresses - multiaddrs for peer e.g /dns4/freeway.dag.house/tcp/443/https
* @prop {Buffer} Metadata - prefixed with varint for http or bitswap
* @prop {Buffer} [Signature] - signature per
*/

async function fetchHeadCid() {
try {
Expand Down Expand Up @@ -57,7 +71,73 @@ async function fetchHeadCid() {
}
}

async function computeAdvertisementSignature(previous, peerId, cid, addresses) {
/**
* Create array of signed Providers
* @param {Provider[]} providers
* @param {object} previous
* @param {PeerId} peerId - advertisment peerId
* @param {CID} cid - entries CID
* @param {Buffer} contextId - derived from cid
*/
async function signProviders (providers, previous, peerId, cid, contextId) {
const signed = []
for (const provider of providers) {
const Signature = await providerSignature(previous, peerId, cid, contextId, provider)
signed.push({
...provider,
Signature
})
}
return signed
}

/**
* Calculate the signature for an Extended Provider
* see: https://github.com/ipni/go-libipni/blob/afe2d8ea45b86c2a22f756ee521741c8f99675e5/ingest/schema/envelope.go#L125
* @param {object} previous
* @param {PeerId} peerId - advertisment peerId
* @param {CID} cid - entries CID
* @param {Buffer} contextId - derived from cid
* @param {Provider} provider - provider to sign
* @param {boolean} extendedProviderOverride
*/
async function providerSignature (previous, peerId, cid, contextId, provider, extendedProviderOverride = false) {
const sigBuf = Buffer.concat([
previous ? Buffer.from(CID.parse(previous['/']).bytes) : Buffer.alloc(0),
Buffer.from(cid.bytes),
Buffer.from(peerId.toString(), 'utf-8'),
contextId,
Buffer.from(provider.ID, 'utf-8'),
...provider.Addresses.map(a => Buffer.from(a, 'utf-8')),
provider.Metadata,
extendedProviderOverride ? Buffer.from([1]) : Buffer.from([0]) // if ad.ExtendedProvider.Override { sigBuf.WriteByte(1)} else { sigBuf.WriteByte(0 }
])

const digest = await sha256.digest(sigBuf)
const payload = digest.bytes

const sealed = await Envelope.seal(
{
domain: Buffer.from('indexer', 'utf-8'),
codec: Buffer.from('/indexer/ingest/adSignature', 'utf-8'),
marshal: () => payload
},
peerId
)

return sealed.marshal()
}

/**
* Calculate the signature for an Extended Provider
* see: https://github.com/ipni/go-libipni/blob/afe2d8ea45b86c2a22f756ee521741c8f99675e5/ingest/schema/envelope.go#L125
* @param {object} previous
* @param {PeerId} peerId - advertisment peerId
* @param {CID} cid - entries CID
* @param {string[]} addresses - multiaddrs
* @param {Buffer} metadata - prefixed with varint for http or bitswap
*/
async function computeAdvertisementSignature(previous, peerId, cid, addresses, metadata) {
const payload = (
await sha256.digest(
Buffer.concat([
Expand Down Expand Up @@ -169,28 +249,46 @@ async function notifyIndexer(cid, peerId) {

async function main(event) {
try {
const peerId = await getPeerId()
const bsPeerId = await getBitswapPeerId()
const httpPeerId = await getHttpPeerId()

/** @type {Provider} */
const httpProvider = {
ID: bsPeerId.toString(),
Addresses: [httpPeerMultiaddr],
Metadata: HTTP_METADATA
}

/** @type {Provider} */
const bitswapProvider = {
ID: httpPeerId.toString(),
Addresses: [bitswapPeerMultiaddr],
Metadata: BITSWAP_METADATA
}

// Track the latest read cid and advertisementCid
let cid
let advertisementCid

for (const record of event.Records) {
const cidString = record.body
const contextId = Buffer.from(cidString)
cid = CID.parse(cidString)

const previous = advertisementCid ? { '/': advertisementCid.toString() } : await fetchHeadCid()
const addresses = [bitswapPeerMultiaddr]

// Create the advertisement
const rawAdvertisement = {
Provider: peerId.toString(),
Addresses: addresses,
Provider: bsPeerId.toString(),
Addresses: bitswapProvider.Addresses,
Entries: { '/': cidString },
ContextID: { '/': { bytes: Buffer.from(cidString).toString('base64') } },
Metadata: metadata,
ContextID: contextId,
Metadata: BITSWAP_METADATA,
IsRm: false,
Signature: await computeAdvertisementSignature(previous, peerId, cid, addresses)
ExtendedProvider: {
Providers: await signProviders([bitswapProvider, httpProvider], previous, bsPeerId, cid, contextId)
},
Signature: await computeAdvertisementSignature(previous, bsPeerId, cid, bitswapProvider.Addresses, BITSWAP_METADATA)
}

if (previous) {
Expand All @@ -206,10 +304,10 @@ async function main(event) {
}

// Update the head
await updateHead(advertisementCid, peerId)
await updateHead(advertisementCid, bsPeerId)

// Notify the indexer-node
await notifyIndexer(advertisementCid, peerId)
await notifyIndexer(advertisementCid, bsPeerId)

// Return a empty object to signal we have consumed all the messages
return {}
Expand Down
6 changes: 4 additions & 2 deletions test/advertisement.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ process.env.HANDLER = 'advertisement'
const t = require('tap')
const { decode: decodeDAG } = require('@ipld/dag-json')
const { MockAgent, setGlobalDispatcher } = require('undici')
const { awsRegion, s3Bucket, indexerNodeUrl } = require('../src/config')
const { awsRegion, s3Bucket, peerIdBucket, indexerNodeUrl } = require('../src/config')
const { handler } = require('../src/index')
const { trackAWSUsages } = require('./utils/mock')
const { trackAWSUsages, mockPeerIds } = require('./utils/mock')

mockPeerIds()

t.test('advertisement - creates a new head when none is found and notifies the indexer', async t => {
t.plan(8)
Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/peerId-http.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"id": "Qmb8hPXH8E2kmNWyQvUt4yTXSenBBbd7kUAmWSZ6UksmCC",
"privKey": "CAASqQkwggSlAgEAAoIBAQDLhmHURcpa6rO4py71Pu9i2+4RP+dVc6fUV/ew1UENVHdKG5Enrd9My2wy5T45r3CIPooUAvQqS1UP3y6w+VFBmfHOR17Bd0Jpy8daWeKmxW5K0IE3gGYbZiCoXPrOjM7Zjy7rQvFwtqpC1cnbck/XPylrflynkze/7aVCICjnNGQX/otS/Y9nZdUzwPZrnlmJbev86FA+z8HtSjJAO0oCYvJVqC/owlT7Kcf1PpFP0gspMTxfvd8yjkOrEiYdWETHGy92NExX1lihQTcSxAx/GEuFfwCprG/lnzaX2M4KOQcnzzeAzE+nvlWV7SHH3xBD/GowZOpC5LbN+imVTwe1AgMBAAECggEBALxiuO75YqLhGFXVOhv7ky2YkTYaRpDMKw7cFgLygfJKutg3yBZIVKcKrC44D27pu2oBKWH4kfUMIcI9PUTGobXtPutHGKhPMYQoAXDaPndLzBkoAlNTYDAASj5NQkIqB9VNOYq+PimsitCuftfHaau9ZSOApVbfZ/0ZhZw9lF7ccywrxt2oRs3vWJdpxaNj8pobmMWEBE+st9icfytoYzq0MzeFCKFe1efF6uCbJjjsJN9VlUgmwuoj7LLwk9gui5DTVjWTUZav0fscbdGrTxzDVGN6l1zz61imoYYs6/peaOSpH6ElXqZPVGqx5crX7kISAtVYMwEl1rYvmzdEaYECgYEA5lupGNRmCuNvdul78TkmNpTzo7XxDd3vsuWeD2zSYiHscixag3PwUziOimAFoObE0Q1WrRwUZ0DBMEQlczSglnxwfqcmFTMD10hKZ7iHLDaVSD4VA5SQDjcLB6iGUUJSFGlA3vzbsCAPJ/jiJrTpNXRlAf3pwLmY1wGjpDDYbmUCgYEA4i4UFx+IjRtR7O0sLSPG0PMeMii/H8qvkDOnI+6QVfMM9u7jPfJr3dPgNhGjsiE3KtE9iQ2toAPM7cN1KlFkLtJj1Krbgplq92WUmq0+FGsJ7uJN+v3AW1DMvY3aUKiP36o7YJVFm8aFkJvK28H8dXGowZtnyl4FFjZNr8rYNxECgYEAub1QpFemi8ME/kGXEVK+UCZcKTWrzz2TZ5EjmFZrcHrf9tNyjCmwrrmcnVB9WVaZfeKeb+iDPxoEa5GXDMRzc8OUxVqrJ07CGxUMAYPVYj4EsoJ0WTWxzfRt7VfiLaUnH5QanXIPBX1kGxvf5cvbeqGGTEva7e34oeyNcPd9m80CgYBADcyiv8tsKtCIyKI6JmenxTJIAkIePsh5jidHhchajvqc9ApnAMCLkfVtWpGBYOa+uYMzzSvepTTkIPaNhX9ZwzuCfuCrm8cvSRVMuMNcBPRLMsviyPow4jEZfZLWROZ58TvfnmaYNyCcU4KfudBFcmrYaap7JSVLpRVbM5cIAQKBgQCSP6ekcyZ4zQUTrbnzxhwVHoZqWus5WU9Mhcn32RRIbbMZhU4uRJjy//0MDilgoAbpyA7oUKSYNAZj5QmdMRYyd0lc9+K79eh3qClyZb/zjUcQiJTMSQVSrZHgOsb+sfOd1CYHbHZU8BcYWqBMRdGBxdt/h49JJmDclEYkUF0fYw==",
"pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDLhmHURcpa6rO4py71Pu9i2+4RP+dVc6fUV/ew1UENVHdKG5Enrd9My2wy5T45r3CIPooUAvQqS1UP3y6w+VFBmfHOR17Bd0Jpy8daWeKmxW5K0IE3gGYbZiCoXPrOjM7Zjy7rQvFwtqpC1cnbck/XPylrflynkze/7aVCICjnNGQX/otS/Y9nZdUzwPZrnlmJbev86FA+z8HtSjJAO0oCYvJVqC/owlT7Kcf1PpFP0gspMTxfvd8yjkOrEiYdWETHGy92NExX1lihQTcSxAx/GEuFfwCprG/lnzaX2M4KOQcnzzeAzE+nvlWV7SHH3xBD/GowZOpC5LbN+imVTwe1AgMBAAE="
}
13 changes: 5 additions & 8 deletions test/miscellaneous.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ const { resolve } = require('path')
const { createFromJSON } = require('peer-id')
const { Readable } = require('stream')
const t = require('tap')
const { getPeerId } = require('../src/config')
const { getBitswapPeerId } = require('../src/config')
const { logger } = require('../src/logging')
const telemetry = require('../src/telemetry')
const { s3Mock } = require('./utils/mock')

t.test('config - download the peerId from S3', async t => {
t.plan(3)

const rawPeer = await readFile(resolve(process.cwd(), 'test/fixtures/peerId.json'))
const bsPeer = require('./fixtures/peerId.json')

s3Mock.on(GetObjectCommand).callsFake(async params => {
t.equal(params.Bucket, 'idBucket')
Expand All @@ -25,22 +25,19 @@ t.test('config - download the peerId from S3', async t => {
return { Body: Readable.from(rawPeer) }
})

t.equal((await getPeerId()).toB58String(), (await createFromJSON(JSON.parse(rawPeer))).toB58String())
t.equal((await getBitswapPeerId()).toB58String(), (await createFromJSON(bsPeer).toB58String())
})

t.test('config - creates a new PeerId if download fails', async t => {
t.test('config - fails if PeerId not set', async t => {
t.plan(3)

const rawPeer = await readFile(resolve(process.cwd(), 'test/fixtures/peerId.json'))

s3Mock.on(GetObjectCommand).callsFake(async params => {
t.equal(params.Bucket, 'idBucket')
t.equal(params.Key, 'peerId.json')

return { Body: Readable.from('INVALID', 'utf-8') }
})

t.not((await getPeerId()).toB58String(), (await createFromJSON(JSON.parse(rawPeer))).toB58String())
t.throws(async () => await getBitswapPeerId())
})

t.test('telemetry', async t => {
Expand Down
4 changes: 3 additions & 1 deletion test/utils/env
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
HTTP_PEER_MULTIADDR=/dns4/freeway.dag.house/tcp/443/https
BITSWAP_PEER_MULTIADDR=/ip4/12.34.56.78/tcp/999/ws
INDEXER_NODE_URL=http://87.65.43.21:2345
S3_BUCKET=advertisements
S3_BUCKET=advertisements
PEER_ID_S3_BUCKET=peerids
Loading

0 comments on commit 64b2e35

Please sign in to comment.