diff --git a/README.md b/README.md index 934dbb7..0e33923 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,12 @@ _Variables in bold are required._ | Name | Default | Description | | ---------------------------- | --------------------- | -------------------------------------------------------------------------------------- | | **BITSWAP_PEER_MULTIADDR** | | The multiaddr of the BitSwap peer to download the data from. Omit the `/p2p/...` part. | +| **HTTP_PEER_MULTIADDR** | | The multiaddr of the HTTP peer to download the data from. Omit the `/p2p/...` part. | | ENV_FILE_PATH | `$PWD/.env` | The environment file to load. | | **HANDLER** | | The operation to execute. Can be `content` or `advertisement`. | | **INDEXER_NODE_URL** | | The root URL (schema, host and port) of the indexer node to announce data to. | | NODE_DEBUG | | If it contains `aws-ipfs`, debug mode is enabled. | | NODE_ENV | | Set to `production` to disable pretty logging. | -| PEER_ID_DIRECTORY | `/tmp` | The directory of the file containing the BitSwap PeerID in JSON format. | -| PEER_ID_FILE | `peerId.json` | The filename of the file containing the BitSwap PeerID in JSON format. | | PEER_ID_S3_BUCKET | | The S3 bucket to download the BitSwap PeerID in JSON format. | | S3_BUCKET | `advertisements` | The S3 bucket where to upload advertisement and head information to. | | SQS_ADVERTISEMENTS_QUEUE_URL | `advertisementsQueue` | The SQS topic URL to upload advertisement to for announcement. | diff --git a/package-lock.json b/package-lock.json index 7c82193..8a927e0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -23,7 +23,8 @@ "multiformats": "^9.6.1", "peer-id": "^0.16.0", "pino": "^7.2.0", - "undici": "^4.13.0" + "undici": "^4.13.0", + "varint": "^6.0.0" }, "devDependencies": { "aws-sdk-client-mock": "^0.5.6", @@ -8731,7 +8732,8 @@ }, "node_modules/varint": { "version": "6.0.0", - "license": "MIT" + "resolved": "https://registry.npmjs.org/varint/-/varint-6.0.0.tgz", + "integrity": "sha512-cXEIW6cfr15lFv563k4GuVuW/fiwjknytD37jIOLSdSWuOI6WnO/oKwmP2FQTU2l01LP8/M5TSAJpzUaGe3uWg==" }, "node_modules/verror": { "version": "1.10.0", @@ -14653,7 +14655,9 @@ } }, "varint": { - "version": "6.0.0" + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/varint/-/varint-6.0.0.tgz", + "integrity": "sha512-cXEIW6cfr15lFv563k4GuVuW/fiwjknytD37jIOLSdSWuOI6WnO/oKwmP2FQTU2l01LP8/M5TSAJpzUaGe3uWg==" }, "verror": { "version": "1.10.0", diff --git a/package.json b/package.json index a6265dd..fa86a99 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,8 @@ "multiformats": "^9.6.1", "peer-id": "^0.16.0", "pino": "^7.2.0", - "undici": "^4.13.0" + "undici": "^4.13.0", + "varint": "^6.0.0" }, "devDependencies": { "aws-sdk-client-mock": "^0.5.6", diff --git a/src/config.js b/src/config.js index 2872f86..4c950c3 100644 --- a/src/config.js +++ b/src/config.js @@ -1,7 +1,6 @@ 'use strict' -const { readFile, writeFile } = require('fs/promises') -const { join, resolve } = require('path') +const { resolve } = require('path') const PeerId = require('peer-id') /* c8 ignore next */ @@ -13,41 +12,39 @@ const { fetchFromS3 } = require('./storage') const { AWS_REGION: awsRegion, BITSWAP_PEER_MULTIADDR: bitswapPeerMultiaddr, + HTTP_PEER_MULTIADDR: httpPeerMultiaddr, INDEXER_NODE_URL: indexerNodeUrl, - PEER_ID_DIRECTORY: peerIdJsonDirectory, - PEER_ID_FILE: peerIdJsonFile, + PEER_ID_S3_BUCKET: peerIdBucket, S3_BUCKET: s3Bucket, SQS_ADVERTISEMENTS_QUEUE_URL: advertisementsQueue } = process.env -async function downloadPeerIdFile() { - const file = peerIdJsonFile ?? 'peerId.json' - logger.info(`Downloading PeerId from s3://${process.env.PEER_ID_S3_BUCKET}/${file}`) - - const contents = await fetchFromS3(process.env.PEER_ID_S3_BUCKET, file) - return writeFile(module.exports.peerIdJsonPath, contents) +async function fetchPeerId (file) { + if (!peerIdBucket) { + throw new Error('PEER_ID_S3_BUCKET must be set in ENV') + } + logger.info(`Downloading PeerId from s3://${peerIdBucket}/${file}`) + const contents = await fetchFromS3(peerIdBucket, file) + const json = JSON.parse(contents) + return await PeerId.createFromJSON(json) } -async function getPeerId() { - if (process.env.PEER_ID_S3_BUCKET) { - await downloadPeerIdFile() - } +async function getHttpPeerId () { + return fetchPeerId('peerId-http.json') +} - try { - const peerIdJson = JSON.parse(await readFile(module.exports.peerIdJsonPath, 'utf-8')) - return await PeerId.createFromJSON(peerIdJson) - } catch (e) { - return PeerId.create() - } +async function getBitswapPeerId () { + return fetchPeerId('peerId.json') } module.exports = { advertisementsQueue: advertisementsQueue ?? 'advertisementsQueue', awsRegion, bitswapPeerMultiaddr, - getPeerId, + httpPeerMultiaddr, + getBitswapPeerId, + getHttpPeerId, indexerNodeUrl, - metadata: Buffer.from('gBI=', 'base64'), // To regenerate: Buffer.from(require('varint').encode(0x900)).toString('base64') - peerIdJsonPath: join(peerIdJsonDirectory ?? '/tmp', peerIdJsonFile ?? 'peerId.json'), + peerIdBucket, s3Bucket: s3Bucket ?? 'advertisements' } diff --git a/src/handlers/advertisement.js b/src/handlers/advertisement.js index ee33e6e..1f05074 100644 --- a/src/handlers/advertisement.js +++ b/src/handlers/advertisement.js @@ -10,10 +10,24 @@ const { CID } = require('multiformats/cid') const { Multiaddr } = require('multiaddr') const { request } = require('undici') -const { awsRegion, getPeerId, s3Bucket, bitswapPeerMultiaddr, indexerNodeUrl, metadata } = require('../config') +const { awsRegion, getBitswapPeerId, getHttpPeerId, s3Bucket, bitswapPeerMultiaddr, httpPeerMultiaddr, indexerNodeUrl } = require('../config') const { logger, serializeError } = require('../logging') const { uploadToS3 } = require('../storage') const telemetry = require('../telemetry') +const varint = require('varint') + +// see: https://github.com/ipni/specs/blob/main/IPNI.md#metadata +const BITSWAP_METADATA = Buffer.from(varint.encode(0x900)) +const HTTP_METADATA = Buffer.from(varint.encode(0x3D0000)) + +/** + * see: https://github.com/ipni/specs/blob/main/IPNI.md#extendedprovider + * @typedef {object} Provider + * @prop {string} ID - peerID as string + * @prop {string[]} Addresses - multiaddrs for peer e.g /dns4/freeway.dag.house/tcp/443/https + * @prop {Buffer} Metadata - prefixed with varint for http or bitswap + * @prop {Buffer} [Signature] - signature per + */ async function fetchHeadCid() { try { @@ -57,7 +71,73 @@ async function fetchHeadCid() { } } -async function computeAdvertisementSignature(previous, peerId, cid, addresses) { +/** + * Create array of signed Providers + * @param {Provider[]} providers + * @param {object} previous + * @param {PeerId} peerId - advertisment peerId + * @param {CID} cid - entries CID + * @param {Buffer} contextId - derived from cid + */ +async function signProviders (providers, previous, peerId, cid, contextId) { + const signed = [] + for (const provider of providers) { + const Signature = await providerSignature(previous, peerId, cid, contextId, provider) + signed.push({ + ...provider, + Signature + }) + } + return signed +} + +/** + * Calculate the signature for an Extended Provider + * see: https://github.com/ipni/go-libipni/blob/afe2d8ea45b86c2a22f756ee521741c8f99675e5/ingest/schema/envelope.go#L125 + * @param {object} previous + * @param {PeerId} peerId - advertisment peerId + * @param {CID} cid - entries CID + * @param {Buffer} contextId - derived from cid + * @param {Provider} provider - provider to sign + * @param {boolean} extendedProviderOverride + */ +async function providerSignature (previous, peerId, cid, contextId, provider, extendedProviderOverride = false) { + const sigBuf = Buffer.concat([ + previous ? Buffer.from(CID.parse(previous['/']).bytes) : Buffer.alloc(0), + Buffer.from(cid.bytes), + Buffer.from(peerId.toString(), 'utf-8'), + contextId, + Buffer.from(provider.ID, 'utf-8'), + ...provider.Addresses.map(a => Buffer.from(a, 'utf-8')), + provider.Metadata, + extendedProviderOverride ? Buffer.from([1]) : Buffer.from([0]) // if ad.ExtendedProvider.Override { sigBuf.WriteByte(1)} else { sigBuf.WriteByte(0 } + ]) + + const digest = await sha256.digest(sigBuf) + const payload = digest.bytes + + const sealed = await Envelope.seal( + { + domain: Buffer.from('indexer', 'utf-8'), + codec: Buffer.from('/indexer/ingest/adSignature', 'utf-8'), + marshal: () => payload + }, + peerId + ) + + return sealed.marshal() +} + +/** + * Calculate the signature for an Extended Provider + * see: https://github.com/ipni/go-libipni/blob/afe2d8ea45b86c2a22f756ee521741c8f99675e5/ingest/schema/envelope.go#L125 + * @param {object} previous + * @param {PeerId} peerId - advertisment peerId + * @param {CID} cid - entries CID + * @param {string[]} addresses - multiaddrs + * @param {Buffer} metadata - prefixed with varint for http or bitswap + */ +async function computeAdvertisementSignature(previous, peerId, cid, addresses, metadata) { const payload = ( await sha256.digest( Buffer.concat([ @@ -169,7 +249,22 @@ async function notifyIndexer(cid, peerId) { async function main(event) { try { - const peerId = await getPeerId() + const bsPeerId = await getBitswapPeerId() + const httpPeerId = await getHttpPeerId() + + /** @type {Provider} */ + const httpProvider = { + ID: bsPeerId.toString(), + Addresses: [httpPeerMultiaddr], + Metadata: HTTP_METADATA + } + + /** @type {Provider} */ + const bitswapProvider = { + ID: httpPeerId.toString(), + Addresses: [bitswapPeerMultiaddr], + Metadata: BITSWAP_METADATA + } // Track the latest read cid and advertisementCid let cid @@ -177,20 +272,23 @@ async function main(event) { for (const record of event.Records) { const cidString = record.body + const contextId = Buffer.from(cidString) cid = CID.parse(cidString) const previous = advertisementCid ? { '/': advertisementCid.toString() } : await fetchHeadCid() - const addresses = [bitswapPeerMultiaddr] // Create the advertisement const rawAdvertisement = { - Provider: peerId.toString(), - Addresses: addresses, + Provider: bsPeerId.toString(), + Addresses: bitswapProvider.Addresses, Entries: { '/': cidString }, - ContextID: { '/': { bytes: Buffer.from(cidString).toString('base64') } }, - Metadata: metadata, + ContextID: contextId, + Metadata: BITSWAP_METADATA, IsRm: false, - Signature: await computeAdvertisementSignature(previous, peerId, cid, addresses) + ExtendedProvider: { + Providers: await signProviders([bitswapProvider, httpProvider], previous, bsPeerId, cid, contextId) + }, + Signature: await computeAdvertisementSignature(previous, bsPeerId, cid, bitswapProvider.Addresses, BITSWAP_METADATA) } if (previous) { @@ -206,10 +304,10 @@ async function main(event) { } // Update the head - await updateHead(advertisementCid, peerId) + await updateHead(advertisementCid, bsPeerId) // Notify the indexer-node - await notifyIndexer(advertisementCid, peerId) + await notifyIndexer(advertisementCid, bsPeerId) // Return a empty object to signal we have consumed all the messages return {} diff --git a/test/advertisement.test.js b/test/advertisement.test.js index 4de9945..c88dae8 100644 --- a/test/advertisement.test.js +++ b/test/advertisement.test.js @@ -5,9 +5,11 @@ process.env.HANDLER = 'advertisement' const t = require('tap') const { decode: decodeDAG } = require('@ipld/dag-json') const { MockAgent, setGlobalDispatcher } = require('undici') -const { awsRegion, s3Bucket, indexerNodeUrl } = require('../src/config') +const { awsRegion, s3Bucket, peerIdBucket, indexerNodeUrl } = require('../src/config') const { handler } = require('../src/index') -const { trackAWSUsages } = require('./utils/mock') +const { trackAWSUsages, mockPeerIds } = require('./utils/mock') + +mockPeerIds() t.test('advertisement - creates a new head when none is found and notifies the indexer', async t => { t.plan(8) diff --git a/test/fixtures/peerId-http.json b/test/fixtures/peerId-http.json new file mode 100644 index 0000000..e221ea3 --- /dev/null +++ b/test/fixtures/peerId-http.json @@ -0,0 +1,5 @@ +{ + "id": "Qmb8hPXH8E2kmNWyQvUt4yTXSenBBbd7kUAmWSZ6UksmCC", + "privKey": "CAASqQkwggSlAgEAAoIBAQDLhmHURcpa6rO4py71Pu9i2+4RP+dVc6fUV/ew1UENVHdKG5Enrd9My2wy5T45r3CIPooUAvQqS1UP3y6w+VFBmfHOR17Bd0Jpy8daWeKmxW5K0IE3gGYbZiCoXPrOjM7Zjy7rQvFwtqpC1cnbck/XPylrflynkze/7aVCICjnNGQX/otS/Y9nZdUzwPZrnlmJbev86FA+z8HtSjJAO0oCYvJVqC/owlT7Kcf1PpFP0gspMTxfvd8yjkOrEiYdWETHGy92NExX1lihQTcSxAx/GEuFfwCprG/lnzaX2M4KOQcnzzeAzE+nvlWV7SHH3xBD/GowZOpC5LbN+imVTwe1AgMBAAECggEBALxiuO75YqLhGFXVOhv7ky2YkTYaRpDMKw7cFgLygfJKutg3yBZIVKcKrC44D27pu2oBKWH4kfUMIcI9PUTGobXtPutHGKhPMYQoAXDaPndLzBkoAlNTYDAASj5NQkIqB9VNOYq+PimsitCuftfHaau9ZSOApVbfZ/0ZhZw9lF7ccywrxt2oRs3vWJdpxaNj8pobmMWEBE+st9icfytoYzq0MzeFCKFe1efF6uCbJjjsJN9VlUgmwuoj7LLwk9gui5DTVjWTUZav0fscbdGrTxzDVGN6l1zz61imoYYs6/peaOSpH6ElXqZPVGqx5crX7kISAtVYMwEl1rYvmzdEaYECgYEA5lupGNRmCuNvdul78TkmNpTzo7XxDd3vsuWeD2zSYiHscixag3PwUziOimAFoObE0Q1WrRwUZ0DBMEQlczSglnxwfqcmFTMD10hKZ7iHLDaVSD4VA5SQDjcLB6iGUUJSFGlA3vzbsCAPJ/jiJrTpNXRlAf3pwLmY1wGjpDDYbmUCgYEA4i4UFx+IjRtR7O0sLSPG0PMeMii/H8qvkDOnI+6QVfMM9u7jPfJr3dPgNhGjsiE3KtE9iQ2toAPM7cN1KlFkLtJj1Krbgplq92WUmq0+FGsJ7uJN+v3AW1DMvY3aUKiP36o7YJVFm8aFkJvK28H8dXGowZtnyl4FFjZNr8rYNxECgYEAub1QpFemi8ME/kGXEVK+UCZcKTWrzz2TZ5EjmFZrcHrf9tNyjCmwrrmcnVB9WVaZfeKeb+iDPxoEa5GXDMRzc8OUxVqrJ07CGxUMAYPVYj4EsoJ0WTWxzfRt7VfiLaUnH5QanXIPBX1kGxvf5cvbeqGGTEva7e34oeyNcPd9m80CgYBADcyiv8tsKtCIyKI6JmenxTJIAkIePsh5jidHhchajvqc9ApnAMCLkfVtWpGBYOa+uYMzzSvepTTkIPaNhX9ZwzuCfuCrm8cvSRVMuMNcBPRLMsviyPow4jEZfZLWROZ58TvfnmaYNyCcU4KfudBFcmrYaap7JSVLpRVbM5cIAQKBgQCSP6ekcyZ4zQUTrbnzxhwVHoZqWus5WU9Mhcn32RRIbbMZhU4uRJjy//0MDilgoAbpyA7oUKSYNAZj5QmdMRYyd0lc9+K79eh3qClyZb/zjUcQiJTMSQVSrZHgOsb+sfOd1CYHbHZU8BcYWqBMRdGBxdt/h49JJmDclEYkUF0fYw==", + "pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDLhmHURcpa6rO4py71Pu9i2+4RP+dVc6fUV/ew1UENVHdKG5Enrd9My2wy5T45r3CIPooUAvQqS1UP3y6w+VFBmfHOR17Bd0Jpy8daWeKmxW5K0IE3gGYbZiCoXPrOjM7Zjy7rQvFwtqpC1cnbck/XPylrflynkze/7aVCICjnNGQX/otS/Y9nZdUzwPZrnlmJbev86FA+z8HtSjJAO0oCYvJVqC/owlT7Kcf1PpFP0gspMTxfvd8yjkOrEiYdWETHGy92NExX1lihQTcSxAx/GEuFfwCprG/lnzaX2M4KOQcnzzeAzE+nvlWV7SHH3xBD/GowZOpC5LbN+imVTwe1AgMBAAE=" +} diff --git a/test/miscellaneous.test.js b/test/miscellaneous.test.js index da28d20..e441b0a 100644 --- a/test/miscellaneous.test.js +++ b/test/miscellaneous.test.js @@ -8,7 +8,7 @@ const { resolve } = require('path') const { createFromJSON } = require('peer-id') const { Readable } = require('stream') const t = require('tap') -const { getPeerId } = require('../src/config') +const { getBitswapPeerId } = require('../src/config') const { logger } = require('../src/logging') const telemetry = require('../src/telemetry') const { s3Mock } = require('./utils/mock') @@ -16,7 +16,7 @@ const { s3Mock } = require('./utils/mock') t.test('config - download the peerId from S3', async t => { t.plan(3) - const rawPeer = await readFile(resolve(process.cwd(), 'test/fixtures/peerId.json')) + const bsPeer = require('./fixtures/peerId.json') s3Mock.on(GetObjectCommand).callsFake(async params => { t.equal(params.Bucket, 'idBucket') @@ -25,22 +25,19 @@ t.test('config - download the peerId from S3', async t => { return { Body: Readable.from(rawPeer) } }) - t.equal((await getPeerId()).toB58String(), (await createFromJSON(JSON.parse(rawPeer))).toB58String()) + t.equal((await getBitswapPeerId()).toB58String(), (await createFromJSON(bsPeer).toB58String()) }) -t.test('config - creates a new PeerId if download fails', async t => { +t.test('config - fails if PeerId not set', async t => { t.plan(3) - const rawPeer = await readFile(resolve(process.cwd(), 'test/fixtures/peerId.json')) - s3Mock.on(GetObjectCommand).callsFake(async params => { t.equal(params.Bucket, 'idBucket') t.equal(params.Key, 'peerId.json') return { Body: Readable.from('INVALID', 'utf-8') } }) - - t.not((await getPeerId()).toB58String(), (await createFromJSON(JSON.parse(rawPeer))).toB58String()) + t.throws(async () => await getBitswapPeerId()) }) t.test('telemetry', async t => { diff --git a/test/utils/env b/test/utils/env index cef591d..21f1416 100644 --- a/test/utils/env +++ b/test/utils/env @@ -1,3 +1,5 @@ +HTTP_PEER_MULTIADDR=/dns4/freeway.dag.house/tcp/443/https BITSWAP_PEER_MULTIADDR=/ip4/12.34.56.78/tcp/999/ws INDEXER_NODE_URL=http://87.65.43.21:2345 -S3_BUCKET=advertisements \ No newline at end of file +S3_BUCKET=advertisements +PEER_ID_S3_BUCKET=peerids \ No newline at end of file diff --git a/test/utils/mock.js b/test/utils/mock.js index 5d4a917..dc7dec5 100644 --- a/test/utils/mock.js +++ b/test/utils/mock.js @@ -1,12 +1,16 @@ 'use strict' +const { Readable } = require('stream') const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs') -const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3') +const { S3Client, PutObjectCommand, GetObjectCommand } = require('@aws-sdk/client-s3') const { mockClient } = require('aws-sdk-client-mock') const sqsMock = mockClient(SQSClient) const s3Mock = mockClient(S3Client) +const bsPeerJson = require('../fixtures/peerId.json') +const httpPeerJson = require('../fixtures/peerId-http.json') + function trackAWSUsages(t, failed = false) { t.context = { s3: { puts: [] }, @@ -26,8 +30,16 @@ function trackAWSUsages(t, failed = false) { }) } +function mockPeerIds () { + s3Mock.on(GetObjectCommand, { Key: 'peerId.json' }) + .callsFake(() => ({ Body: Readable.from(JSON.stringify(bsPeerJson)) })) + s3Mock.on(GetObjectCommand, { Key: 'peerId-http.json' }) + .callsFake(() => ({ Body: Readable.from(JSON.stringify(httpPeerJson)) })) +} + module.exports = { s3Mock, sqsMock, - trackAWSUsages + trackAWSUsages, + mockPeerIds }