Commit
added column support for req resp domain
agnxsh committed Nov 23, 2024
1 parent cd8de2f commit 8a48fc4
Showing 4 changed files with 154 additions and 1 deletion.
2 changes: 2 additions & 0 deletions beacon_chain/rpc/rest_constants.nim
@@ -235,6 +235,8 @@ const
"Validator inactive"
BlobsOutOfRange* =
"Requested slot is outside of blobs window"
DataColumnsOutOfRange* =
"Requested slot is outside of data columns window"
InvalidBlsToExecutionChangeObjectError* =
"Unable to decode BLS to execution change object(s)"
BlsToExecutionChangeValidationError* =
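For context, a hedged sketch of how this constant would typically surface through the REST layer, mirroring how BlobsOutOfRange is used by the existing blob endpoints; the handler shape and the window variable below are assumptions, not part of this commit:

  # Sketch only, not part of this commit; assumes the REST helpers
  # (RestApiResponse.jsonError, Http400) used by the existing blob endpoints.
  if requestedSlot.epoch < dataColumnWindowStart:  # hypothetical window check
    return RestApiResponse.jsonError(Http400, DataColumnsOutOfRange)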
4 changes: 4 additions & 0 deletions beacon_chain/spec/network.nim
@@ -34,6 +34,10 @@ const
MAX_REQUEST_BLOB_SIDECARS*: uint64 =
MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#configuration
MAX_REQUEST_DATA_COLUMN_SIDECARS*: uint64 =
MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS
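As a sanity check on the resulting limit, a minimal sketch assuming the values published in the linked spec version (MAX_REQUEST_BLOCKS_DENEB = 128, NUMBER_OF_COLUMNS = 128):

  # Sketch only; constant values assumed from consensus-specs v1.5.0-alpha.8.
  const
    MAX_REQUEST_BLOCKS_DENEB = 128'u64
    NUMBER_OF_COLUMNS = 128'u64
  doAssert MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS == 16384'u64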

defaultEth2TcpPort* = 9000
defaultEth2TcpPortDesc* = $defaultEth2TcpPort

2 changes: 1 addition & 1 deletion beacon_chain/sync/request_manager.nim
@@ -10,7 +10,7 @@
import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0, deneb],
../spec/datatypes/[phase0, deneb, fulu],
../spec/[forks, network],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
147 changes: 147 additions & 0 deletions beacon_chain/sync/sync_protocol.nim
@@ -24,6 +24,8 @@ const
## Allow syncing ~64 blocks/sec (minus request costs)
blobResponseCost = allowedOpsPerSecondCost(1000)
## Multiple can exist per block, they are much smaller than blocks
dataColumnResponseCost = allowedOpsPerSecondCost(8000)
## 8 data columns take approximately the same memory as 1 blob
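The column budget is just the blob budget scaled by the sizing assumption in the comment above (roughly 8 column sidecars per blob); a minimal sketch of that arithmetic:

  # Sketch of the sizing assumption only, not the actual quota mechanism.
  const
    blobsPerSecond = 1000   # mirrors blobResponseCost above
    columnsPerBlob = 8      # assumption: ~8 column sidecars ≈ 1 blob sidecar
  doAssert blobsPerSecond * columnsPerBlob == 8000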

type
BeaconSyncNetworkState* {.final.} = ref object of RootObj
@@ -37,6 +39,7 @@ type

BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
BlobIdentifierList* = List[BlobIdentifier, Limit (MAX_REQUEST_BLOB_SIDECARS)]
DataColumnIdentifierList* = List[DataColumnIdentifier, Limit (MAX_REQUEST_DATA_COLUMN_SIDECARS)]
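A minimal sketch of how a by-root request list could be assembled, assuming the DataColumnIdentifier container (block_root plus column index) from the linked EIP-7594 spec; the block root here is a hypothetical placeholder:

  # Sketch only: a two-column by-root request for a single block, assuming
  # the fulu module's DataColumnIdentifier and ColumnIndex types.
  var ids: DataColumnIdentifierList
  for idx in [ColumnIndex 0, ColumnIndex 1]:
    discard ids.add DataColumnIdentifier(
      block_root: someBlockRoot,  # hypothetical Eth2Digest
      index: idx)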

proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
@@ -80,6 +83,28 @@
else:
return neterr InvalidContextBytes

proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref DataColumnSidecar)):
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError:
return neterr UnexpectedEOF
let contextFork =
peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr:
return neterr InvalidContextBytes
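# Data column sidecars exist only from Fulu onwards, so any digest that
# resolves to an earlier fork is rejected below as invalid context bytes.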

withConsensusFork(contextFork):
when consensusFork >= ConsensusFork.Fulu:
let res = await readChunkPayload(conn, peer, DataColumnSidecar)
if res.isOk:
return ok newClone(res.get)
else:
return err(res.error)
else:
return neterr InvalidContextBytes

{.pop.} # TODO fix p2p macro for raises

p2pProtocol BeaconSync(version = 1,
@@ -339,6 +364,128 @@
debug "BlobSidecar range request done",
peer, startSlot, count = reqCount, found

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyroot-v1
proc dataColumnSidecarsByRoot(
peer: Peer,
colIds: DataColumnIdentifierList,
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
{.async, libp2pProtocol("data_column_sidecars_by_root", 1).} =

trace "got data column root request", peer, len = colIds.len
if colIds.len == 0:
raise newException(InvalidInputsError, "No data columns request for root")

if colIds.lenu64 > MAX_REQUEST_DATA_COLUMN_SIDECARS:
raise newException(InvalidInputsError, "Exceeding data column request limit")

let
dag = peer.networkState.dag
count = colIds.len

var
found = 0
bytes: seq[byte]

for i in 0..<count:
let blockRef =
dag.getBlockRef(colIds[i].block_root).valueOr:
continue
let index =
colIds[i].index
if dag.db.getDataColumnSidecarSZ(blockRef.bid.root, index, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read data column size, database corrupt?",
bytes = bytes.len, blck = shortLog(blockRef), columnIndex = index
continue

peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_root/1")

await response.writeBytesSZ(
uncompressedLen, bytes,
peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found

# additional logging for devnets
debug "responsded to data column sidecar by root request",
peer, blck = shortLog(blockRef), columnIndex = index

debug "Data column root request done",
peer, roots = colIds.len, count, found

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#datacolumnsidecarsbyrange-v1
proc dataColumnSidecarsByRange(
peer: Peer,
startSlot: Slot,
reqCount: uint64,
reqColumns: List[ColumnIndex, NUMBER_OF_COLUMNS],
response: MultipleChunksResponse[
ref DataColumnSidecar, Limit(MAX_REQUEST_DATA_COLUMN_SIDECARS)])
{.async, libp2pProtocol("data_column_sidecars_by_range", 1).} =

trace "got data columns range request", peer, startSlot,
count = reqCount, columns = reqColumns

if reqCount == 0 or reqColumns.len == 0:
raise newException(InvalidInputsError, "Empty range requested")

let
dag = peer.networkState.dag
# Using MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS until
# MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS is released in
# Fulu. Effectively both values are the same
epochBoundary =
if dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS >= dag.head.slot.epoch:
GENESIS_EPOCH
else:
dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS

if startSlot.epoch < epochBoundary:
raise newException(ResourceUnavailableError, DataColumnsOutOfRange)
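# Example, assuming the default 4096-epoch window: with the head at
# epoch 10000 the boundary is epoch 5904, and any request whose startSlot
# falls in an earlier epoch is refused above with DataColumnsOutOfRange.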

var blockIds: array[int(MAX_REQUEST_DATA_COLUMN_SIDECARS), BlockId]
let
count = int min(reqCount, blockIds.lenu64)
endIndex = count - 1
startIndex =
dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex))

var
found = 0
bytes: seq[byte]

for i in startIndex..endIndex:
for k in reqColumns:
if dag.db.getDataColumnSidecarSZ(blockIds[i].root, ColumnIndex k, bytes):
if blockIds[i].slot.epoch >= dag.cfg.DENEB_FORK_EPOCH and
not dag.head.executionValid:
continue

let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read data column sidecar size, database corrup?",
bytes = bytes.len, blck = shortLog(blockIds[i])
continue

peer.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1")
peer.network.awaitQuota(dataColumnResponseCost, "data_column_sidecars_by_range/1")

await response.writeBytesSZ(
uncompressedLen, bytes,
peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data)
inc found

# additional logging for devnets
debug "responded to data column sidecar range request",
peer, blck = shortLog(blockIds[i]), columnIndex = k

debug "Data column range request done",
peer, startSlot, count = reqCount, columns = reqColumns, found

proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T =
T(
dag: dag,
