Skip to content

Commit

Permalink
check height of incoming block (#1276)
Browse files Browse the repository at this point in the history
- prevent processing the same block twice
- prevent processing blocks out-of-order
  • Loading branch information
turbocrime authored Jun 11, 2024
1 parent 729a8b5 commit 6ee8222
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 62 deletions.
5 changes: 5 additions & 0 deletions .changeset/sixty-spiders-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@penumbra-zone/query': patch
---

prevent processing the same block twice
131 changes: 69 additions & 62 deletions packages/query/src/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,67 +116,9 @@ export class BlockProcessor implements BlockProcessorInterface {

public stop = (r: string) => this.abortController.abort(`Sync stop ${r}`);

async identifyTransactions(
spentNullifiers: Set<Nullifier>,
commitmentRecordsByStateCommitment: Map<StateCommitment, SpendableNoteRecord | SwapRecord>,
blockTx: Transaction[],
) {
const relevantTx = new Map<TransactionId, Transaction>();
const recordsWithSources = new Array<SpendableNoteRecord | SwapRecord>();
for (const tx of blockTx) {
let txId: TransactionId | undefined;

const txCommitments = (tx.body?.actions ?? []).flatMap(({ action }) => {
switch (action.case) {
case 'output':
return action.value.body?.notePayload?.noteCommitment;
case 'swap':
return action.value.body?.payload?.commitment;
case 'swapClaim':
return [action.value.body?.output1Commitment, action.value.body?.output2Commitment];
default: // TODO: what other actions have commitments?
return;
}
});

const txNullifiers = (tx.body?.actions ?? []).map(({ action }) => {
switch (action.case) {
case 'spend':
case 'swapClaim':
return action.value.body?.nullifier;
default: // TODO: what other actions have nullifiers?
return;
}
});

for (const spentNullifier of spentNullifiers) {
if (txNullifiers.some(txNullifier => spentNullifier.equals(txNullifier))) {
txId = new TransactionId({ inner: await sha256Hash(tx.toBinary()) });
relevantTx.set(txId, tx);
spentNullifiers.delete(spentNullifier);
}
}

for (const [stateCommitment, spendableNoteRecord] of commitmentRecordsByStateCommitment) {
if (txCommitments.some(txCommitment => stateCommitment.equals(txCommitment))) {
txId ??= new TransactionId({ inner: await sha256Hash(tx.toBinary()) });
relevantTx.set(txId, tx);
if (BLANK_TX_SOURCE.equals(spendableNoteRecord.source)) {
spendableNoteRecord.source = new CommitmentSource({
source: { case: 'transaction', value: { id: txId.inner } },
});
recordsWithSources.push(spendableNoteRecord);
}
commitmentRecordsByStateCommitment.delete(stateCommitment);
}
}
}
return { relevantTx, recordsWithSources };
}

private async syncAndStore() {
// start at next block, or 0 if height is undefined
const startHeight = ((await this.indexedDb.getFullSyncHeight()) ?? -1n) + 1n;
// start at next block, or genesis if height is undefined
let currentHeight = (await this.indexedDb.getFullSyncHeight()) ?? -1n;

// this is the first network query of the block processor. use backoff to
// delay until network is available
Expand All @@ -189,7 +131,7 @@ export class BlockProcessor implements BlockProcessorInterface {
{ retry: () => true },
);

if (startHeight === 0n) {
if (currentHeight === -1n) {
// In the `for` loop below, we only update validator infos once we've
// reached the latest known epoch. This means that, if a user is syncing
// for the first time, they could experience a broken UI until the latest
Expand All @@ -203,10 +145,17 @@ export class BlockProcessor implements BlockProcessorInterface {
// this is an indefinite stream of the (compact) chain from the network
// intended to run continuously
for await (const compactBlock of this.querier.compactBlock.compactBlockRange({
startHeight,
startHeight: currentHeight + 1n,
keepAlive: true,
abortSignal: this.abortController.signal,
})) {
// confirm block height to prevent corruption of local state
if (compactBlock.height === currentHeight + 1n) {
currentHeight = compactBlock.height;
} else {
throw new Error(`Unexpected block height: ${compactBlock.height} at ${currentHeight}`);
}

if (compactBlock.appParametersUpdated) {
await this.indexedDb.saveAppParams(await this.querier.app.appParams());
}
Expand Down Expand Up @@ -377,6 +326,64 @@ export class BlockProcessor implements BlockProcessorInterface {
}
}

private async identifyTransactions(
spentNullifiers: Set<Nullifier>,
commitmentRecordsByStateCommitment: Map<StateCommitment, SpendableNoteRecord | SwapRecord>,
blockTx: Transaction[],
) {
const relevantTx = new Map<TransactionId, Transaction>();
const recordsWithSources = new Array<SpendableNoteRecord | SwapRecord>();
for (const tx of blockTx) {
let txId: TransactionId | undefined;

const txCommitments = (tx.body?.actions ?? []).flatMap(({ action }) => {
switch (action.case) {
case 'output':
return action.value.body?.notePayload?.noteCommitment;
case 'swap':
return action.value.body?.payload?.commitment;
case 'swapClaim':
return [action.value.body?.output1Commitment, action.value.body?.output2Commitment];
default: // TODO: what other actions have commitments?
return;
}
});

const txNullifiers = (tx.body?.actions ?? []).map(({ action }) => {
switch (action.case) {
case 'spend':
case 'swapClaim':
return action.value.body?.nullifier;
default: // TODO: what other actions have nullifiers?
return;
}
});

for (const spentNullifier of spentNullifiers) {
if (txNullifiers.some(txNullifier => spentNullifier.equals(txNullifier))) {
txId = new TransactionId({ inner: await sha256Hash(tx.toBinary()) });
relevantTx.set(txId, tx);
spentNullifiers.delete(spentNullifier);
}
}

for (const [stateCommitment, spendableNoteRecord] of commitmentRecordsByStateCommitment) {
if (txCommitments.some(txCommitment => stateCommitment.equals(txCommitment))) {
txId ??= new TransactionId({ inner: await sha256Hash(tx.toBinary()) });
relevantTx.set(txId, tx);
if (BLANK_TX_SOURCE.equals(spendableNoteRecord.source)) {
spendableNoteRecord.source = new CommitmentSource({
source: { case: 'transaction', value: { id: txId.inner } },
});
recordsWithSources.push(spendableNoteRecord);
}
commitmentRecordsByStateCommitment.delete(stateCommitment);
}
}
}
return { relevantTx, recordsWithSources };
}

private async saveAndReturnMetadata(assetId: AssetId): Promise<Metadata | undefined> {
const metadataAlreadyInDb = await this.indexedDb.getAssetsMetadata(assetId);
if (metadataAlreadyInDb) return metadataAlreadyInDb;
Expand Down

0 comments on commit 6ee8222

Please sign in to comment.