Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement peerDAS on electra #6353

Draft
wants to merge 42 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4805a2e
feat: placeholder PR for electra
g11tech Jan 24, 2024
499d93c
feat: implement peerDAS on electra
g11tech Jan 24, 2024
156ef53
fix: docker build issue for c-kzg
matthewkeil Jun 21, 2024
47eedae
feat: get various sync mechanisms working with/without sharded data
g11tech Jul 14, 2024
d423004
feat: add the modifications to work with devnet2
g11tech Jul 14, 2024
a0c5d27
fix: refactor to add and use nodeid computation and clear out nodeid …
g11tech Jul 16, 2024
c7f6341
fix the types/test
g11tech Aug 9, 2024
81aaeb5
feat: add and use metadatav3 for peer custody subnet
g11tech Aug 12, 2024
e6c613f
rename electra fork to peerdas for rebase and make csc in metadata uint8
g11tech Aug 27, 2024
a3533f8
add supernode flag to configure node custody requirement and make it …
g11tech Aug 27, 2024
54579b0
add more info for debugging
g11tech Aug 27, 2024
180f7d8
fix log
g11tech Aug 27, 2024
4b6f167
fix bug
g11tech Aug 27, 2024
ae7678e
fx
g11tech Aug 27, 2024
a33a72f
subnet count 128
g11tech Aug 27, 2024
585165e
remove banning unknown block, addmore log
g11tech Aug 28, 2024
2833ac0
make the csc encoding updates as per latest spec
g11tech Sep 5, 2024
bf08852
resolve availability when datacolumns are downloaded and matched
g11tech Sep 7, 2024
006e781
add debug log
g11tech Sep 7, 2024
aece0ab
fix add missing data availability resolutions
g11tech Sep 10, 2024
387da88
add more log
g11tech Sep 10, 2024
2bc1a0d
add cache tracking
g11tech Sep 10, 2024
5e1de6f
trying some fix
g11tech Sep 10, 2024
d7721f8
fix bug
g11tech Sep 10, 2024
bd84892
more log
g11tech Sep 10, 2024
de341b5
add send more log
g11tech Sep 10, 2024
8c21168
make pull a little less agressive
g11tech Sep 10, 2024
d35873e
further wait till cutoff for all data to be available
g11tech Sep 11, 2024
f7571f4
add some more loggig and availaibility tracking
g11tech Sep 11, 2024
74d8122
add some log for debugging inbound data columns request
g11tech Sep 11, 2024
af933fb
some fixes
g11tech Sep 11, 2024
56c8c6e
custodied column fetch debugging log
g11tech Sep 12, 2024
2b10e4d
datacolumns retrival fix
g11tech Sep 12, 2024
cdd9bae
update compute spec tests
g11tech Sep 12, 2024
c4d04ee
fix the column id compute
g11tech Sep 13, 2024
3470076
more debug log
g11tech Sep 13, 2024
4ec7aff
edge case optimization
g11tech Sep 13, 2024
a33303f
feat: refactor and unit test getDataColumnSidecars (#7072)
matthewkeil Sep 16, 2024
bd4f7f9
feat: update ckzg to final DAS version (#7050)
matthewkeil Sep 16, 2024
b1940ee
fix: remove ckzg build script (#7089)
matthewkeil Sep 17, 2024
20ef4c6
feat: validate data column sidecars (#7073)
matthewkeil Sep 17, 2024
fee7c08
validate inclusion proof
g11tech Sep 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
FROM --platform=${BUILDPLATFORM:-amd64} node:22.4-slim as build_src
ARG COMMIT
WORKDIR /usr/app
RUN apt-get update && apt-get install -y g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y git g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*

COPY . .

Expand All @@ -23,7 +23,7 @@ RUN cd packages/cli && GIT_COMMIT=${COMMIT} yarn write-git-data
# Note: This step is redundant for the host arch
FROM node:22.4-slim as build_deps
WORKDIR /usr/app
RUN apt-get update && apt-get install -y g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y git g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*

COPY --from=build_src /usr/app .

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
"@lodestar/utils": "^1.21.0",
"@lodestar/validator": "^1.21.0",
"@multiformats/multiaddr": "^12.1.3",
"c-kzg": "^2.1.2",
"c-kzg": "^4.0.1",
"datastore-core": "^9.1.1",
"datastore-level": "^10.1.1",
"deepmerge": "^4.3.1",
Expand Down
49 changes: 39 additions & 10 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import {
reconstructFullBlockOrContents,
signedBeaconBlockToBlinded,
} from "@lodestar/state-transition";
import {ForkExecution, SLOTS_PER_HISTORICAL_ROOT, isForkExecution} from "@lodestar/params";
import {ForkExecution, SLOTS_PER_HISTORICAL_ROOT, isForkExecution, ForkName} from "@lodestar/params";
import {sleep, fromHex, toHex} from "@lodestar/utils";
import {
peerdas,
deneb,
isSignedBlockContents,
ProducedBlockSource,
Expand All @@ -23,10 +24,13 @@ import {
BlockInput,
BlobsSource,
BlockInputDataBlobs,
BlockInputDataDataColumns,
DataColumnsSource,
BlockInputData,
} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {computeBlobSidecars} from "../../../../util/blobs.js";
import {computeBlobSidecars, computeDataColumnSidecars} from "../../../../util/blobs.js";
import {BlockError, BlockErrorCode, BlockGossipError} from "../../../../chain/errors/index.js";
import {OpSource} from "../../../../metrics/validatorMonitor.js";
import {NetworkEvent} from "../../../../network/index.js";
Expand Down Expand Up @@ -65,17 +69,40 @@ export function getBeaconBlockApi({
opts: PublishBlockOpts = {}
) => {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: SignedBeaconBlock, blobSidecars: deneb.BlobSidecars;
let blockForImport: BlockInput,
signedBlock: SignedBeaconBlock,
blobSidecars: deneb.BlobSidecars,
dataColumnSidecars: peerdas.DataColumnSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock} = signedBlockOrContents);
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
const blockData = {
fork: config.getForkName(signedBlock.message.slot),
blobs: blobSidecars,
blobsSource: BlobsSource.api,
blobsBytes: blobSidecars.map(() => null),
} as BlockInputDataBlobs;
const fork = config.getForkName(signedBlock.message.slot);
let blockData: BlockInputData;
if (fork === ForkName.peerdas) {
dataColumnSidecars = computeDataColumnSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
dataColumnsLen: dataColumnSidecars.length,
// custodyColumns is a 1 based index of ith column present in dataColumns[custodyColumns[i-1]]
dataColumnsIndex: new Uint8Array(Array.from({length: dataColumnSidecars.length}, (_, j) => 1 + j)),
dataColumns: dataColumnSidecars,
dataColumnsBytes: dataColumnSidecars.map(() => null),
dataColumnsSource: DataColumnsSource.api,
} as BlockInputDataDataColumns;
blobSidecars = [];
} else if (fork === ForkName.deneb) {
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
blobs: blobSidecars,
blobsSource: BlobsSource.api,
blobsBytes: blobSidecars.map(() => null),
} as BlockInputDataBlobs;
dataColumnSidecars = [];
} else {
throw Error(`Invalid data fork=${fork} for publish`);
}

blockForImport = getBlockInput.availableData(
config,
signedBlock,
Expand All @@ -87,6 +114,7 @@ export function getBeaconBlockApi({
} else {
signedBlock = signedBlockOrContents;
blobSidecars = [];
dataColumnSidecars = [];
blockForImport = getBlockInput.preData(config, signedBlock, BlockSource.api, context?.sszBytes ?? null);
}

Expand Down Expand Up @@ -221,6 +249,7 @@ export function getBeaconBlockApi({
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
...dataColumnSidecars.map((dataColumnSidecar) => () => network.publishDataColumnSidecar(dataColumnSidecar)),
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
Expand Down
74 changes: 61 additions & 13 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import {toHexString} from "@chainsafe/ssz";
import {capella, ssz, altair, BeaconBlock} from "@lodestar/types";
import {ForkLightClient, ForkSeq, INTERVALS_PER_SLOT, MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {
ForkName,
ForkLightClient,
ForkSeq,
INTERVALS_PER_SLOT,
MAX_SEED_LOOKAHEAD,
SLOTS_PER_EPOCH,
} from "@lodestar/params";
import {
CachedBeaconStateAltair,
computeEpochAtSlot,
Expand Down Expand Up @@ -101,6 +108,39 @@ export async function importBlock(
this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
callInNextEventLoop(async () => {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

// dataPromise will not end up here, but preDeneb could. In future we might also allow syncing
// out of data range blocks and import then in forkchoice although one would not be able to
// attest and propose with such head similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blockData} = blockInput;
if (blockData.fork === ForkName.deneb) {
const {blobsSource, blobs} = blockData;

this.metrics?.importBlock.blobsBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
} else if (blockData.fork === ForkName.peerdas) {
// TODO peerDAS build and emit the event for the datacolumns
}
}
});

// 3. Import attestations to fork choice
//
// - For each attestation
Expand Down Expand Up @@ -424,16 +464,20 @@ export async function importBlock(
blockInput.type === BlockInputType.availableData &&
this.emitter.listenerCount(routes.events.EventType.blobSidecar)
) {
const {blobs} = blockInput.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
if (blockInput.blockData.fork === ForkName.deneb) {
const {blobs} = blockInput.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
} else {
// TODO add event for datacolumns
}
}
});
Expand All @@ -454,8 +498,12 @@ export async function importBlock(
// out of data range blocks and import then in forkchoice although one would not be able to
// attest and propose with such head similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blobsSource} = blockInput.blockData;
this.metrics?.importBlock.blobsBySource.inc({blobsSource});
if (blockInput.blockData.fork === ForkName.deneb) {
const {blobsSource} = blockInput.blockData;
this.metrics?.importBlock.blobsBySource.inc({blobsSource});
} else {
// TODO add data columns metrics
}
}

const advancedSlot = this.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);
Expand Down
57 changes: 51 additions & 6 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus, DataAvailabilityStatus} from "@lodestar/fork-choice";
import {deneb, Slot, RootHex, SignedBeaconBlock} from "@lodestar/types";
import {deneb, Slot, RootHex, SignedBeaconBlock, peerdas, ColumnIndex} from "@lodestar/types";
import {ForkSeq, ForkName} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

Expand Down Expand Up @@ -29,23 +29,48 @@ export enum BlobsSource {
byRoot = "req_resp_by_root",
}

export enum DataColumnsSource {
gossip = "gossip",
api = "api",
byRange = "req_resp_by_range",
byRoot = "req_resp_by_root",
}

export enum GossipedInputType {
block = "block",
blob = "blob",
dataColumn = "dataColumn",
}

type BlobsCacheMap = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlobsCacheMap = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type DataColumnsCacheMap = Map<
number,
{dataColumnSidecar: peerdas.DataColumnSidecar; dataColumnBytes: Uint8Array | null}
>;

type ForkBlobsInfo = {fork: ForkName.deneb};
type BlobsData = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource};
export type BlockInputDataBlobs = ForkBlobsInfo & BlobsData;
export type BlockInputData = BlockInputDataBlobs;

export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource};
type Availability<T> = {availabilityPromise: Promise<T>; resolveAvailability: (data: T) => void};
type ForkDataColumnsInfo = {fork: ForkName.peerdas};
type DataColumnsData = {
// marker of that columns are to be custodied
dataColumnsLen: number;
dataColumnsIndex: Uint8Array;
dataColumns: peerdas.DataColumnSidecars;
dataColumnsBytes: (Uint8Array | null)[];
dataColumnsSource: DataColumnsSource;
};
export type BlockInputDataDataColumns = ForkDataColumnsInfo & DataColumnsData;
export type BlockInputData = BlockInputDataBlobs | BlockInputDataDataColumns;

type Availability<T> = {availabilityPromise: Promise<T>; resolveAvailability: (data: T) => void};
type CachedBlobs = {blobsCache: BlobsCacheMap} & Availability<BlockInputDataBlobs>;
export type CachedData = ForkBlobsInfo & CachedBlobs;
type CachedDataColumns = {dataColumnsCache: DataColumnsCacheMap} & Availability<BlockInputDataDataColumns>;
export type CachedData = {cacheId: number} & (
| (ForkBlobsInfo & CachedBlobs)
| (ForkDataColumnsInfo & CachedDataColumns)
);

export type BlockInput = {block: SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preData | BlockInputType.outOfRangeData}
Expand Down Expand Up @@ -161,6 +186,26 @@ export function getBlockInputBlobs(blobsCache: BlobsCacheMap): Omit<BlobsData, "
return {blobs, blobsBytes};
}

export function getBlockInputDataColumns(
dataColumnsCache: DataColumnsCacheMap,
columnIndexes: ColumnIndex[]
): Omit<DataColumnsData, "dataColumnsLen" | "dataColumnsIndex" | "dataColumnsSource"> {
const dataColumns = [];
const dataColumnsBytes = [];

for (const index of columnIndexes) {
const dataColumnCache = dataColumnsCache.get(index);
if (dataColumnCache === undefined) {
// check if the index is correct as per the custody columns
throw Error(`Missing dataColumnCache at index=${index}`);
}
const {dataColumnSidecar, dataColumnBytes} = dataColumnCache;
dataColumns.push(dataColumnSidecar);
dataColumnsBytes.push(dataColumnBytes);
}
return {dataColumns, dataColumnsBytes};
}

export enum AttestationImportOpt {
Skip,
Force,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@ import {DataAvailabilityStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, UintNum64} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {ForkName} from "@lodestar/params";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {validateDataColumnsSidecars} from "../validation/dataColumnSidecar.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation, getBlockInput} from "./types.js";
import {
BlockInput,
BlockInputType,
ImportBlockOpts,
BlobSidecarValidation,
getBlockInput,
BlockInputData,
} from "./types.js";

// we can now wait for full 12 seconds because unavailable block sync will try pulling
// the blobs from the network anyway after 500ms of seeing the block
Expand Down Expand Up @@ -88,27 +97,37 @@ async function maybeValidateBlobs(
// run full validation
const {block} = blockInput;
const blockSlot = block.message.slot;

const blobsData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise);
const {blobs} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
const beaconBlockRoot = chain.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);

// if the blob siddecars have been individually verified then we can skip kzg proof check
// but other checks to match blobs with block data still need to be performed
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck});
const blockData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(
chain,
blockInput,
blockInput.cachedData.availabilityPromise as Promise<BlockInputData>
);

if (blockData.fork === ForkName.deneb) {
const {blobs} = blockData;

// if the blob siddecars have been individually verified then we can skip kzg proof check
// but other checks to match blobs with block data still need to be performed
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck});
} else if (blockData.fork === ForkName.peerdas) {
const {dataColumns} = blockData;
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
// might require numColumns, custodyColumns from blockData as input to below
validateDataColumnsSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, dataColumns, {skipProofsCheck});
}

const availableBlockInput = getBlockInput.availableData(
chain.config,
blockInput.block,
blockInput.source,
blockInput.blockBytes,
blobsData
blockData
);
return {dataAvailabilityStatus: DataAvailabilityStatus.Available, availableBlockInput: availableBlockInput};
}
Expand Down
Loading