Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unhappy paths #2

Merged
merged 8 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 49 additions & 21 deletions client.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,73 @@
const { once } = require('events')
const crypto = require('crypto')
const ReadyResource = require('ready-resource')
const safetyCatch = require('safety-catch')
const RPC = require('protomux-rpc')
const { MetricsReplyEnc } = require('dht-prom-client/lib/encodings')
const b4a = require('b4a')

const PROTOCOL_NAME = 'prometheus-metrics'

class ScraperClient extends ReadyResource {
constructor (dht, promClientPublicKey) {
constructor (swarm, promClientPubKey) {
super()

this.dht = dht
this.key = promClientPublicKey
this.swarm = swarm
this.targetKey = promClientPubKey

this.rpc = null
this.socket = null
this._currentConnUid = null

this._boundConnectionHandler = this._connectionHandler.bind(this)
this.swarm.on('connection', this._boundConnectionHandler)

// Handles reconnects/suspends
this.swarm.joinPeer(this.targetKey)
}

async _open () {
// TODO: auto reconnect
// TODO: retry on failure
// TODO: define a keepAlive
// TODO: handle error paths (peer not available etc)
this.socket = this.dht.connect(this.key)
this.socket.on('error', safetyCatch)
_open () { }

_close () {
this.swarm.off('connection', this._boundConnectionHandler)
this.swarm.leavePeer(this.targetKey)

await this.socket.opened
if (this.rpc) this.rpc.destroy()
if (this.socket) this.socket.destroy()
}

if (!this.socket.connected) {
throw new Error('Could not open socket')
_connectionHandler (socket, peerInfo) {
if (!b4a.equals(peerInfo.publicKey, this.targetKey)) {
// Not our connection
return
}

this.rpc = new RPC(this.socket, { protocol: 'prometheus-metrics' })
await once(this.rpc, 'open')
}
const connUid = crypto.randomUUID() // TODO: check if actually needed
this._currentConnUid = connUid

const rpc = new RPC(socket, { protocol: PROTOCOL_NAME })

socket.on('error', safetyCatch)
socket.on('close', () => {
if (connUid === this._currentConnUid) {
// No other connection arrived in the mean time
this.socket = null
this.rpc = null
this._currentConnUid = null
}

async _close () {
this.rpc?.destroy()
this.socket?.destroy()
rpc.destroy()
})

this.socket = socket
this.rpc = rpc
}

async lookup () {
if (!this.opened) await this.ready()
if (!this.rpc) throw new Error('Not connected')

if (this.rpc && !this.rpc.opened) await this.rpc.fullyOpened()

// Note: can throw (for example if rpc closed in the mean time)
const res = await this.rpc.request(
'metrics',
null,
Expand Down
33 changes: 25 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ const ReadyResource = require('ready-resource')
const idEnc = require('hypercore-id-encoding')
const b4a = require('b4a')
const safetyCatch = require('safety-catch')
const Hyperswarm = require('hyperswarm')

class PrometheusDhtBridge extends ReadyResource {
constructor (dht, server) {
super()

this.dht = dht
this.swarm = new Hyperswarm({ dht })

this.server = server
this.server.get(
Expand All @@ -20,29 +21,35 @@ class PrometheusDhtBridge extends ReadyResource {
this.aliases = new Map() // alias->scrapeClient
}

// Returns the `dht` property of the swarm owned by this bridge
get dht () {
return this.swarm.dht
}

get publicKey () {
return this.dht.defaultKeyPair.publicKey
return this.swarm.keyPair.publicKey
}

async _close () {
await Promise.all([
[...this.aliases.values()].map(a => a.close())
])

await this.swarm.destroy()
}

putAlias (alias, targetPubKey) {
targetPubKey = idEnc.decode(idEnc.normalize(targetPubKey))
const current = this.aliases.get(alias)

if (current) {
if (b4a.equals(current.key, targetPubKey)) {
if (b4a.equals(current.targetKey, targetPubKey)) {
return // Idempotent
}

current.close().catch(safetyCatch)
}

const scrapeClient = new ScraperClient(this.dht, targetPubKey)
const scrapeClient = new ScraperClient(this.swarm, targetPubKey)
this.aliases.set(alias, scrapeClient)
}

Expand All @@ -52,17 +59,27 @@ class PrometheusDhtBridge extends ReadyResource {
const scrapeClient = this.aliases.get(alias)

if (!scrapeClient) {
// TODO: 404 code
throw new Error('Unkown alias')
reply.code(404)
reply.send('Unknown alias')
return
}

if (!scrapeClient.opened) await scrapeClient.ready()

const res = await scrapeClient.lookup()
// Look up the metrics and translate the outcome into exactly one HTTP reply.
// A rejected lookup is recorded in `failure` instead of being answered inside
// the catch block: answering there and then falling through would dereference
// an undefined `res` after the 502 had already been sent.
let res
let failure = null
try {
  res = await scrapeClient.lookup()
} catch (e) {
  // Wrapped in an object so that any thrown value (even a falsy one) counts
  failure = { error: e }
}

if (failure !== null) {
  this.emit('upstream-error', failure.error)
  reply.code(502)
  reply.send('Upstream unavailable')
} else if (res.success) {
  reply.send(res.metrics)
} else {
  // Upstream answered, but flagged the request as unsuccessful
  reply.code(502)
  reply.send(`Upstream error: ${res.errorMessage}`)
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
"standard": "^17.1.0"
},
"dependencies": {
"b4a": "^1.6.6",
"debounceify": "^1.1.0",
"dht-prom-client": "^0.0.1-alpha.1",
"fastify": "^4.28.0",
"hypercore-id-encoding": "^1.3.0",
"protomux-rpc": "^1.5.2",
"hyperswarm": "^4.7.15",
"protomux-rpc": "holepunchto/protomux-rpc#main",
"ready-resource": "^1.1.1",
"safety-catch": "^1.0.2"
}
Expand Down
83 changes: 82 additions & 1 deletion test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ test('put alias + lookup happy flow', async t => {
const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 })

bridge.putAlias('dummy', dhtPromClient.publicKey)
await bridge.swarm.flush() // Avoid race condition

const res = await axios.get(`${baseUrl}/scrape/dummy/metrics`)
const res = await axios.get(
`${baseUrl}/scrape/dummy/metrics`,
{ validateStatus: null }
)
t.is(res.status, 200, 'correct status')
t.is(
res.data.includes('process_cpu_user_seconds_total'),
Expand All @@ -26,6 +30,83 @@ test('put alias + lookup happy flow', async t => {
)
})

// Scraping an alias that was never registered must yield a 404 whose body
// names the problem.
test('404 on unknown alias', async t => {
  const { bridge } = await setup(t)
  await bridge.ready()

  const address = await bridge.server.listen({ host: '127.0.0.1', port: 0 })
  const url = `${address}/scrape/nothinghere/metrics`

  const { status, data } = await axios.get(url, { validateStatus: null })

  t.is(status, 404, 'correct status')
  t.is(data.includes('Unknown alias'), true, 'Sensible err msg')
})

// When the upstream replies with success: false, the bridge must answer 502
// and include the request uid in the body.
test('502 with uid if upstream returns success: false', async t => {
  const { bridge, dhtPromClient } = await setup(t)

  // A gauge whose collect() always throws
  const brokenGauge = {
    name: 'broken_metric',
    help: 'A metric which throws on collecting it',
    collect () {
      throw new Error('I break stuff')
    }
  }
  new promClient.Gauge(brokenGauge) // eslint-disable-line no-new

  // Remember the uid carried by the 'metrics-request' event
  let seenUid = null
  dhtPromClient.on('metrics-request', ({ uid }) => {
    seenUid = uid
  })

  await dhtPromClient.ready()
  await bridge.ready()

  const address = await bridge.server.listen({ host: '127.0.0.1', port: 0 })
  bridge.putAlias('dummy', dhtPromClient.publicKey)
  await bridge.swarm.flush() // Avoid race condition

  const url = `${address}/scrape/dummy/metrics`
  const { status, data } = await axios.get(url, { validateStatus: null })

  t.is(status, 502, 'correct status')
  t.is(data.includes(seenUid), true, 'uid included in error message')
})

// The target peer is closed before the scrape request is made, so the bridge
// is expected to answer 502 with the fixed 'Upstream unavailable' body.
test('502 if upstream unavailable', async t => {
  const { bridge, dhtPromClient } = await setup(t)

  await dhtPromClient.ready()
  await bridge.ready()

  const baseUrl = await bridge.server.listen({ host: '127.0.0.1', port: 0 })
  bridge.putAlias('dummy', dhtPromClient.publicKey)

  await dhtPromClient.close()

  const res = await axios.get(
    `${baseUrl}/scrape/dummy/metrics`,
    { validateStatus: null }
  )
  t.is(res.status, 502, 'correct status')
  // The body carries no uid here; the description now says what is checked
  t.is(
    res.data,
    'Upstream unavailable',
    'Sensible err msg'
  )
})

test('No new alias if adding same key', async t => {
const { bridge } = await setup(t)
const key = 'a'.repeat(64)
Expand Down
Loading