Skip to content

Commit

Permalink
feat: Prover node checks txs availability before sending quote
Browse files Browse the repository at this point in the history
Fixes #10803
  • Loading branch information
spalladino committed Dec 24, 2024
1 parent c8dcd8f commit 5e02272
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 31 deletions.
19 changes: 5 additions & 14 deletions yarn-project/prover-node/src/job/epoch-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ export class EpochProvingJob implements Traceable {
private dbProvider: ForkMerkleTreeOperations,
private epochNumber: bigint,
private blocks: L2Block[],
private txs: Tx[],
private prover: EpochProver,
private publicProcessorFactory: PublicProcessorFactory,
private publisher: L1Publisher,
private l2BlockSource: L2BlockSource,
private l1ToL2MessageSource: L1ToL2MessageSource,
private coordination: ProverCoordination,
private metrics: ProverNodeMetrics,
private config: { parallelBlockLimit: number } = { parallelBlockLimit: 32 },
private cleanUp: (job: EpochProvingJob) => Promise<void> = () => Promise.resolve(),
Expand Down Expand Up @@ -92,10 +92,9 @@ export class EpochProvingJob implements Traceable {

await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => {
const globalVariables = block.header.globalVariables;
const txHashes = block.body.txEffects.map(tx => tx.txHash);
const txCount = block.body.numberOfTxsIncludingPadded;
const txs = this.getTxs(block);
const l1ToL2Messages = await this.getL1ToL2Messages(block);
const txs = await this.getTxs(txHashes, block.number);
const previousHeader = await this.getBlockHeader(block.number - 1);

this.log.verbose(`Starting processing block ${block.number}`, {
Expand Down Expand Up @@ -162,17 +161,9 @@ export class EpochProvingJob implements Traceable {
return this.l2BlockSource.getBlockHeader(blockNumber);
}

private async getTxs(txHashes: TxHash[], blockNumber: number): Promise<Tx[]> {
const txs = await Promise.all(
txHashes.map(txHash => this.coordination.getTxByHash(txHash).then(tx => [txHash, tx] as const)),
);
const notFound = txs.filter(([_, tx]) => !tx);
if (notFound.length) {
throw new Error(
`Txs not found for block ${blockNumber}: ${notFound.map(([txHash]) => txHash.toString()).join(', ')}`,
);
}
return txs.map(([_, tx]) => tx!);
private getTxs(block: L2Block): Tx[] {
const txHashes = block.body.txEffects.map(tx => tx.txHash);
return this.txs.filter(tx => txHashes.includes(tx.getTxHash()));
}

private getL1ToL2Messages(block: L2Block) {
Expand Down
41 changes: 36 additions & 5 deletions yarn-project/prover-node/src/prover-node.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
type Body,
type EpochProofClaim,
EpochProofQuote,
EpochProofQuotePayload,
Expand All @@ -9,6 +10,9 @@ import {
type MerkleTreeWriteOperations,
P2PClientType,
type ProverCoordination,
type Tx,
type TxEffect,
TxHash,
WorldStateRunningState,
type WorldStateSynchronizer,
} from '@aztec/circuit-types';
Expand Down Expand Up @@ -44,7 +48,8 @@ describe('prover-node', () => {
let l1ToL2MessageSource: MockProxy<L1ToL2MessageSource>;
let contractDataSource: MockProxy<ContractDataSource>;
let worldState: MockProxy<WorldStateSynchronizer>;
let coordination: MockProxy<ProverCoordination> | ProverCoordination;
let coordination: ProverCoordination;
let mockCoordination: MockProxy<ProverCoordination>;
let quoteProvider: MockProxy<QuoteProvider>;
let quoteSigner: MockProxy<QuoteSigner>;
let bondManager: MockProxy<BondManager>;
Expand Down Expand Up @@ -108,7 +113,8 @@ describe('prover-node', () => {
l1ToL2MessageSource = mock<L1ToL2MessageSource>();
contractDataSource = mock<ContractDataSource>();
worldState = mock<WorldStateSynchronizer>();
coordination = mock<ProverCoordination>();
mockCoordination = mock<ProverCoordination>();
coordination = mockCoordination;
quoteProvider = mock<QuoteProvider>();
quoteSigner = mock<QuoteSigner>();
bondManager = mock<BondManager>();
Expand All @@ -134,10 +140,23 @@ describe('prover-node', () => {
// Signer returns an empty signature
quoteSigner.sign.mockImplementation(payload => Promise.resolve(new EpochProofQuote(payload, Signature.empty())));

// We create 3 fake blocks with 1 tx effect each
blocks = times(3, i =>
mock<L2Block>({
number: i + 20,
hash: () => new Fr(i),
body: mock<Body>({ txEffects: [mock<TxEffect>({ txHash: TxHash.random() } as TxEffect)] }),
}),
);

// Archiver returns a bunch of fake blocks
blocks = times(3, i => mock<L2Block>({ number: i + 20, hash: () => new Fr(i) }));
l2BlockSource.getBlocksForEpoch.mockResolvedValue(blocks);

// Coordination plays along and returns a tx whenever requested
mockCoordination.getTxByHash.mockImplementation(hash =>
Promise.resolve(mock<Tx>({ getTxHash: () => hash, tryGetTxHash: () => hash })),
);

// A sample claim
claim = { epochToProve: 10n, bondProvider: address } as EpochProofClaim;

Expand Down Expand Up @@ -175,6 +194,12 @@ describe('prover-node', () => {
expect(coordination.addEpochProofQuote).not.toHaveBeenCalled();
});

it('does not send a quote if there is a tx missing from coordinator', async () => {
mockCoordination.getTxByHash.mockResolvedValue(undefined);
await proverNode.handleEpochCompleted(10n);
expect(coordination.addEpochProofQuote).not.toHaveBeenCalled();
});

it('does not send a quote on a finished epoch if the provider does not return one', async () => {
quoteProvider.getQuote.mockResolvedValue(undefined);
await proverNode.handleEpochCompleted(10n);
Expand Down Expand Up @@ -309,7 +334,7 @@ describe('prover-node', () => {
// Things to test
// - Another aztec node receives the proof quote via p2p
// - The prover node can get the it is missing via p2p, or it has them in it's mempool
describe('Using a p2p coordination', () => {
describe('using a p2p coordination', () => {
let bootnode: BootstrapNode;
let epochCache: MockProxy<EpochCache>;
let p2pClient: P2PClient<P2PClientType.Prover>;
Expand Down Expand Up @@ -346,6 +371,11 @@ describe('prover-node', () => {
// Set the p2p client to be the coordination method
coordination = p2pClient;

// But still mock getTxByHash
const mockGetTxByHash = (hash: TxHash) => Promise.resolve(mock<Tx>({ getTxHash: () => hash }));
jest.spyOn(p2pClient, 'getTxByHash').mockImplementation(mockGetTxByHash);
jest.spyOn(otherP2PClient, 'getTxByHash').mockImplementation(mockGetTxByHash);

await Promise.all([p2pClient.start(), otherP2PClient.start()]);

// Sleep to enable peer discovery
Expand Down Expand Up @@ -373,7 +403,7 @@ describe('prover-node', () => {
await proverNode.stop();
});

it('Should send a proof quote via p2p to another node', async () => {
it('should send a proof quote via p2p to another node', async () => {
const epochNumber = 10n;
epochCache.getEpochAndSlotNow.mockReturnValue({
epoch: epochNumber,
Expand Down Expand Up @@ -412,6 +442,7 @@ describe('prover-node', () => {
protected override doCreateEpochProvingJob(
epochNumber: bigint,
_blocks: L2Block[],
_txs: Tx[],
_publicProcessorFactory: PublicProcessorFactory,
cleanUp: (job: EpochProvingJob) => Promise<void>,
): EpochProvingJob {
Expand Down
60 changes: 48 additions & 12 deletions yarn-project/prover-node/src/prover-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
type ProverCoordination,
type ProverNodeApi,
type Service,
type Tx,
type WorldStateSynchronizer,
tryStop,
} from '@aztec/circuit-types';
Expand Down Expand Up @@ -49,6 +50,7 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr

private latestEpochWeAreProving: bigint | undefined;
private jobs: Map<string, EpochProvingJob> = new Map();
private cachedEpochData: { epochNumber: bigint; blocks: L2Block[]; txs: Tx[] } | undefined = undefined;
private options: ProverNodeOptions;
private metrics: ProverNodeMetrics;

Expand Down Expand Up @@ -139,13 +141,12 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr
*/
async handleEpochCompleted(epochNumber: bigint): Promise<void> {
try {
// Construct a quote for the epoch
const blocks = await this.l2BlockSource.getBlocksForEpoch(epochNumber);
if (blocks.length === 0) {
this.log.info(`No blocks found for epoch ${epochNumber}`);
return;
}
// Gather data for the epoch
const epochData = await this.gatherEpochData(epochNumber);
const { blocks } = epochData;
this.cachedEpochData = { epochNumber, ...epochData };

// Construct a quote for the epoch
const partialQuote = await this.quoteProvider.getQuote(Number(epochNumber), blocks);
if (!partialQuote) {
this.log.info(`No quote produced for epoch ${epochNumber}`);
Expand Down Expand Up @@ -256,10 +257,9 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr
}

// Gather blocks for this epoch
const blocks = await this.l2BlockSource.getBlocksForEpoch(epochNumber);
if (blocks.length === 0) {
throw new Error(`No blocks found for epoch ${epochNumber}`);
}
const cachedEpochData = this.cachedEpochData?.epochNumber === epochNumber ? this.cachedEpochData : undefined;
const { blocks, txs } = cachedEpochData ?? (await this.gatherEpochData(epochNumber));

const fromBlock = blocks[0].number;
const toBlock = blocks.at(-1)!.number;

Expand All @@ -279,28 +279,64 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr
return Promise.resolve();
};

const job = this.doCreateEpochProvingJob(epochNumber, blocks, publicProcessorFactory, cleanUp);
const job = this.doCreateEpochProvingJob(epochNumber, blocks, txs, publicProcessorFactory, cleanUp);
this.jobs.set(job.getId(), job);
return job;
}

@trackSpan('ProverNode.gatherEpochData', epochNumber => ({ [Attributes.EPOCH_NUMBER]: Number(epochNumber) }))
private async gatherEpochData(epochNumber: bigint) {
// Gather blocks for this epoch and their txs
const blocks = await this.gatherBlocks(epochNumber);
const txs = await this.gatherTxs(epochNumber, blocks);

return { blocks, txs };
}

private async gatherBlocks(epochNumber: bigint) {
const blocks = await this.l2BlockSource.getBlocksForEpoch(epochNumber);
if (blocks.length === 0) {
throw new Error(`No blocks found for epoch ${epochNumber}`);
}
return blocks;
}

private async gatherTxs(epochNumber: bigint, blocks: L2Block[]) {
const txs = await Promise.all(
blocks.flatMap(block =>
block.body.txEffects
.map(tx => tx.txHash)
.map(txHash => this.coordination.getTxByHash(txHash).then(tx => [block.number, txHash, tx] as const)),
),
);

const notFound = txs.filter(([_blockNum, _txHash, tx]) => !tx);
if (notFound.length) {
const notFoundList = notFound.map(([blockNum, txHash]) => `${txHash.toString()} (block ${blockNum})`).join(', ');
throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`);
}

return txs.map(([_blockNumber, _txHash, tx]) => tx!);
}

/** Extracted for testing purposes. */
protected doCreateEpochProvingJob(
epochNumber: bigint,
blocks: L2Block[],
txs: Tx[],
publicProcessorFactory: PublicProcessorFactory,
cleanUp: () => Promise<void>,
) {
return new EpochProvingJob(
this.worldState,
epochNumber,
blocks,
txs,
this.prover.createEpochProver(),
publicProcessorFactory,
this.publisher,
this.l2BlockSource,
this.l1ToL2MessageSource,
this.coordination,
this.metrics,
{ parallelBlockLimit: this.options.maxParallelBlocksPerEpoch },
cleanUp,
Expand Down

0 comments on commit 5e02272

Please sign in to comment.