Skip to content

Commit

Permalink
Spark TTFB poc (#434)
Browse files Browse the repository at this point in the history
* Include round id in retrieval task id

* Add time to first byte public stat

* Remove unused variable

* Make names more generic

* Fix typo

* Ignore on conflict

* Update lib/public-stats.js

Co-authored-by: Julian Gruber <[email protected]>

* Update lib/public-stats.js

Co-authored-by: Julian Gruber <[email protected]>

* Fix typo in public-stats.js

Co-authored-by: Miroslav Bajtoš <[email protected]>

* Rename table retrieval_times to retrieval_timings

* Fix typo

Co-authored-by: Miroslav Bajtoš <[email protected]>

* Aggregate retrieval timing values to array

* Fix typo in tests

* Change migration number

* Remove unused values from test data

* Expand test with update case

* Remove asserts for backwards compatibility

* Ceil ttfb p50 value

* Add follow-up issue for asserting reported ttfb values

* Update test/public-stats.test.js

Co-authored-by: Miroslav Bajtoš <[email protected]>

* Update test/public-stats.test.js

Co-authored-by: Miroslav Bajtoš <[email protected]>

* Update test/public-stats.test.js

Co-authored-by: Miroslav Bajtoš <[email protected]>

* Check if start value is bigger than time to first byte

* Check if ttfb measurement is NaN

* Add ttfb calculation edgecase tests

* Remove ts-ignore

---------

Co-authored-by: Julian Gruber <[email protected]>
Co-authored-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
3 people authored Jan 14, 2025
1 parent 9f3af99 commit d236813
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 1 deletion.
48 changes: 47 additions & 1 deletion lib/public-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/node'
import createDebug from 'debug'
import * as providerRetrievalResultStats from './provider-retrieval-result-stats.js'
import { updatePlatformStats } from './platform-stats.js'
import { getTaskId } from './retrieval-stats.js'
import { getTaskId, getValueAtPercentile } from './retrieval-stats.js'

/** @import pg from 'pg' */
/** @import { Committee } from './committee.js' */
Expand All @@ -26,6 +26,7 @@ export const updatePublicStats = async ({ createPgClient, committees, allMeasure
await updateIndexerQueryStats(pgClient, committees)
await updateDailyDealsStats(pgClient, committees, findDealClients)
await updatePlatformStats(pgClient, allMeasurements)
await updateRetrievalTimings(pgClient, committees)
} finally {
await pgClient.end()
}
Expand Down Expand Up @@ -227,3 +228,48 @@ const updateDailyDealsStats = async (pgClient, committees, findDealClients) => {
flatStats.map(stat => stat.retrievable)
])
}

/**
 * Aggregate time-to-first-byte (TTFB) per miner and append today's p50 value
 * to the `retrieval_timings` table.
 *
 * Only committees that reached a retrieval majority with result 'OK'
 * contribute; within each such committee, only measurements accepted by fraud
 * assessment and carrying sane timestamps are used.
 *
 * @param {pg.Client} pgClient
 * @param {Iterable<Committee>} committees
 */
const updateRetrievalTimings = async (pgClient, committees) => {
  /** @type {Map<string, number[]>} */
  const retrievalTimings = new Map()
  for (const c of committees) {
    // Skip committees without an accepted, successful retrieval majority.
    if (!c.evaluation || !c.evaluation.hasRetrievalMajority || c.evaluation.retrievalResult !== 'OK') continue
    const { minerId } = c.retrievalTask
    const ttfbMeasurements = []
    for (const m of c.measurements) {
      // FIXME: assert first_byte_at and start_at during preprocessing
      // See https://github.com/filecoin-station/spark-evaluate/issues/447
      if (m.fraudAssessment !== 'OK' || !m.first_byte_at || !m.start_at || m.start_at > m.first_byte_at) continue
      const ttfbMeasurement = m.first_byte_at - m.start_at
      if (Number.isNaN(ttfbMeasurement)) continue
      ttfbMeasurements.push(ttfbMeasurement)
    }

    // Guard against committees with no usable timing data — otherwise we
    // would push Math.ceil(NaN) into the per-miner array and corrupt the
    // INT[] payload sent to Postgres below.
    if (ttfbMeasurements.length === 0) continue

    if (!retrievalTimings.has(minerId)) {
      retrievalTimings.set(minerId, [])
    }

    const ttfb = Math.ceil(getValueAtPercentile(ttfbMeasurements, 0.5))
    retrievalTimings.get(minerId).push(ttfb)
  }

  // One row per miner; each maps 1:1, so `map` (via Array.from's mapper)
  // is the right tool — the previous `flatMap` flattened nothing.
  // eslint-disable-next-line camelcase
  const rows = Array.from(retrievalTimings.entries(), ([miner_id, ttfb_p50]) => ({ miner_id, ttfb_p50 }))

  await pgClient.query(`
    INSERT INTO retrieval_timings (day, miner_id, ttfb_p50)
    SELECT now(), miner_id, ttfb_p50 FROM jsonb_to_recordset($1::jsonb) AS t (miner_id text, ttfb_p50 int[])
    ON CONFLICT (day, miner_id)
    DO UPDATE SET
      ttfb_p50 = array_cat(
        retrieval_timings.ttfb_p50,
        EXCLUDED.ttfb_p50
      )
  `, [
    JSON.stringify(rows)
  ])
}
6 changes: 6 additions & 0 deletions migrations/022.do.add-retrieval-timings.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Per-day, per-miner time-to-first-byte statistics.
-- Each evaluation run appends one p50 TTFB value (milliseconds, ceiled)
-- per miner to the ttfb_p50 array via the ON CONFLICT upsert in
-- lib/public-stats.js.
CREATE TABLE retrieval_timings (
  day DATE NOT NULL,
  miner_id TEXT NOT NULL,
  ttfb_p50 INT[] NOT NULL,
  -- One row per miner per day; conflicts on this key append to ttfb_p50.
  PRIMARY KEY (day, miner_id)
);
75 changes: 75 additions & 0 deletions test/public-stats.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('public-stats', () => {
await pgClient.query('DELETE FROM retrieval_stats')
await pgClient.query('DELETE FROM indexer_query_stats')
await pgClient.query('DELETE FROM daily_deals')
await pgClient.query('DELETE FROM retrieval_timings')

// Run all tests inside a transaction to ensure `now()` always returns the same value
// See https://dba.stackexchange.com/a/63549/125312
Expand Down Expand Up @@ -560,8 +561,82 @@ describe('public-stats', () => {
})
})

// NOTE: suite renamed from 'retrieval_times' to match the actual table name
// `retrieval_timings` (the table itself was renamed during review).
describe('retrieval_timings', () => {
  it('creates or updates rows for today', async () => {
    /** @type {Measurement[]} */
    let acceptedMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 3000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK' }, 2000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK' }, 1000),
      // measurements with invalid values — must be ignored by the TTFB aggregation
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK' }, -1000),
      { ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'OK', first_byte_at: /** @type {any} */('invalid') }
    ]

    // Rejected measurements must never contribute to the timings.
    /** @type {Measurement[]} */
    const rejectedMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 100),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 200),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }, 300),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }, 300),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }, 200),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }, 100)
    ]

    let allMeasurements = [...acceptedMeasurements, ...rejectedMeasurements]
    let committees = buildEvaluatedCommitteesFromMeasurements(acceptedMeasurements)

    // First run creates one row per miner with the p50 TTFB of accepted
    // measurements (f1first: p50 of [1000, 3000] → 2000; f1second: p50 of
    // [2000, 1000] → 1500, invalid values ignored).
    await updatePublicStats({
      createPgClient,
      committees,
      allMeasurements,
      findDealClients: (_minerId, _cid) => ['f0client']
    })
    const { rows: created } = await pgClient.query(
      'SELECT day::TEXT, miner_id, ttfb_p50 FROM retrieval_timings ORDER BY miner_id'
    )
    assert.deepStrictEqual(created, [
      { day: today, miner_id: 'f1first', ttfb_p50: [2000] },
      { day: today, miner_id: 'f1second', ttfb_p50: [1500] }
    ])

    // Second run on the same day appends the new p50 to the existing array
    // (ON CONFLICT … array_cat) and leaves untouched miners as-is.
    acceptedMeasurements = [
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 3000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 5000),
      givenTimeToFirstByte({ ...VALID_MEASUREMENT, cid: 'cidone', minerId: 'f1first', retrievalResult: 'OK' }, 1000)
    ]
    allMeasurements = [...acceptedMeasurements, ...rejectedMeasurements]
    committees = buildEvaluatedCommitteesFromMeasurements(acceptedMeasurements)
    await updatePublicStats({
      createPgClient,
      committees,
      allMeasurements,
      findDealClients: (_minerId, _cid) => ['f0client']
    })
    const { rows: updated } = await pgClient.query(
      'SELECT day::TEXT, miner_id, ttfb_p50 FROM retrieval_timings ORDER BY miner_id'
    )
    assert.deepStrictEqual(updated, [
      { day: today, miner_id: 'f1first', ttfb_p50: [2000, 3000] },
      { day: today, miner_id: 'f1second', ttfb_p50: [1500] }
    ])
  })
})

// Fetch the database's notion of "today" as a date string. Because the tests
// run inside a transaction, `now()` is stable for the whole test.
const getCurrentDate = async () => {
  const result = await pgClient.query('SELECT now()::DATE::TEXT as today')
  return result.rows[0].today
}

/**
 * Build a measurement whose `first_byte_at` lies `timeToFirstByte`
 * milliseconds after its `start_at`, so tests can express the intended TTFB
 * directly.
 *
 * @param {Measurement} measurement
 * @param {number} timeToFirstByte Time in milliseconds
 * @returns {Measurement} a copy of `measurement` with `first_byte_at` set
 */
function givenTimeToFirstByte (measurement, timeToFirstByte) {
  // Return a copy instead of mutating the caller's argument.
  return { ...measurement, first_byte_at: measurement.start_at + timeToFirstByte }
}
})

0 comments on commit d236813

Please sign in to comment.