Skip to content

Commit

Permalink
refactor: break fraudAssessment into evaluations (#442)
Browse files Browse the repository at this point in the history
Break `Measurement.fraudAssessment` into two new fields:

- `taskingEvaluation` for the tasking-related checks
  Example: DUP_INET_GROUP

- `consensusEvaluation` for the result of the majority seeking
  process based on committees.
  Example: MINORITY_RESULT

After this change, places filtering "accepted" measurements have to
explicitly spell out how they define "accepted".

- Some places are interested in tasking evaluation results only
  and consider minority results as "accepted" too. Example: RSR
  calculated from individual measurements.

- Other places are stricter and want only measurements in majority.
  Example: which measurements to reward.

However, this pull request is pure refactoring with no change in
functionality.

Signed-off-by: Miroslav Bajtoš <[email protected]>
Co-authored-by: Julian Gruber <[email protected]>
  • Loading branch information
bajtos and juliangruber authored Jan 27, 2025
1 parent b59eab7 commit 0f6d5bf
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 190 deletions.
2 changes: 1 addition & 1 deletion bin/dry-run.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ if (ignoredErrors.length) {
}

if (DUMP) {
const props = ['cid', 'minerId', 'participantAddress', 'inet_group', 'retrievalResult', 'fraudAssessment']
const props = ['cid', 'minerId', 'participantAddress', 'inet_group', 'retrievalResult', 'taskingEvaluation', 'consensusEvaluation']
for (const k of Object.keys(round.measurements[0])) {
if (!props.includes(k)) props.push(k)
}
Expand Down
32 changes: 19 additions & 13 deletions bin/evaluate-measurements.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const EVALUATION_NDJSON_FILE = `${basename(measurementsPath, '.ndjson')}.evaluat
const evaluationTxtWriter = fs.createWriteStream(EVALUATION_TXT_FILE)
const evaluationNdjsonWriter = fs.createWriteStream(EVALUATION_NDJSON_FILE)

evaluationTxtWriter.write(formatHeader({ includeFraudAssesment: keepRejected }) + '\n')
evaluationTxtWriter.write(formatHeader({ includeEvaluation: keepRejected }) + '\n')

const resultCounts = {
total: 0
Expand Down Expand Up @@ -98,22 +98,26 @@ async function processRound (roundIndex, measurements, resultCounts) {
})

for (const m of round.measurements) {
if (m.fraudAssessment !== 'OK') continue
// FIXME: we should include non-majority measurements too
// See https://github.com/filecoin-station/spark-evaluate/pull/396
    if (m.taskingEvaluation !== 'OK' || m.consensusEvaluation !== 'MAJORITY_RESULT') continue
resultCounts.total++
resultCounts[m.retrievalResult] = (resultCounts[m.retrievalResult] ?? 0) + 1
}

if (!keepRejected) {
round.measurements = round.measurements
// Keep accepted measurements only
.filter(m => m.fraudAssessment === 'OK')
// Remove the fraudAssessment field as all accepted measurements have the same 'OK' value
.map(m => ({ ...m, fraudAssessment: undefined }))
// FIXME: we should include non-majority measurements too
// See https://github.com/filecoin-station/spark-evaluate/pull/396
.filter(m => m.taskingEvaluation === 'OK' && m.consensusEvaluation === 'MAJORITY_RESULT')
// Remove the taskingEvaluation and consensusEvaluation fields as all accepted measurements have the same value
      .map(m => ({ ...m, taskingEvaluation: undefined, consensusEvaluation: undefined }))
}

evaluationTxtWriter.write(
round.measurements
.map(m => formatMeasurement(m, { includeFraudAssesment: keepRejected }) + '\n')
.map(m => formatMeasurement(m, { includeEvaluation: keepRejected }) + '\n')
.join('')
)
evaluationNdjsonWriter.write(
Expand Down Expand Up @@ -144,17 +148,19 @@ function isFlagEnabled (envVarValue) {
/**
* @param {import('../lib/preprocess.js').Measurement} m
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
* @param {boolean} [options.includeEvaluation]
*/
function formatMeasurement (m, { includeFraudAssesment } = {}) {
function formatMeasurement (m, { includeEvaluation } = {}) {
const fields = [
new Date(m.finished_at).toISOString(),
(m.cid ?? '').padEnd(70),
(m.protocol ?? '').padEnd(10)
]

if (includeFraudAssesment) {
fields.push((m.fraudAssessment === 'OK' ? '🫡 ' : '🙅 '))
if (includeEvaluation) {
// FIXME: we should distinguish tasking and majority evaluation
// See https://github.com/filecoin-station/spark-evaluate/pull/396
fields.push((m.taskingEvaluation === 'OK' && m.consensusEvaluation === 'MAJORITY_RESULT' ? '🫡 ' : '🙅 '))
}

fields.push((m.retrievalResult ?? ''))
Expand All @@ -164,16 +170,16 @@ function formatMeasurement (m, { includeFraudAssesment } = {}) {

/**
* @param {object} options
* @param {boolean} [options.includeFraudAssesment]
* @param {boolean} [options.includeEvaluation]
*/
function formatHeader ({ includeFraudAssesment } = {}) {
function formatHeader ({ includeEvaluation } = {}) {
const fields = [
'Timestamp'.padEnd(new Date().toISOString().length),
'CID'.padEnd(70),
'Protocol'.padEnd(10)
]

if (includeFraudAssesment) {
if (includeEvaluation) {
fields.push('🕵️ ')
}

Expand Down
43 changes: 23 additions & 20 deletions lib/committee.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ import createDebug from 'debug'
import { getTaskId } from './retrieval-stats.js'

/** @import {Measurement} from './preprocess.js' */
/** @import {RetrievalResult, CommitteeCheckError} from './typings.js' */
/** @import {RetrievalResult, ConsensusNotFoundReason} from './typings.js' */

const debug = createDebug('spark:committee')

/** @typedef {Map<string, Committee>} TaskIdToCommitteeMap */

/** @typedef {{
hasIndexMajority: boolean;
indexerResult: string | CommitteeCheckError;
hasRetrievalMajority: boolean;
indexMajorityFound: boolean;
indexerResult: string | ConsensusNotFoundReason;
retrievalMajorityFound: boolean;
retrievalResult: RetrievalResult
}} CommitteeEvaluation
}} CommitteeDecision
*/
export class Committee {
/** @type {Measurement[]} */
Expand All @@ -28,8 +28,8 @@ export class Committee {

this.#measurements = []

/** @type {CommitteeEvaluation | undefined} */
this.evaluation = undefined
/** @type {CommitteeDecision | undefined} */
this.decision = undefined
}

get size () {
Expand All @@ -48,7 +48,7 @@ export class Committee {
addMeasurement (m) {
assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
assert.strictEqual(m.taskingEvaluation, 'OK', 'only measurements accepted by task evaluation can be added')
this.#measurements.push(m)
}

Expand All @@ -69,13 +69,13 @@ export class Committee {
this.#measurements.length,
requiredCommitteeSize
)
this.evaluation = {
hasIndexMajority: false,
this.decision = {
indexMajorityFound: false,
indexerResult: 'COMMITTEE_TOO_SMALL',
hasRetrievalMajority: false,
retrievalMajorityFound: false,
retrievalResult: 'COMMITTEE_TOO_SMALL'
}
for (const m of this.#measurements) m.fraudAssessment = 'COMMITTEE_TOO_SMALL'
for (const m of this.#measurements) m.consensusEvaluation = 'COMMITTEE_TOO_SMALL'
return
}

Expand All @@ -88,7 +88,7 @@ export class Committee {
'protocol'
]
const indexerResultMajority = this.#findMajority(indexerResultProps)
const hasIndexMajority = !!indexerResultMajority
const indexMajorityFound = !!indexerResultMajority
const indexerResult = indexerResultMajority
? indexerResultMajority.majorityValue.indexerResult
: 'MAJORITY_NOT_FOUND'
Expand All @@ -110,23 +110,26 @@ export class Committee {
]

const retrievalResultMajority = this.#findMajority(retrievalResultProps)
const hasRetrievalMajority = !!retrievalResultMajority
/** @type {CommitteeEvaluation['retrievalResult']} */
const retrievalMajorityFound = !!retrievalResultMajority
/** @type {CommitteeDecision['retrievalResult']} */
let retrievalResult
if (retrievalResultMajority) {
retrievalResult = retrievalResultMajority.majorityValue.retrievalResult
for (const m of retrievalResultMajority.majorityMeasurements) {
m.consensusEvaluation = 'MAJORITY_RESULT'
}
for (const m of retrievalResultMajority.minorityMeasurements) {
m.fraudAssessment = 'MINORITY_RESULT'
m.consensusEvaluation = 'MINORITY_RESULT'
}
} else {
retrievalResult = 'MAJORITY_NOT_FOUND'
for (const m of this.#measurements) m.fraudAssessment = 'MAJORITY_NOT_FOUND'
for (const m of this.#measurements) m.consensusEvaluation = 'MAJORITY_NOT_FOUND'
}

this.evaluation = {
hasIndexMajority,
this.decision = {
indexMajorityFound,
indexerResult,
hasRetrievalMajority,
retrievalMajorityFound,
retrievalResult
}
}
Expand Down
48 changes: 25 additions & 23 deletions lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ export const evaluate = async ({
requiredCommitteeSize,
logger
})
const honestMeasurements = measurements.filter(m => m.fraudAssessment === 'OK')
const measurementsToReward = measurements.filter(
m => m.taskingEvaluation === 'OK' && m.consensusEvaluation === 'MAJORITY_RESULT'
)

// Calculate reward shares
const participants = {}
let sum = 0n
for (const measurement of honestMeasurements) {
for (const measurement of measurementsToReward) {
if (!participants[measurement.participantAddress]) {
participants[measurement.participantAddress] = 0n
}
Expand All @@ -76,7 +78,7 @@ export const evaluate = async ({
for (const [participantAddress, participantTotal] of Object.entries(participants)) {
const score = participantTotal *
MAX_SCORE /
BigInt(honestMeasurements.length)
BigInt(measurementsToReward.length)
participants[participantAddress] = score
sum += score
}
Expand All @@ -88,24 +90,24 @@ export const evaluate = async ({
logger.log('EVALUATE ROUND %s: added %s as rounding to MAX_SCORE', roundIndex, delta)
}

// Calculate aggregates per fraud detection outcome
// Calculate aggregates per evaluation outcome
// This is used for logging and telemetry
/** @type {Partial<Record<import('./typings.js').FraudAssesment, number>>} */
const fraudAssessments = {
/** @type {Partial<Record<import('./typings.js').TaskingEvaluation, number>>} */
const evaluationOutcomes = {
OK: 0,
TASK_NOT_IN_ROUND: 0,
DUP_INET_GROUP: 0,
TOO_MANY_TASKS: 0
}
for (const m of measurements) {
fraudAssessments[m.fraudAssessment] = (fraudAssessments[m.fraudAssessment] ?? 0) + 1
evaluationOutcomes[m.taskingEvaluation] = (evaluationOutcomes[m.taskingEvaluation] ?? 0) + 1
}
logger.log(
'EVALUATE ROUND %s: Evaluated %s measurements, found %s honest entries.\n%o',
'EVALUATE ROUND %s: Evaluated %s measurements, rewarding %s entries.\n%o',
roundIndex,
measurements.length,
honestMeasurements.length,
fraudAssessments
measurementsToReward.length,
evaluationOutcomes
)

const fraudDetectionDuration = Date.now() - started
Expand Down Expand Up @@ -138,11 +140,11 @@ export const evaluate = async ({
point.intField('total_participants', Object.keys(participants).length)
point.intField('total_measurements', measurements.length)
point.intField('total_nodes', countUniqueNodes(measurements))
point.intField('honest_measurements', honestMeasurements.length)
point.intField('honest_measurements', measurementsToReward.length)
point.intField('set_scores_duration_ms', setScoresDuration)
point.intField('fraud_detection_duration_ms', fraudDetectionDuration)

for (const [type, count] of Object.entries(fraudAssessments)) {
for (const [type, count] of Object.entries(evaluationOutcomes)) {
point.intField(`measurements_${type}`, count)
}
})
Expand All @@ -152,7 +154,9 @@ export const evaluate = async ({
try {
recordTelemetry('retrieval_stats_honest', (point) => {
point.intField('round_index', roundIndex)
buildRetrievalStats(honestMeasurements, point)
// FIXME: Include non-majority measurements in these stats
// See https://github.com/filecoin-station/spark-evaluate/issues/446
buildRetrievalStats(measurementsToReward, point)
})
} catch (err) {
console.error('Cannot record retrieval stats (honest).', err)
Expand All @@ -177,7 +181,7 @@ export const evaluate = async ({
point.intField('committees_all', committees.length)
point.intField('committees_too_small',
committees
.filter(c => c.evaluation?.retrievalResult === 'COMMITTEE_TOO_SMALL')
.filter(c => c.decision?.retrievalResult === 'COMMITTEE_TOO_SMALL')
.length
)
recordCommitteeSizes(committees, point)
Expand Down Expand Up @@ -271,8 +275,6 @@ export const runFraudDetection = async ({
// or missing some of the required fields like `inet_group`
//
for (const m of measurements) {
if (m.fraudAssessment) continue

// sanity checks to get nicer errors if we forget to set required fields in unit tests
assert(typeof m.inet_group === 'string', 'missing inet_group')
assert(typeof m.finished_at === 'number', 'missing finished_at')
Expand All @@ -281,15 +283,15 @@ export const runFraudDetection = async ({
t => t.cid === m.cid && t.minerId === m.minerId
)
if (!isValidTask) {
m.fraudAssessment = 'TASK_NOT_IN_ROUND'
m.taskingEvaluation = 'TASK_NOT_IN_ROUND'
continue
}

const isValidTaskForNode = tasksAllowedForStations.get(m.stationId).some(
t => t.cid === m.cid && t.minerId === m.minerId
)
if (!isValidTaskForNode) {
m.fraudAssessment = 'TASK_WRONG_NODE'
m.taskingEvaluation = 'TASK_WRONG_NODE'
}
}

Expand All @@ -299,7 +301,7 @@ export const runFraudDetection = async ({
/** @type {Map<string, Measurement[]>} */
const inetGroups = new Map()
for (const m of measurements) {
if (m.fraudAssessment) continue
if (m.taskingEvaluation) continue

const key = m.inet_group
let group = inetGroups.get(key)
Expand Down Expand Up @@ -347,18 +349,18 @@ export const runFraudDetection = async ({

if (tasksSeen.has(taskId)) {
debug(' pa: %s h: %s task: %s - task was already rewarded', m.participantAddress, h, taskId)
m.fraudAssessment = 'DUP_INET_GROUP'
m.taskingEvaluation = 'DUP_INET_GROUP'
continue
}

if (tasksSeen.size >= sparkRoundDetails.maxTasksPerNode) {
debug(' pa: %s h: %s task: %s - already rewarded max tasks', m.participantAddress, h, taskId)
m.fraudAssessment = 'TOO_MANY_TASKS'
m.taskingEvaluation = 'TOO_MANY_TASKS'
continue
}

tasksSeen.add(taskId)
m.fraudAssessment = 'OK'
m.taskingEvaluation = 'OK'
debug(' pa: %s h: %s task: %s - REWARD', m.participantAddress, h, taskId)
}
}
Expand All @@ -372,7 +374,7 @@ export const runFraudDetection = async ({
// needs is to iterate over the accepted measurements once.
const iterateAcceptedMeasurements = function * () {
for (const m of measurements) {
if (m.fraudAssessment !== 'OK') continue
if (m.taskingEvaluation !== 'OK') continue
yield m
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/platform-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const updateStationsAndParticipants = async (
}

stationStats.total++
if (m.fraudAssessment === 'OK') stationStats.accepted++
if (m.taskingEvaluation === 'OK' && m.consensusEvaluation === 'MAJORITY_RESULT') stationStats.accepted++

let subnetsSet = subnets.get(participantId)
if (!subnetsSet) {
Expand Down
6 changes: 4 additions & 2 deletions lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ export class Measurement {
// Note: providerId is recorded by spark-publish but we don't use it for evaluations yet
this.providerId = pointerize(m.provider_id)
this.spark_version = pointerize(m.spark_version)
/** @type {import('./typings.js').FraudAssesment} */
this.fraudAssessment = null
/** @type {import('./typings.js').TaskingEvaluation} */
this.taskingEvaluation = null
/** @type {import('./typings.js').ConsensusEvaluation} */
this.consensusEvaluation = null
this.inet_group = pointerize(m.inet_group)
this.finished_at = parseDateTime(m.finished_at)
this.provider_address = pointerize(m.provider_address)
Expand Down
Loading

0 comments on commit 0f6d5bf

Please sign in to comment.